6 Replies Latest reply on Jun 4, 2006 7:05 AM by Tim Fox

    Get timeout with a consumer of topic

    Xinyu Zhang Newbie

      Hello there:

      I installed JBoss 4.0.3 SP1 and JBoss Messaging 1.0.0.GA. I ran the examples for queue and topic and they work fine.

      For the topic example, I then separated the message producer and consumer into two applications, and they still worked fine. Next, I changed the consumer to receive messages continuously in an infinite loop:

      connection = cf.createConnection();
      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      MessageConsumer subscriber = session.createConsumer(topic);

      subscriber.setMessageListener(this);
      connection.start();

      while (true) {
          waitForMessage();
          TextMessage the_message = (TextMessage)getMessage();
          System.out.println("For topic " + topic + ": received message: " + the_message.getText());
      } //END while (true)

      If the producer sends messages continuously, the consumer works fine. However, if the producer stops, the consumer gets a timeout exception:

      10:03:03,082 ERROR @SocketServerInvokerThread-10.67.21.88-0 [ServerThread] socket timed out
      java.net.SocketTimeoutException: Read timed out
      at java.net.SocketInputStream.socketRead0(Native Method)
      at java.net.SocketInputStream.read(SocketInputStream.java:129)
      at java.io.BufferedInputStream.fill(BufferedInputStream.java:183)
      at java.io.BufferedInputStream.read(BufferedInputStream.java:201)
      at java.io.FilterInputStream.read(FilterInputStream.java:66)
      at org.jboss.serial.io.JBossObjectInputStream.read(JBossObjectInputStream.java:126)
      at org.jboss.remoting.transport.socket.ServerThread.readVersion(ServerThread.java:464)
      at org.jboss.remoting.transport.socket.ServerThread.processInvocation(ServerThread.java:381)
      at org.jboss.remoting.transport.socket.ServerThread.dorun(ServerThread.java:498)
      at org.jboss.remoting.transport.socket.ServerThread.run(ServerThread.java:240)


      Does anyone have any idea what is going on? It doesn't make sense to me that the consumer must keep receiving messages or else die with a timeout exception.

      Thank you in advance.

      Xinyu
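
For readers unfamiliar with the exception in the trace above: the stack shows a server thread blocked in a socket read (ServerThread.readVersion down to SocketInputStream.read) when the read timeout expired. The sketch below reproduces that general mechanism with plain java.net sockets only. It is not the JBoss Remoting code; the class name IdleReadTimeoutDemo and the method readTimesOut are hypothetical, and the timeout value is arbitrary.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Hypothetical demo class; not part of JBoss Remoting or JBoss Messaging.
class IdleReadTimeoutDemo {

    /**
     * Accepts one loopback connection, then blocks reading from it with the
     * given SO_TIMEOUT. Returns true if the read timed out because the idle
     * client sent nothing, false if the read returned normally.
     */
    static boolean readTimesOut(int timeoutMillis) {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket idleClient = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket accepted = server.accept()) {
            // Any blocking read on this socket now gives up after timeoutMillis.
            accepted.setSoTimeout(timeoutMillis);
            try {
                accepted.getInputStream().read(); // blocks: the client never writes
                return false;
            } catch (SocketTimeoutException expected) {
                // Same exception type as in the trace: "Read timed out".
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("read timed out: " + readTimesOut(500));
    }
}
```

The point of the sketch is only that a connected but silent peer is enough to trigger SocketTimeoutException on the reading side; whether the reader should treat that as an error is a design decision of the code doing the read.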



        • 1. Re: Get timeout with a consumer of topic
          Ovidiu Feodorov Master

          I haven't seen this behavior yet.
          Could you write a simple test case (for an example, look at org.jboss.test.messaging.jms.JMSTest) that I can use to reproduce it?

          • 2. Re: Get timeout with a consumer of topic
            Xinyu Zhang Newbie

            Below are two classes, SimpleTopicListener and SimpleTopicSender. Here is what I did:

            (1) Installed JBoss 4.0.3 SP1 and JBoss Messaging 1.0.0.GA using the standard process.
            (2) Started JBoss with the "run -c messaging" command.
            (3) Started the SimpleTopicListener class on the same computer.
            (4) Started the SimpleTopicSender class on the same computer.

            Whether before or after step (4), if SimpleTopicListener doesn't receive a message within 1 minute, the timeout exception is thrown and the listener no longer receives any messages.

            /*
            * SimpleTopicListener.java
            *
            * Created on April 27, 2006, 12:47 PM
            *
            * To change this template, choose Tools | Template Manager
            * and open the template in the editor.
            */

            package test;

            import javax.jms.Connection;
            import javax.jms.ConnectionFactory;
            import javax.jms.JMSException;
            import javax.jms.Message;
            import javax.jms.MessageConsumer;
            import javax.jms.MessageListener;
            import javax.jms.Session;
            import javax.jms.TextMessage;
            import javax.jms.Topic;
            import javax.naming.InitialContext;
            import javax.naming.NamingException;

            /**
            *
            * @author r55151
            */
            public class SimpleTopicListener extends Thread implements MessageListener {

                private String topic;

                private Message message;

                /** Creates a new instance of SimpleTopicListener */
                public SimpleTopicListener(String topic) {
                    this.topic = topic;
                }

                public void run() {

                    String destinationName = "/topic/" + topic;

                    InitialContext ic = null;
                    Connection connection = null;

                    try {
                        ic = new InitialContext();

                        ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
                        Topic topic = (Topic)ic.lookup(destinationName);

                        System.out.println("Client started to listen to Topic " + destinationName);

                        connection = cf.createConnection();
                        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                        MessageConsumer subscriber = session.createConsumer(topic);

                        subscriber.setMessageListener(this);
                        connection.start();

                        while (true) {
                            waitForMessage();
                            TextMessage the_message = (TextMessage)getMessage();
                            System.out.println("For topic " + topic + ": received message: " + the_message.getText());
                        } //END while (true)
                    } catch (NamingException ne) {
                        ne.printStackTrace();
                    } catch (JMSException jmse) {
                        jmse.printStackTrace();
                    } finally {

                        if (ic != null) {
                            try {
                                ic.close();
                            } catch (NamingException ne) {
                                ne.printStackTrace();
                            }
                        } //END if (ic != null)

                        //ALWAYS close your connection in a finally block to avoid leaks
                        //Closing connection also takes care of closing its related objects e.g. sessions
                        if (connection != null) {
                            try {
                                connection.close();
                            } catch (JMSException jmse) {
                                jmse.printStackTrace();
                            }
                        } //END if (connection != null)

                    } //END try, finally
                } //END public void run()

                public synchronized void onMessage(Message message) {
                    this.message = message;
                    notifyAll();
                }

                /** Remove the message from the cache and return the message.
                 * @return Message object.
                 */
                public synchronized Message getMessage() {
                    Message ret_val = message;
                    message = null;
                    return ret_val;
                }


                protected synchronized void waitForMessage() {
                    while (message == null) {
                        try {
                            wait(1000);
                        } catch (InterruptedException e) {
                            //doing nothing
                        }
                    } //END while (message == null)
                } //END protected synchronized void waitForMessage()

                public static void main(String[] args) {
                    String topic = "testTopic";
                    if (args.length >= 1) {
                        topic = args[0];
                    }
                    (new SimpleTopicListener(topic)).start();
                }
            } //END public class SimpleTopicListener extends Thread implements MessageListener


            /*
            * SimpleTopicSender.java
            *
            * Created on May 3, 2006, 9:20 AM
            *
            * To change this template, choose Tools | Template Manager
            * and open the template in the editor.
            */

            package test;

            import javax.jms.Connection;
            import javax.jms.ConnectionFactory;
            import javax.jms.JMSException;
            import javax.jms.MessageProducer;
            import javax.jms.Session;
            import javax.jms.TextMessage;
            import javax.jms.Topic;
            import javax.naming.InitialContext;
            import javax.naming.NamingException;

            /**
            *
            * @author r55151
            */
            public class SimpleTopicSender {

                public static void main(String[] args) {

                    int msg_number = 10; //number of messages to be sent
                    if (args.length >= 1) {
                        try {
                            msg_number = Integer.parseInt(args[0]);
                        } catch (NumberFormatException nfe) {
                            //doing nothing: keep the default of 10 messages
                        }
                    }

                    String topic_name = "testTopic";
                    String destinationName = "/topic/" + topic_name;

                    InitialContext ic = null;
                    Connection connection = null;

                    try {
                        ic = new InitialContext();

                        ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
                        Topic topic = (Topic)ic.lookup(destinationName);
                        System.out.println("Topic " + destinationName + " exists");

                        connection = cf.createConnection();
                        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                        MessageProducer publisher = session.createProducer(topic);

                        connection.start();
                        String msg = "Sending the message to topic " + topic_name + ", serial no = ";
                        for (int i = 0; i < msg_number; i++) {
                            String message = msg + i;
                            System.out.println("will send message: " + message);
                            TextMessage text_message = session.createTextMessage(message);
                            publisher.send(text_message);
                            System.out.println("==== The message was successfully published on the topic " + topic_name);
                            try {
                                Thread.sleep(500);
                            } catch (InterruptedException iue) {
                                //doing nothing
                            }
                        } //END for (int i = 0; i < msg_number; i++)
                    } catch (NamingException ne) {
                        ne.printStackTrace();
                    } catch (JMSException jmse) {
                        jmse.printStackTrace();
                    } finally {

                        if (ic != null) {
                            try {
                                ic.close();
                            } catch (NamingException ne) {
                                ne.printStackTrace();
                            }
                        }
                        //ALWAYS close your connection in a finally block to avoid leaks
                        //Closing connection also takes care of closing its related objects e.g. sessions
                        if (connection != null) {
                            try {
                                connection.close();
                            } catch (JMSException jmse) {
                                jmse.printStackTrace();
                            }
                        }
                    } //END try, finally
                    System.exit(0);
                } //END public static void main(String[] args)

            } //END public class SimpleTopicSender
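
A side note on SimpleTopicListener above, separate from the timeout itself: onMessage() stores each message in a single field, so if a second message arrives before the loop calls getMessage(), the first one is overwritten. One possible way to avoid that is to hand messages over through a queue instead of a field. The following is only a sketch under that assumption; MessageMailbox, deliver and next are hypothetical names, and the class is kept generic so it compiles without the JMS jar (in the listener, T would be javax.jms.Message).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; not part of JMS or JBoss Messaging.
class MessageMailbox<T> {

    // Unbounded FIFO queue: nothing is overwritten while the consumer is busy.
    private final BlockingQueue<T> pending = new LinkedBlockingQueue<T>();

    /** Call from the listener callback (for example onMessage); never blocks. */
    void deliver(T message) {
        pending.add(message);
    }

    /**
     * Waits up to timeoutMillis for the next message, in arrival order.
     * Returns null if nothing arrived in time or the thread was interrupted.
     */
    T next(long timeoutMillis) {
        try {
            return pending.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt so the caller's loop can decide to exit.
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

With such a helper, onMessage() would call deliver(message), and the receive loop would call next(1000) and skip null results, replacing the waitForMessage()/getMessage() pair.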

            • 3. Re: Get timeout with a consumer of topic
              Ovidiu Feodorov Master

              It's a bug. Thanks for reporting it. Use this to track it: http://jira.jboss.org/jira/browse/JBMESSAGING-371
              It will be fixed in 1.0.1.CR2.

              • 4. Re: Get timeout with a consumer of topic
                Ovidiu Feodorov Master

                It seems that Remoting behavior changed recently.

                I am following up on this on the Remoting forum http://www.jboss.com/index.html?module=bb&op=viewtopic&p=3941996#3941996

                Until this is clarified, I changed Messaging to conform to the new Remoting behavior, and added a test to detect such changes in the future.

                The fix is available in HEAD and will be included in 1.0.1.CR2.

                • 5. Re: Get timeout with a consumer of topic
                  wxxg Newbie

                  Hi there,

                  We have a similar "read timed out" problem, but with JBoss MQ (4.0.3sp1 and 4.0.4.GA) rather than JBoss Messaging. Could the JBoss experts here advise whether the fix is applicable to JBoss MQ as well?

                  Regards/Brian

                  • 6. Re: Get timeout with a consumer of topic
                    Tim Fox Master

                    It is highly unlikely that the fix will apply to JBoss MQ.

                    Could you please post this in the JBoss MQ forum?