1 Reply Latest reply on Dec 6, 2010 9:49 AM by Toby Morris

    Redelivery problem when using MessageSelectors

    Toby Morris Newbie

      We’ve come across a scenario where messages are being set for redelivery when we don’t believe they should be.

       

      Our setup:

      -         Multiple consumers, using message selectors, set to watch the queue in turns. One will wait for the next message. Once that consumer either finds a message or times out waiting, it releases a synchronized method allowing another consumer to get the next message. This continues until the server is shut down.

      -         Multiple producers writing to the queue whenever they need to.

      -         JMS queue with a redelivery time set to 5 minutes.

       

      What we’re seeing is if a consumer connects to the queue while there are messages waiting, some to all of those messages will get set to redeliver. If a message enters the queue while a consumer is waiting, it will process it as normal.

       

      Unit testing has been performed to replicate this condition. If we set multiple consumers (two in this case) to watch the queue even constantly, and release messages in a controlled manner, we can get a redelivered message everytime.

       

      We found three things that seem to be required for this situation to occur:

      1. The queue has to have a redelivery time greater than -1. If the time is -1, the problem doesn’t occur.
      2. The consumer must use a message selector. Without a selector, we don’t see the problem.

       

      I can post our unit test code and more information, if requested.

        • 1. Re: Redelivery problem when using MessageSelectors
          Toby Morris Newbie

          Here's a simple unit test that performs two tests. Both tests send two messages and attempt to receive two messages. The first sends messages with different custom property "SenderID" values and the second sends messages with the same custom property "SenderID" values. The first succeeeds and the second fails. The JMS queue has a redelivery value greater than 0.

          package jmstest;

          import static org.junit.Assert.assertTrue;
          import java.io.Serializable;
          import java.util.Hashtable;
          import java.util.Map;
          import java.util.Properties;
          import javax.jms.DeliveryMode;
          import javax.jms.JMSException;
          import javax.jms.MessageProducer;
          import javax.jms.ObjectMessage;
          import javax.jms.Queue;
          import javax.jms.QueueConnection;
          import javax.jms.QueueConnectionFactory;
          import javax.jms.QueueReceiver;
          import javax.jms.QueueSession;
          import javax.naming.Context;
          import javax.naming.InitialContext;
          import javax.naming.NamingException;
          import org.apache.log4j.Logger;
          import org.junit.After;
          import org.junit.AfterClass;
          import org.junit.Before;
          import org.junit.BeforeClass;
          import org.junit.Test;

          public class JMSQueueTest
          {
              private static Logger log = Logger.getLogger(JMSQueueTest.class);

              private String jmsServer = System.getProperty("JMSSERVER");

              private String jmsQueueName = System.getProperty("QUEUE");

              @BeforeClass
              public static void setUpBeforeClass() throws Exception
              {
              }

              @AfterClass
              public static void tearDownAfterClass() throws Exception
              {
              }

              @Before
              public void setUp() throws Exception
              {
                  log.info("setUp - cleaning up");
                  clearQueue();
              }

              @After
              public void tearDown() throws Exception
              {
                  log.info("tearDown - cleaning up");
                  clearQueue();
              }

              public void clearQueue() throws Exception
              {
                  boolean areThereMore = true;
                  while (areThereMore)
                  {
                      try
                      {
                          ObjectMessage message = null;
                          message = getNextMessage(jmsServer, jmsQueueName, false, 1, null);
                          if (message != null && message.getObject() != null)
                          {
                              log.debug("There's more");
                          }
                          else
                          {
                              log.debug("No more.");
                              areThereMore = false;
                          }
                      }
                      catch (Exception e)
                      {
                          log.error(e.getMessage(), e);
                      }
                  }
              }

              @Test
              public void testJMSQueueSucceeds() throws Exception
              {
                  ObjectMessage om = null;
                  try
                  {
                      // /////////////////////////////////////////////////////////////////////////
                      // Send a couple of messages
                      // /////////////////////////////////////////////////////////////////////////
                      log.info("--------------------------------------------------------------------------------");
                      log.info("sending two messages");
                      this.sendJMSMessage("test1");
                      this.sendJMSMessage("test2");
                      log.info("--------------------------------------------------------------------------------");
                      log.debug("Manually retrieving messages");
                      om = this.getNextMessage(jmsServer, jmsQueueName, false, 1, "senderID = 'test1'");
                      assertTrue(om != null);
                      om = this.getNextMessage(jmsServer, jmsQueueName, false, 1, "senderID = 'test2'");
                      assertTrue(om != null);
                  }
                  catch (Exception e)
                  {
                      log.error(e.getMessage(), e);
                  }
                  finally
                  {
                  }
                  log.debug("Done.");
              }

              @Test
              public void testJMSQueueFails() throws Exception
              {
                  ObjectMessage om = null;
                  try
                  {
                      // /////////////////////////////////////////////////////////////////////////
                      // Send a couple of messages
                      // /////////////////////////////////////////////////////////////////////////
                      log.info("--------------------------------------------------------------------------------");
                      log.info("sending two messages");
                      this.sendJMSMessage("test1");
                      this.sendJMSMessage("test1");
                      log.info("--------------------------------------------------------------------------------");
                      log.debug("Manually retrieving messages");
                      om = this.getNextMessage(jmsServer, jmsQueueName, false, 1, "senderID = 'test1'");
                      assertTrue(om != null);
                      om = this.getNextMessage(jmsServer, jmsQueueName, false, 1, "senderID = 'test1'");
                      assertTrue(om != null);
                  }
                  catch (Exception e)
                  {
                      log.error(e.getMessage(), e);
                  }
                  finally
                  {
                  }
                  log.debug("Done.");
              }

              private void sendJMSMessage(String senderID) throws Exception
              {
                  try
                  {
                      log.debug("Sending...");
                      String message = "This is a test";
                      Hashtable<String, Object> properties = new Hashtable<String, Object>();
                      if (senderID != null)
                      {
                          properties.put("senderID", senderID);
                          log.debug("senderID = " + senderID);
                      }
                      this.sendMessageX(jmsServer, jmsQueueName, true, (Serializable) message, 20, null, 0, properties, DeliveryMode.NON_PERSISTENT, 4);
                      log.debug("Message sent");
                  }
                  catch (Exception e)
                  {
                      log.error(e.getMessage(), e);
                  }
              }

              private void sendMessageX(String jmsServer, String jmsQueueName, boolean clientAcknowledge, Serializable message, int ttl, JMSQueue replyTo, int replyWaitSeconds, Map<String, Object> properties, int persistence, int priority) throws NamingException, JMSException
              {
                  InitialContext ctx = null;
                  QueueConnection conn = null;
                  QueueConnectionFactory tcf = null;
                  Queue queue = null;
                  QueueSession session = null;
                  MessageProducer producer = null;
                  try
                  {
                      Properties ctxProperties = new Properties();
                      ctxProperties.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
                      ctxProperties.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
                      ctxProperties.put(Context.PROVIDER_URL, jmsServer);
                      ctx = new InitialContext(ctxProperties);
                      tcf = (QueueConnectionFactory) ctx.lookup("ConnectionFactory");
                      conn = tcf.createQueueConnection();
                      queue = (Queue) ctx.lookup(jmsQueueName);
                      conn.start();
                      if (clientAcknowledge)
                      {
                          session = conn.createQueueSession(false, QueueSession.CLIENT_ACKNOWLEDGE);
                      }
                      else
                      {
                          session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
                      }
                      // ////////////////////////////
                      log.debug("Start sendMessage()");
                      producer = session.createProducer(queue);
                      ObjectMessage objMessage = session.createObjectMessage(message);
                      if (properties != null)
                      {
                          for (String key : properties.keySet())
                          {
                              objMessage.setObjectProperty(key, properties.get(key));
                          }
                      }
                      if (replyTo != null)
                      {
                          objMessage.setJMSReplyTo(replyTo.getJMSQueue());
                          objMessage.setStringProperty("replyToServer", replyTo.getJmsServer());
                          objMessage.setStringProperty("replyToQueue", replyTo.getJmsQueueName());
                      }
                      producer.setTimeToLive(ttl * 1000);
                      log.debug("-------------Sending Message");
                      producer.send(objMessage, persistence, priority, 0);
                      producer.close();
                      objMessage = null;
                      // //////////////////////////////
                  }
                  finally
                  {
                      try
                      {
                          conn.close();
                          conn = null;
                      }
                      catch (Exception e)
                      {
                          log.error(e.getMessage(), e);
                      }
                      try
                      {
                          session.close();
                          session = null;
                      }
                      catch (Exception e)
                      {
                          log.error(e.getMessage(), e);
                      }
                  }
              }

              private ObjectMessage getNextMessage(String jmsServer, String jmsQueueName, boolean clientAcknowledge, int queueWaitSeconds, String selector) throws NamingException, JMSException
              {
                  InitialContext ctx = null;
                  QueueConnection conn = null;
                  QueueConnectionFactory tcf = null;
                  Queue queue = null;
                  QueueSession session = null;
                  QueueReceiver consumer = null;
                  try
                  {
                      Properties properties = new Properties();
                      properties.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
                      properties.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
                      properties.put(Context.PROVIDER_URL, jmsServer);
                      ctx = new InitialContext(properties);
                      tcf = (QueueConnectionFactory) ctx.lookup("ConnectionFactory");
                      conn = tcf.createQueueConnection();
                      queue = (Queue) ctx.lookup(jmsQueueName);
                      conn.start();
                      if (clientAcknowledge)
                      {
                          log.debug("CLIENT_ACKNOWLEDGE");
                          session = conn.createQueueSession(false, QueueSession.CLIENT_ACKNOWLEDGE);
                      }
                      else
                      {
                          log.debug("AUTO_ACKNOWLEDGE");
                          session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
                      }
                      // ////////////////////////////
                      log.debug("Start getNextMessage()");
                      if (selector == null)
                      {
                          log.debug("Not using a selector");
                          consumer = session.createReceiver(queue);
                      }
                      else
                      {
                          log.debug("Using a selector");
                          consumer = session.createReceiver(queue, selector);
                      }
                      ObjectMessage message = (ObjectMessage) consumer.receive(queueWaitSeconds * 1000);
                      consumer.close();
                      if (message != null)
                      {
                          log.debug("----------Got a message");
                          return message;
                      }
                      else
                      {
                          log.debug("---No message found in " + queueWaitSeconds + " seconds");
                          return null;
                      }
                      // ////////////////////////////////
                  }
                  finally
                  {
                      try
                      {
                          conn.close();
                          conn = null;
                      }
                      catch (Exception e)
                      {
                          log.error(e.getMessage(), e);
                      }
                      try
                      {
                          session.close();
                          session = null;
                      }
                      catch (Exception e)
                      {
                          log.error(e.getMessage(), e);
                      }
                  }
              }
          }