21 Replies. Latest reply on May 17, 2011 10:25 PM by clebert.suconic

    Losing messages on the subscriber when filtering

    java1970

      Here is a simple load test program that creates 10 subscribers on a topic and one publisher that publishes 5000 messages to it.

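      When run with a command-line argument, it uses that argument as a filter: the subscribers are created with the selector clientId = '<argument>', and every published message gets a clientId string property with the same value.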
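The selector on the subscribers and the property on the published messages have to agree exactly. As a sketch, the selector string that the code below builds inline can be factored into a helper; the name `clientIdSelector` is hypothetical, and the quote doubling (how JMS selector syntax escapes a single quote inside a string literal) only matters if a client ID could ever contain a quote, which the IDs 1 and 2 used here cannot.

```java
// Hypothetical helper (not part of the original test): builds the selector
// clientId = '<value>' used by the subscribers, doubling any single quote in
// the value, which is how JMS selector syntax escapes a quote in a literal.
class SelectorBuilder {

    static String clientIdSelector(String clientId) {
        if (clientId == null) {
            return null; // no argument: the test subscribes without a selector
        }
        return "clientId = '" + clientId.replace("'", "''") + "'";
    }

    public static void main(String[] args) {
        System.out.println(clientIdSelector("1"));       // clientId = '1'
        System.out.println(clientIdSelector("o'brien")); // clientId = 'o''brien'
    }
}
```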

      The test passes when I run only one process at a time (ten subscribers and one publisher on the topic, all using one filter).

       

      The trouble happens when I run two or more of these processes with different command-line arguments (i.e., a different filter for each running process).

      Let's say I run this process twice: once with a command-line argument of 1 and once with a command-line argument of 2.

      This creates one process with 10 subscribers and a publisher publishing 5000 messages, both using a filter of 1.

      It also creates another process with 10 subscribers and a publisher publishing 5000 messages, both using a filter of 2.

      When I look at the output of the two processes, one of them usually receives fewer messages than expected while the other receives more than expected.
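For reference, here is the arithmetic behind "the expected amount". With the defaults from initializeParameters (numSubscribers = 10, numMessages = 5000), waitForFinish in the code below waits for numMessages * numSubscribers = 50,000 deliveries per process. One possible reading of the symptom (an interpretation, not a diagnosis): if some messages published for one filter were delivered to the other process's subscribers instead of their own, one total would rise above 50,000 and the other would fall below it.

```java
// Expected per-process delivery counts for the two-process scenario, using
// the test's defaults (numSubscribers = 10, numMessages = 5000).
class ExpectedCounts {

    // What waitForFinish() waits for: every subscriber in a process should
    // receive every message its own publisher sent, and nothing else.
    static long expectedPerProcess(int numSubscribers, int numMessages) {
        return (long) numSubscribers * numMessages;
    }

    // For comparison only: what a process would count if its subscribers had
    // no selector and saw every message from all publishers on the topic.
    static long unfilteredPerProcess(int numSubscribers, int numMessages, int processes) {
        return (long) numSubscribers * numMessages * processes;
    }

    public static void main(String[] args) {
        System.out.println(expectedPerProcess(10, 5000));      // 50000
        System.out.println(unfilteredPerProcess(10, 5000, 2)); // 100000
    }
}
```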

       

      The code follows. Maybe HornetQ is fine and there is just something wrong with my test.

       

      I hope it's OK to just paste it in here, since it's not too large.

      If a zip file is preferred, I can provide one.

       

      I have HornetQ 2.2.2.Final integrated with AS 5.

      Let me know if there is any other information you need.

       

      Thanks

       

      *******************************************************************************************************************************************

       

      {code}

      import java.util.Properties;
      import java.util.Vector;
      import java.util.logging.Level;
      import java.util.logging.Logger;
      import javax.jms.ConnectionFactory;
      import javax.jms.ExceptionListener;
      import javax.jms.JMSException;
      import javax.jms.Message;
      import javax.jms.MessageListener;
      import javax.jms.ObjectMessage;
      import javax.jms.Topic;
      import javax.jms.TopicConnection;
      import javax.jms.TopicPublisher;
      import javax.jms.TopicSession;
      import javax.jms.TopicSubscriber;
      import javax.naming.InitialContext;
      import javax.naming.NamingException;

      public class BasicJMSLoadTest
      {
          private static Logger log = Logger.getLogger(BasicJMSLoadTest.class.getName());

          // parameters that are populated from VM parameters
          private static int numSubscribers, numMessages, messageSize, publishSleep, onMessageSleep, waitSleep;
          private static String topicName;
          private static String namingHost;
          private static int namingPort;
          private static boolean runOutOfMemory = false;

          private static InitialContext initialContext;
          private static ConnectionFactory connectionFactory;
          private static TopicConnection[] connections;
          private static TopicSession[] sessions;
          private static TopicSubscriber[] subscribers;
          private static CountingMessageListener[] listeners;
          private static String clientId = null;
          private static Vector<byte[]> memoryUsingVector = new Vector<byte[]>();

          // publishCount/publishRetries are written by the publisher thread and read by main;
          // the exceptions are set from the publisher thread and the exception listeners
          private static volatile int publishCount, publishRetries;
          private static int receiveCount = -1;
          private static volatile Throwable publishException, otherException;

          public static void main(String[] args) throws Throwable
          {
              try
              {
                  if (args.length == 1)
                      clientId = args[0];
                  log.info("Started with clientId: " + clientId);
                  initializeParameters();
                  initializeConnection();
                  log.info("Creating subscribers");
                  createSubscribers();
                  log.info("Starting publish");
                  publishMessagesInThread();
                  long start = System.currentTimeMillis();
                  log.info("Waiting for finish...");
                  waitForFinish();
                  if (publishRetries > 0 && publishException == null)
                      publishException = new Exception("There were " + publishRetries + " publish retries.");
                  if (otherException != null)
                      log.info("Received exception: " + otherException);
                  if (publishException != null)
                      log.info("Received publish exception: " + publishException);
                  log.info("Finished in " + (System.currentTimeMillis() - start) + " ms... Closing connections...");
                  closeResources();
                  log.info("Connections closed.");
                  log.info("Publish: " + publishCount + " Rec: " + receiveCount + " Retries: " + publishRetries);
              }
              catch (Throwable ex)
              {
                  log.log(Level.SEVERE, "Exception", ex);
                  otherException = ex;
              }
              if (otherException != null || publishException != null)
              {
                  log.info("Exception received.  Press Enter to quit.");
                  System.in.read();
              }
          }

          private static void createSubscribers() throws Exception
          {
              connections = new TopicConnection[numSubscribers];
              sessions = new TopicSession[numSubscribers];
              subscribers = new TopicSubscriber[numSubscribers];
              listeners = new CountingMessageListener[numSubscribers];

              Topic topic = (Topic)initialContext.lookup("topic/" + topicName);

              for (int i = 0; i < numSubscribers; i++)
              {
                  connections[i] = (TopicConnection)connectionFactory.createConnection();
                  connections[i].setExceptionListener(new LocalExceptionListener(i));
                  sessions[i] = connections[i].createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
                  if (clientId == null)
                      subscribers[i] = sessions[i].createSubscriber(topic);
                  else
                      subscribers[i] = sessions[i].createSubscriber(topic, "clientId = '" + clientId + "'", false);
                  listeners[i] = new CountingMessageListener();
                  subscribers[i].setMessageListener(listeners[i]);
                  connections[i].start();
              }
          }

          private static void publishMessagesInThread()
          {
              Thread t = new Thread(new Runnable()
              {
                  public void run()
                  {
                      try
                      {
                          Topic topic = (Topic)initialContext.lookup("topic/" + topicName);
                          TopicConnection c = (TopicConnection)connectionFactory.createConnection();
                          TopicSession s = c.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
                          TopicPublisher p = s.createPublisher(topic);
                          for (int i = 0; i < numMessages; i++)
                          {
                              int retryCount = 0;
                              boolean success = false;
                              // we get a MessagingNetworkFailureException sometimes.  Retrying the publish appears to work sometimes
                              while (!success && retryCount < 3)
                              {
                                  try
                                  {
                                      // a new message is created for every publish attempt
                                      ObjectMessage message = s.createObjectMessage(new byte[messageSize]);
                                      if (clientId != null)
                                          message.setStringProperty("clientId", clientId);
                                      // p was created for this topic, so use publish(Message);
                                      // JMS specifies publish(Topic, Message) for publishers created without a topic
                                      p.publish(message);
                                      publishCount++;
                                      sleep(publishSleep);
                                      success = true;
                                  }
                                  catch (Throwable ex)
                                  {
                                      log.log(Level.SEVERE, "Received publish exception - will retry", ex);
                                      retryCount++;
                                      publishRetries++;
                                      if (publishRetries > 25) // if we have to retry more than 25 times total, stop the test
                                          throw ex;
                                  }
                              }
                          }
                          p.close();
                          s.close();
                          c.close();
                      }
                      catch (Throwable ex)
                      {
                          log.log(Level.SEVERE, "Error during publish", ex);
                          publishException = ex;
                      }
                  }
              });
              t.start();
          }

          private static void waitForFinish()
          {
              int totalReceived = 0, noMessagesReceivedCount = 0;
              do
              {
                  totalReceived = 0;
                  for (int i = 0; i < numSubscribers; i++)
                      totalReceived += listeners[i].messagesReceived;
                  log.info("Published: " + publishCount + " Received: " + totalReceived);
                  // count consecutive polls with no new messages; after 10 of them, stop waiting and record an exception
                  if (totalReceived == receiveCount)
                      noMessagesReceivedCount++;
                  else
                      noMessagesReceivedCount = 0;
                  sleep(waitSleep);
                  if (noMessagesReceivedCount == 10)
                      otherException = new Exception("No messages received in timeout.");
                  receiveCount = totalReceived;
              } while (totalReceived < numMessages * numSubscribers &&
                  publishException == null && noMessagesReceivedCount < 10);
              if (totalReceived > numMessages * numSubscribers)
                  otherException = new Exception("Received more messages than expected (" + totalReceived + ")");
          }

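          private static void sleep(int millis)
          {
              if (millis <= 0)
                  return;
              try
              {
                  Thread.sleep(millis);
              }
              catch (InterruptedException ex)
              {
                  // restore the interrupt flag instead of silently swallowing it
                  Thread.currentThread().interrupt();
              }
          }
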
          private static void initializeConnection() throws NamingException
          {
              log.info("Getting initial context [" + namingHost + ":" + namingPort + "]");
              Properties prop = new Properties();
              prop.setProperty(InitialContext.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
              prop.setProperty(InitialContext.PROVIDER_URL, namingHost + ":" + namingPort);
              initialContext = new InitialContext(prop);
              log.info("Initial context created, looking up ConnectionFactory.");
              connectionFactory = (ConnectionFactory)initialContext.lookup("ConnectionFactory");
              log.info("ConnectionFactory looked up.");
          }

          private static void closeResources() throws Exception
          {
              for (int i = 0; i < numSubscribers; i++)
              {
                  subscribers[i].close();
                  sessions[i].close();
                  connections[i].close();
              }
          }

          private static void initializeParameters()
          {
              messageSize = Integer.parseInt(System.getProperty("messageSize", "4096"));
              publishSleep = Integer.parseInt(System.getProperty("publishSleep", "10"));
              waitSleep = Integer.parseInt(System.getProperty("waitSleep", "5000"));
              onMessageSleep = Integer.parseInt(System.getProperty("onMessageSleep", "10"));
              numMessages = Integer.parseInt(System.getProperty("numMessages", "5000"));
              numSubscribers = Integer.parseInt(System.getProperty("numSubscribers", "10"));
              topicName = System.getProperty("topicName", "jmsLoadTest");
              namingHost = System.getProperty("namingHost", "localhost");
              namingPort = Integer.parseInt(System.getProperty("namingPort", "1099"));
              runOutOfMemory = Boolean.parseBoolean(System.getProperty("runOutOfMemory", "false"));
          }

          public static class CountingMessageListener implements MessageListener
          {
              // incremented only by this subscriber's session delivery thread and
              // polled by the main thread; volatile so the main thread sees the latest value
              public volatile int messagesReceived;

              public void onMessage(Message message)
              {
                  try
                  {
                      messagesReceived++;
                      if (runOutOfMemory)
                          memoryUsingVector.add(new byte[512000]);
                      sleep(onMessageSleep);
                  }
                  catch (Throwable ex)
                  {
                      log.log(Level.WARNING, "Exception in onMessage", ex);
                  }
              }
          }

          private static class LocalExceptionListener implements ExceptionListener
          {
              private int subscriberIndex;

              public LocalExceptionListener(int subscriberIndex)
              {
                  this.subscriberIndex = subscriberIndex;
              }

              public void onException(JMSException jmsException)
              {
                  otherException = jmsException;
                  log.severe("Subscriber " + subscriberIndex + " received asynchronous exception " + jmsException);
              }
          }
      }

       

      {code}
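One thing worth ruling out in the test harness itself: counters and exception fields that are written on JMS delivery or publisher threads and read by the main thread need to be volatile or atomic, otherwise the polling loop has no guarantee of seeing up-to-date values. Below is a minimal, stdlib-only sketch of the same "wait until expected or stalled" logic as waitForFinish with thread-safe counters; `Counter` and `waitForTotal` are hypothetical names, and a plain thread calling `onMessage()` stands in for the JMS delivery threads.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

// Sketch, not the original test's code: a listener-style counter that is safe
// to read from another thread, plus a polling wait that stops once the
// expected total is reached or the total stops changing for a while.
class CountWaiter {

    // One per subscriber; AtomicInteger makes increments done on the delivery
    // thread visible to the thread that polls get().
    static final class Counter {
        private final AtomicInteger received = new AtomicInteger();

        void onMessage() {
            received.incrementAndGet();
        }

        int get() {
            return received.get();
        }
    }

    // Polls total every pollMillis ms. Returns the last observed value, either
    // because it reached expected or because it stayed unchanged for
    // maxIdlePolls consecutive polls (or the thread was interrupted).
    static int waitForTotal(IntSupplier total, int expected, int maxIdlePolls, long pollMillis) {
        int last = -1;
        int idlePolls = 0;
        while (true) {
            int current = total.getAsInt();
            if (current >= expected) {
                return current;
            }
            idlePolls = (current == last) ? idlePolls + 1 : 0;
            if (idlePolls >= maxIdlePolls) {
                return current; // stalled; the caller decides how to report it
            }
            last = current;
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                return current;
            }
        }
    }

    public static void main(String[] args) {
        Counter[] counters = { new Counter(), new Counter() };
        // Stands in for the JMS delivery threads.
        Thread delivery = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                for (Counter c : counters) {
                    c.onMessage();
                }
            }
        });
        delivery.start();
        int total = waitForTotal(() -> counters[0].get() + counters[1].get(), 2000, 50, 10);
        System.out.println("received " + total + " of 2000");
    }
}
```

Returning the last total instead of throwing leaves it to the caller to decide whether a short count is a failure, which mirrors how the original loop records otherException rather than aborting.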

        • 1. Re: Losing messages on the subscriber when filtering
          java1970

          I've added some more debug code to the onMessage method of the subscribers' listener to check whether the clientId in a subscriber's message selector differs from the clientId property of the received message.

          The mismatch seems to happen quite often.
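The debug check described above could look roughly like the sketch below. `SelectorCheck` is a hypothetical name; in a real MessageListener the received value would come from `message.getStringProperty("clientId")`, but here it is passed in as a plain String so the example runs without a JMS provider. A missing property (null) is counted as a mismatch too.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical debug check: each listener knows the clientId its selector
// asked for and counts messages whose clientId property says otherwise.
class SelectorCheck {
    private final String expectedClientId;
    private final AtomicInteger received = new AtomicInteger();
    private final AtomicInteger mismatches = new AtomicInteger();

    SelectorCheck(String expectedClientId) {
        this.expectedClientId = expectedClientId;
    }

    // receivedClientId stands in for message.getStringProperty("clientId").
    void onMessage(String receivedClientId) {
        received.incrementAndGet();
        if (!expectedClientId.equals(receivedClientId)) {
            int n = mismatches.incrementAndGet();
            System.err.println("mismatch #" + n + ": selector wanted '" + expectedClientId
                    + "', message had '" + receivedClientId + "'");
        }
    }

    int receivedCount() {
        return received.get();
    }

    int mismatchCount() {
        return mismatches.get();
    }

    public static void main(String[] args) {
        SelectorCheck check = new SelectorCheck("1");
        for (String id : new String[] {"1", "1", "2", "1"}) {
            check.onMessage(id);
        }
        // prints "4 received, 1 mismatched"
        System.out.println(check.receivedCount() + " received, " + check.mismatchCount() + " mismatched");
    }
}
```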

          • 2. Re: Losing messages on the subscriber when filtering
            ataylor

            If you could provide something runnable, that would be great, e.g. a zip with the source and a build/run script, or even better a JUnit test. Then someone will take a look.

            • 3. Re: Losing messages on the subscriber when filtering
              java1970

              Hopefully the attachment works.

              This zip contains two Java files (and their corresponding class files):

              BasicJMSLoadTest.java (standard Topic publisher/subscriber class with limited logging)

              BJLT2.java (more logging, and modified to try producer/consumer)


              You will most likely need to modify build.bat (for BasicJMSLoadTest.java) and build2.bat (for BJLT2.java) to use the correct classpath JARs.

              If you run bs2.bat, it will start two BJLT2 processes with different client IDs (which is when the problem occurs).

              Running just one of these processes produces no errors.


              We are using HornetQ 2.2.2.Final and EAP 5.1.

              The program accepts two VM parameters that point it at the correct AS naming host and port:

              -DnamingHost

              -DnamingPort
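For anyone reproducing this without the batch files, a rough Java equivalent of bs2.bat might look like the sketch below. Since the script itself isn't shown here, it assumes that bs2.bat starts BJLT2 twice with the client IDs 1 and 2 as the only program argument, and passes the naming host and port as -D system properties; the classpath is a placeholder you would need to fill in, as with build2.bat. By default it only prints the commands; pass --run as the second argument to actually start the processes.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for bs2.bat (the script itself is not shown here).
// Assumes BJLT2 takes the client ID as its only argument and reads
// namingHost/namingPort from -D system properties.
class LaunchCommands {

    static List<String> command(String classpath, String mainClass,
                                String namingHost, int namingPort, String clientId) {
        return new ArrayList<>(Arrays.asList(
                "java",
                "-DnamingHost=" + namingHost,
                "-DnamingPort=" + namingPort,
                "-cp", classpath,
                mainClass,
                clientId));
    }

    // Usage: java LaunchCommands [classpath] [--run]
    // Without --run it only prints the two commands.
    public static void main(String[] args) throws IOException, InterruptedException {
        String classpath = args.length > 0 ? args[0] : ".";
        boolean run = args.length > 1 && args[1].equals("--run");
        List<Process> processes = new ArrayList<>();
        for (String clientId : new String[] {"1", "2"}) {
            List<String> cmd = command(classpath, "BJLT2", "localhost", 1099, clientId);
            System.out.println(String.join(" ", cmd));
            if (run) {
                // one log file per client ID, with stderr merged into stdout
                processes.add(new ProcessBuilder(cmd)
                        .redirectErrorStream(true)
                        .redirectOutput(new File("bjlt2_" + clientId + ".log"))
                        .start());
            }
        }
        for (Process p : processes) {
            System.out.println("exit code: " + p.waitFor());
        }
    }
}
```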

               

               

               

              The topic, as defined in hornetq-jms.xml, is (hopefully this shows up correctly):

              <topic name="jmsLoadTest">
                 <entry name="/topic/jmsLoadTest" />
              </topic>

              Thanks


              • 4. Losing messages on the subscriber when filtering
                java1970

                I started looking at the TopicSelectorExample2.java example, since it basically does what my previous test did.

                The only changes I made were to comment out the 'all' consumer and to put the publishing of the three messages in a loop of 10 (so it sends 30 messages instead of 3).

                I then wrote a batch script that kicks off 50 runs of the test process.

                As you can see in the screenshot below, even this causes a consumer with a message selector to get an unexpected message.

                (Below, the red consumer receives a blue message.)

                Sometimes all 50 processes run successfully; other times a handful of the 50 receive a message not meant for them.

                 

                jms.gif

                • 5. Losing messages on the subscriber when filtering
                  clebert.suconic

                  Can you post a diff of your changes to the TopicExampleWithSelector, just to make sure we follow the exact steps you took?

                  Are you sure you are creating a new message each time you send? (This matters in case you are doing asynchronous sends.)

                   

                   

                  Just wanted to make sure about this, and I will take a look.
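The concern behind this question can be shown with a generic sketch (plain Java, not HornetQ code, and not a claim about how HornetQ's send path behaves): if a sender keeps a reference to a message and only reads it later, reusing one message object and changing its clientId property between sends makes every queued send carry the last value. `FakeMessage` and `DeferredSender` are hypothetical stand-ins. For what it's worth, BasicJMSLoadTest in the first post calls createObjectMessage inside the publish loop, so it already creates a new message for every publish.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Generic illustration, not HornetQ code: a sender that only queues a
// reference to the message and reads its properties later.
class MessageReuse {

    static final class FakeMessage {
        final Map<String, String> properties = new HashMap<>();
    }

    // Stands in for an asynchronous send: messages are only read on flush().
    static final class DeferredSender {
        private final List<FakeMessage> pending = new ArrayList<>();

        void send(FakeMessage m) {
            pending.add(m);
        }

        List<String> flush(String property) {
            List<String> seen = new ArrayList<>();
            for (FakeMessage m : pending) {
                seen.add(m.properties.get(property));
            }
            pending.clear();
            return seen;
        }
    }

    static List<String> sendReusing(String... ids) {
        DeferredSender sender = new DeferredSender();
        FakeMessage shared = new FakeMessage();
        for (String id : ids) {
            shared.properties.put("clientId", id); // also changes already-queued sends
            sender.send(shared);
        }
        return sender.flush("clientId");
    }

    static List<String> sendFresh(String... ids) {
        DeferredSender sender = new DeferredSender();
        for (String id : ids) {
            FakeMessage m = new FakeMessage(); // one message object per send
            m.properties.put("clientId", id);
            sender.send(m);
        }
        return sender.flush("clientId");
    }

    public static void main(String[] args) {
        System.out.println(sendReusing("1", "2", "1", "2")); // [2, 2, 2, 2]
        System.out.println(sendFresh("1", "2", "1", "2"));   // [1, 2, 1, 2]
    }
}
```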

                  • 6. Losing messages on the subscriber when filtering
                    clebert.suconic

                    If you're sure you're not sending the same message over and over, can you open a JIRA? We will take a look shortly.

                    • 7. Re: Losing messages on the subscriber when filtering
                      java1970

                      Before I open a JIRA, I will attach the HornetQ TestSelectorExample2 test with minimal changes.

                      I've modified the HornetQExample.java and TestSelectorExample2.java files.

                      I've marked my changes in the code with *DIFF*.

                      Everything should be set to run against a HornetQ server on localhost, port 1099.

                      After unzipping, you should be able to run the batch file run50.bat.

                      You should hopefully see the error I'm referring to.

                      When you run run50.bat, it opens 50 windows, each of which calls run.bat (which runs a single instance of the load test).

                      If an individual test passes, its window closes automatically.

                      If it fails, the window stays open and you can scroll back through STDOUT to see where I 'highlight' the error.

                      A failure doesn't always happen, but I would say it does about 7 out of 10 times.

                      I've also created a directory called hornetConfig, which contains the standalone configuration files I'm using with my server.

                      In my previous posts I mentioned this test failing with HornetQ 2.2.2.Final and EAP 5.

                      I can also reproduce the failure with the standalone HornetQ server.

                      If you still need me to open a ticket, I will.

                       

                      Thanks

                      • 8. Re: Losing messages on the subscriber when filtering
                        clebert.suconic

                        I'm more interested in your test on top of the example, since it will be much simpler to debug. Can you please attach it here?

                        • 9. Re: Losing messages on the subscriber when filtering
                          java1970

                          Just run run3.bat.


                          Log files are also created now.

                          producer_X.txt (shows all of the messages the producer is putting on the topic)

                          subscriber_X.txt (shows each message the subscriber is consuming)

                          X is the client ID, which is also the value the message selector filters on.


                          Let me know if you need anything else.


                          Thanks for your time.

                          • 10. Losing messages on the subscriber when filtering
                            java1970

                            Not sure if anyone has had time to look into this.

                            Would you like me to create a JIRA?

                            • 11. Losing messages on the subscriber when filtering
                              clebert.suconic

                              I will take a look at this... I'm just busy with another issue right now.

                               

                               

                              You can open a jira if you like.

                              • 12. Losing messages on the subscriber when filtering
                                java1970

                                Thanks.

                                I've been looking at the code myself to see if I can figure anything out.

                                 

                                I'm trying to find where the logic lives that looks at a message, checks it against any filters, and then sends it only to the consumers whose filter matches.

                                I've been looking at QueueImpl and PostOfficeImpl.

                                Are these the correct classes to start with, or is the filtering done elsewhere?

                                 

                                Thanks

                                • 13. Losing messages on the subscriber when filtering
                                  clebert.suconic

                                  Look for the calls to FilterImpl.match.
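To keep the intended behavior in mind while reading the FilterImpl.match call sites, here is a deliberately simplified model of topic routing with filters. It is not HornetQ's PostOfficeImpl, QueueImpl, or FilterImpl code; the class names and the Predicate-based filter are illustrative assumptions. Each subscription acts like its own queue on the topic address, and a message is copied only to subscriptions whose filter is absent or matches.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Simplified model, not HornetQ code: one queue per subscription on a topic
// address, each with an optional filter over the message properties.
class FilteredTopic {

    static final class Subscription {
        final Predicate<Map<String, String>> filter; // null = no selector
        final List<Map<String, String>> delivered = new ArrayList<>();

        Subscription(Predicate<Map<String, String>> filter) {
            this.filter = filter;
        }
    }

    private final List<Subscription> subscriptions = new ArrayList<>();

    Subscription subscribe(Predicate<Map<String, String>> filter) {
        Subscription s = new Subscription(filter);
        subscriptions.add(s);
        return s;
    }

    // Route one message: copy it only to subscriptions whose filter matches.
    void publish(Map<String, String> properties) {
        for (Subscription s : subscriptions) {
            if (s.filter == null || s.filter.test(properties)) {
                s.delivered.add(properties);
            }
        }
    }

    // Stand-in for the selector  clientId = '<id>'.
    static Predicate<Map<String, String>> clientIdEquals(String id) {
        return props -> id.equals(props.get("clientId"));
    }

    public static void main(String[] args) {
        FilteredTopic topic = new FilteredTopic();
        Subscription one = topic.subscribe(clientIdEquals("1"));
        Subscription two = topic.subscribe(clientIdEquals("2"));
        Subscription all = topic.subscribe(null);
        for (int i = 0; i < 3; i++) {
            topic.publish(Collections.singletonMap("clientId", "1"));
            topic.publish(Collections.singletonMap("clientId", "2"));
        }
        // prints "3 3 6"
        System.out.println(one.delivered.size() + " " + two.delivered.size() + " " + all.delivered.size());
    }
}
```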

                                   

                                   

                                  BTW: in one of your posts here you said that you had modified the TopicExample with selector to replicate this issue, but you never posted it.

                                   

                                  Asking again: Can I have that class? What changes did you actually make?

                                  • 14. Losing messages on the subscriber when filtering
                                    java1970

                                    Thanks for the class name.

                                     

                                    I thought I did attach that code in post #7.

                                    It's included in the zip load_test_hornetq_testselector.zip (I probably should have named it better).

                                    I modified the TopicExampleSelector2.java file and marked my changes with a *DIFF* comment.

                                    This is what you're looking for, right? Or am I missing something?
