8 Replies Latest reply on Jul 19, 2011 6:25 AM by ataylor

    No Consumer Buffering caused random Queue behavior

    riteshj2ee

      Hi All,

       

      Earlier when new Consumer starts, it wasn’t able to pick-up the messages from Queue even though messages are there in Queue because

      messages were internally buffered in Consumers which are already running and that’s is the default behavior of HornetQ.

      This problem was resolved by introducing No Consumer Buffering in Queue, But after that Queue behavior became unstable.

      Eg. After picking up the few messages from Queue, sometimes Consumers are not picking-up the next available messages.

       

      Now to resolve this problem, I am just closing the Session on existing Connection after receiving one message from Queue.

      So Connection will be established to Queue only once but every time new Session will be created before receiving a message and

      Session will be closed after receiving a message from Queue.

       

      Can anybody tell me is this approach is fine or not?

       

      Earlier without introducing No Consumer Buffering, single Session was working as per expectation, so why its not working fine with

      No Consumer Buffering?

       

      CODE: I am using JBoss 6AS HornetQ with JMS on Linux OS

       

      // initialization code

      queueConnectionFactory = (QueueConnectionFactory) jndiContext.lookup(QUEUE_CONNECTION_FACTORY);

      queue = (Queue) jndiContext.lookup(JNDI_QUEUE_NAME);

      queueConnection = queueConnectionFactory.createQueueConnection();

      queueConnection.start();

       

       

      // code in loop

      queueSession = queueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);

      queueReceiver = queueSession.createReceiver(queue, receiverMessageSelector);

      Message m = queueReceiver.receive();

      queueSession.close();

       

      Steps to reproduce this problem:

       

      1. Connection factory with No Consumer Buffering (hornetq-jms.xml):

         <connection-factory name="NettyConnectionFactory">

            <connectors>

               <connector-ref connector-name="netty"/>

            </connectors>

            <entries>

               <entry name="/ConnectionFactory"/>

               <entry name="/XAConnectionFactory"/>

            </entries>

            <consumer-window-size>0</consumer-window-size>

         </connection-factory>

       

       

       

        2.  code to fetch message from Queue & its initialization

          private Job getJobFromJmsQueue() throws JMSException, SQLException {

              ObjectMessage message = null;

              LOG.info("Waiting For Job.");

              LOG.info("Session Start!!");

              queueSession = queueConnection.createQueueSession(false,

                                              Session.AUTO_ACKNOWLEDGE);

              final String receiverMessageSelector = receiverMessageSelector(ClientUtils.getHostName().toLowerCase());

              LOG.info("Receiver session String:" + receiverMessageSelector);

              queueReceiver = queueSession.createReceiver(queue, receiverMessageSelector);

       

              Message m = queueReceiver.receive();

       

              if (m != null) {           

                  if (m instanceof ObjectMessage) {

                      LOG.info("Job Received.");

                      message = (ObjectMessage) m;

                  } else if (m instanceof TextMessage) {

                      TextMessage tm = (TextMessage)m;

                      LOG.info("Message received:" + tm.getText());

                  } else {

                      throw new JMSException("Invalid Message in the queue.");

                  }

              }

              Job job = (Job)message.getObject();

       

              queueSession.close();

              queueSession=null;

       

              LOG.info("Session Closed!!");

              return job;

          }

       

          private void init() {

              try {

                  queueConnectionFactory = (QueueConnectionFactory) jndiContext.lookup(QUEUE_CONNECTION_FACTORY);

                  queue = (Queue) jndiContext.lookup(JNDI_QUEUE_NAME);

                  queueConnection = queueConnectionFactory.createQueueConnection();

                  queueConnection.start();

                  // message selector

                  receiverMessageSelector = receiverMessageSelector(ClientUtils.getHostName().toLowerCase());

                  LOG.info("Receiver session String:" + receiverMessageSelector);

              } catch (JMSException e) {

                     System.exit(1);

              } catch (NamingException e) {

                  System.exit(1);

              }

      }

       

       

       

       

      3. Produce some (10-20) message and send to the Queue.

      4. Start consumer1

      5. Start consumer2

      6. Confirm that both consumer1 & consumer2 are receiving messages

      7. Repeat step 3,4,5 for two-three times it will start behaving randomly, sometimes Consumers will not receive messages even though its there in the Queue. By restarting that consumer again it will receive that message and again stop receiving within few minutes.

       

       

       

      Thanks & Regards,

      Ritesh