2 Replies Latest reply on Nov 8, 2004 7:20 PM by gmand

    JMS subscriber reconnect

    gmand

      I've looked through the forum and have yet to find a great example of how to implement a subscriber reconnect after a ping timeout has occurred. I've implemented the ExceptionListener to know when the timeout has occurred, but now I want to know what the correct way is to reconnect to the providor and my topic. From what I've read I know that I need to close the connection (connection.close()) but what is the best way after that? Thanks for any suggestions.

        • 1. Re: JMS subscriber reconnect
          genman


          Take a look at

          server/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvoker.java

          in the JBoss sources.

          • 2. Re: JMS subscriber reconnect
            gmand

            Thanks -

            Here is what I've implemented, however I am still having issues with regards to closing the connection. When there is a network failure, I get a ping timeout exception, which is what I expect, but when I go to close the connection it just hangs there. I never reach my notify() statement. What am I missing? This works on SilverStreams app server, is there something different that I need to do on JBoss?


            import javax.jms.*;
            import javax.naming.InitialContext;
            import java.io.PrintWriter;
            import java.io.FileWriter;
            import java.io.IOException;
            import java.util.Date;
            import java.text.DateFormat;

            public class SmartListener implements MessageListener, ExceptionListener, Runnable{
            String topicName;
            TopicSubscriber topicSubscriber;
            TopicConnection connection;

            public void run(){
            startConnection();
            }

            public SmartListener(String topicName){
            this.topicName = topicName;
            }

            public void startConnection(){

            /**
            * Thread which performs reconnections, usually waiting for to be instructed to start
            * the re-connection process
            */
            Thread t = new Thread(){
            public void run(){
            while(true){
            reconnect();
            }
            }
            };
            t.start();

            //Create TopicSubscriber, if it fails immediatley notify re-connect thread
            try{
            topicSubscriber = createSubscriber();
            }
            catch(JMSException jmse){
            System.out.println("JMSException: " + jmse);
            synchronized(this){
            notify();
            }
            }
            }

            public String buildFilter(){
            String filter = "alias in (";
            for(int i=0; i<modelRef.ports.length; i++){
            filter += "'" + modelRef.ports.getAlias() + "', ";
            }
            filter += ")";
            return filter;
            }

            public void onMessage(Message message){
            //do onMessage
            System.out.println("GOT MESSAGE");
            }


            public synchronized boolean reconnect(){
            doWait(); //Wait to be notified
            System.out.print("Trying to reconnect ...");
            for(int i=0; i < 10; i++){
            try{
            topicSubscriber = createSubscriber();
            System.out.print("ok");
            return true;
            }
            catch(Exception e){
            System.out.print(".");
            try{
            Thread.sleep(1000);
            }
            catch(InterruptedException exc){}
            }
            }
            System.err.println("failed after 10 attempts");
            return false;
            }

            public TopicSubscriber createSubscriber() throws JMSException{
            TopicSubscriber subscriber = null;

            try{
            InitialContext ctx = new InitialContext();
            TopicConnectionFactory factory = (TopicConnectionFactory)ctx.lookup("ConnectionFactory");
            Topic topic = (Topic)ctx.lookup("topic/powerControlTopic");
            connection = factory.createTopicConnection();
            connection.start();

            TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            connection.setExceptionListener(this);

            String filter = buildFilter();
            subscriber = session.createSubscriber(topic, filter, false);
            }
            catch(Exception e){
            throw new JMSException("Can't initialize: " + e);
            }
            subscriber.setMessageListener(this);
            return subscriber;
            }

            public synchronized void onException(JMSException jmse){
            System.out.println("onException(): " + jmse);
            try{
            connection.setExceptionListener(null);
            connection.stop();
            connection.close();
            connection = null;
            //topicSubscriber.close();
            }
            catch(JMSException je){

            je.printStackTrace();
            }
            System.out.println("Notifying ...");
            notify(); //trigger reconnect thread
            }

            public synchronized void doWait(){
            try{
            wait();
            }
            catch(InterruptedException e){e.printStackTrace();}
            }
            }