27 Replies. Latest reply on Aug 10, 2006 3:06 AM by davidrh
      • 15. Re: Removing a MessageListener doesn't remove its associated
        davidrh

        I have enhanced my test harness again and have managed to reproduce the problem of the listeners dying. I have made the harness emulate pretty much exactly what we do in our application for receiving messages. Our application receives messages from multiple queues, and we have a manager thread for each queue which manages the process of connecting to the queue and setting up the right number of message listeners. In my example, I have set it up so that the first queue moves its messages to the second queue, the second queue to the third queue and the third queue moves them back to the first queue. In this way, it generates a constant flow of messages for the test.

        When I ran this, after an hour and a half I got the following output in the console:

        Checking core.internal.bulkUpload
        Checking core.internal.bulkUpload.error
        Checking billing.response
        Checking core.internal.publication
        Checking publish.request
        Checking billing.response.error
        Checking core.track.error
        Checking core.track
        ********** core.track.2 has stopped **********
        ********** core.track.3 has stopped **********
        ********** core.track.4 has stopped **********
        


        After four hours, some more stop:

        Checking core.internal.bulkUpload
        Checking billing.response
        Checking core.track
        ********** core.track.2 has stopped **********
        ********** core.track.3 has stopped **********
        ********** core.track.4 has stopped **********
        Checking billing.response.error
        Checking core.internal.publication
        ********** core.internal.publication.4 has stopped **********
        ********** core.internal.publication.6 has stopped **********
        ********** core.internal.publication.7 has stopped **********
        ********** core.internal.publication.8 has stopped **********
        ********** core.internal.publication.9 has stopped **********
        Checking core.track.error
        Checking publish.request
        


        If I go into the debugger in Eclipse and pause these threads, I get the following stack trace:

        Thread [core.track.4] (Suspended)
         Object.wait(long) line: not available [native method]
         Object.wait() line: 429
         LinkedQueue.take() line: not available
         QueuedExecutor$RunLoop.run() line: not available
         Thread.run() line: 534
        


        I've checked in the JMX console, and the core.track queue has over 9,000 messages waiting to be processed, so there is plenty for these threads to do. The core.internal.publication queue had 280 messages, so there still should be work for the listeners that have stopped.

        The new test harness is below. I've created the same number of listeners for all of our queues as we have in our application. These queues are configured in JBoss as follows:

        publish.request                 FullSize=10000  PageSize=2000  DownCacheSize=2000
        core.track                      FullSize=10000  PageSize=2000  DownCacheSize=2000
        core.internal.publication       FullSize=10000  PageSize=2000  DownCacheSize=2000
        core.track.error                FullSize=10000  PageSize=2000  DownCacheSize=2000
        billing.response                FullSize=10000  PageSize=2000  DownCacheSize=2000
        billing.response.error          FullSize=2000   PageSize=200   DownCacheSize=200
        core.internal.bulkUpload        FullSize=100    PageSize=10    DownCacheSize=10
        core.internal.bulkUpload.error  FullSize=100    PageSize=10    DownCacheSize=10

        A template of our queue configuration is as follows:

        <mbean code="org.jboss.jms.server.destination.Queue"
               name="jboss.messaging.destination:service=Queue,name=core.track"
               xmbean-dd="xmdesc/Queue-xmbean.xml">
           <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
           <attribute name="SecurityConfig">
              <security>
                 <role name="guest" read="true" write="true"/>
                 <role name="publisher" read="true" write="true" create="false"/>
                 <role name="noacc" read="false" write="false" create="false"/>
              </security>
           </attribute>
           <attribute name="FullSize">10000</attribute>
           <attribute name="PageSize">2000</attribute>
           <attribute name="DownCacheSize">2000</attribute>
        </mbean>
        


        To run the harness, I start with a new instance of JBoss Messaging and seed the publish.request queue with lots (10,000+) of text messages. I then start the harness, which simply moves the messages between the publish.request, core.internal.publication and core.track queues. I don't put any messages in the other queues, as they don't have messages in them when our application fails. I then let the harness run until it displays the "has stopped" messages.

        import java.util.Hashtable;

        import javax.jms.Connection;
        import javax.jms.ConnectionFactory;
        import javax.jms.JMSException;
        import javax.jms.Message;
        import javax.jms.MessageConsumer;
        import javax.jms.MessageListener;
        import javax.jms.MessageProducer;
        import javax.jms.Queue;
        import javax.jms.Session;
        import javax.jms.TextMessage;
        import javax.naming.Context;
        import javax.naming.InitialContext;

        public class TestMultiSessionMessageListener {

            public static void main(String[] args) {
                TestMultiSessionMessageListener ml = new TestMultiSessionMessageListener();

                try {
                    ml.test();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            private void test() throws Exception {
                boolean printMessages = false;

                // Arguments: receive queue, send queue, number of listeners,
                // simulated processing time per message (ms), print payloads.
                createListeners("publish.request", "core.track", 5, 250, printMessages);
                Thread.sleep(2000);
                createListeners("core.track", "core.internal.publication", 5, 250, printMessages);
                Thread.sleep(2000);
                createListeners("core.internal.publication", "publish.request", 10, 750, printMessages);
                Thread.sleep(2000);
                createListeners("core.track.error", "core.track", 2, 100, printMessages);
                Thread.sleep(2000);
                createListeners("billing.response", "publish.request", 2, 100, printMessages);
                Thread.sleep(2000);
                createListeners("billing.response.error", "billing.response", 2, 100, printMessages);
                Thread.sleep(2000);
                createListeners("core.internal.bulkUpload", "publish.request", 3, 100, printMessages);
                Thread.sleep(2000);
                createListeners("core.internal.bulkUpload.error",
                        "core.internal.bulkUpload", 1, 100, printMessages);
                Thread.sleep(Long.MAX_VALUE);
            }

            // Starts one manager thread for receiveQueue. The manager opens a
            // connection, creates one session and listener per process, and then
            // checks once a minute whether each listener is still receiving.
            private void createListeners(final String receiveQueue,
                    final String sendQueue, final int numberOfProcesses,
                    final long delay, final boolean printMessages) throws Exception {

                Runnable runnable = new Runnable() {

                    public void run() {
                        Hashtable properties = new Hashtable();
                        properties.put(Context.INITIAL_CONTEXT_FACTORY,
                                "org.jnp.interfaces.NamingContextFactory");
                        properties.put(Context.URL_PKG_PREFIXES,
                                "org.jboss.naming:org.jnp.interfaces");
                        properties.put(Context.PROVIDER_URL, "jnp://localhost:1099");
                        properties.put(Context.SECURITY_PRINCIPAL, "admin");
                        properties.put(Context.SECURITY_CREDENTIALS, "admin");

                        ConnectionFactory connectionFactory = null;

                        try {
                            Context context = new InitialContext(properties);
                            connectionFactory = (ConnectionFactory) context
                                    .lookup("ConnectionFactory");
                            Connection connection = connectionFactory
                                    .createConnection();
                            connection.start();

                            MessageConsumer[] consumers = new MessageConsumer[numberOfProcesses];
                            Session[] sessions = new Session[numberOfProcesses];
                            MessageListener[] listeners = new MessageListener[numberOfProcesses];
                            // Time (ms) each listener last received a message; 0 = never.
                            final long[] lastReceived = new long[numberOfProcesses];

                            for (int j = 0; j < consumers.length; j++) {
                                // One session per listener, all on the same connection
                                sessions[j] = connection.createSession(false,
                                        Session.AUTO_ACKNOWLEDGE);
                                final Queue sourceQ = sessions[j]
                                        .createQueue(receiveQueue);
                                final Queue destQ = sessions[j].createQueue(sendQueue);
                                final Session session = sessions[j];

                                consumers[j] = sessions[j].createConsumer(sourceQ);
                                lastReceived[j] = 0;
                                final int count = j;

                                listeners[j] = new MessageListener() {
                                    public String threadName;

                                    private boolean threadNameChanged = false;

                                    public void onMessage(Message msg) {
                                        lastReceived[count] = System
                                                .currentTimeMillis();
                                        try {
                                            // Name the delivery thread on first use so it
                                            // can be identified in the debugger
                                            if (!threadNameChanged) {
                                                threadName = receiveQueue + "." + count;
                                                Thread.currentThread().setName(
                                                        threadName);
                                                threadNameChanged = true;
                                            }
                                            String payload = ((TextMessage) msg)
                                                    .getText();
                                            if (printMessages) {
                                                System.out.println(Thread
                                                        .currentThread().getName()
                                                        + " " + payload);
                                            }
                                            MessageProducer producer = session
                                                    .createProducer(destQ);
                                            try {
                                                // Simulate normal processing time
                                                Thread.sleep(delay);
                                            } catch (InterruptedException e) {
                                                e.printStackTrace();
                                            }
                                            // Forward the message to the next queue
                                            producer.send(msg);
                                            producer.close();
                                        } catch (JMSException e) {
                                            e.printStackTrace();
                                        }
                                    }
                                };

                                consumers[j].setMessageListener(listeners[j]);
                            }
                            // Thread.sleep(Long.MAX_VALUE);
                            while (true) {
                                Thread.sleep(60000);
                                System.out.println("Checking " + receiveQueue);
                                long currentTime = System.currentTimeMillis();
                                for (int j = 0; j < numberOfProcesses; j++) {
                                    // Report listeners that have received at least one
                                    // message but nothing in the last minute
                                    if (lastReceived[j] > 0 && currentTime - lastReceived[j] > 60000) {
                                        System.out.println("********** " + receiveQueue + "."
                                                + j + " has stopped **********");
                                    }
                                }
                            }
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                };

                Thread t = new Thread(runnable);
                t.setName("Manager " + receiveQueue);
                t.start();
            }
        }
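
        The "has stopped" check in the harness reads lastReceived from the manager thread while the listener threads write to it. As a hedged aside, the same check can be isolated into a small helper; the sketch below is not part of the harness, the StallDetector name is hypothetical, and it assumes Java 5 or later for AtomicLongArray, which makes the cross-thread reads explicit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLongArray;

// Hypothetical helper, not part of the original harness.
class StallDetector {
    private final AtomicLongArray lastReceived; // 0 means "never received"
    private final long timeoutMs;

    StallDetector(int listeners, long timeoutMs) {
        this.lastReceived = new AtomicLongArray(listeners);
        this.timeoutMs = timeoutMs;
    }

    /** Intended to be called from a listener's onMessage(). */
    void markReceived(int listener, long nowMs) {
        lastReceived.set(listener, nowMs);
    }

    /** Listeners that have received at least once, but not within timeoutMs. */
    List<Integer> stalled(long nowMs) {
        List<Integer> result = new ArrayList<Integer>();
        for (int i = 0; i < lastReceived.length(); i++) {
            long last = lastReceived.get(i);
            if (last > 0 && nowMs - last > timeoutMs) {
                result.add(i);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        StallDetector detector = new StallDetector(3, 60000);
        detector.markReceived(0, 1000);   // listener 0 last heard from at t = 1 s
        detector.markReceived(1, 50000);  // listener 1 last heard from at t = 50 s
        System.out.println("Stalled at t = 70 s: " + detector.stalled(70000));
    }
}
```

        As in the harness, a listener that has never received anything is not reported, so this only flags listeners that were working and then went quiet.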
        



        • 16. Re: Removing a MessageListener doesn't remove its associated
          timfox

          Hi David-

          I have looked into this issue, and I can confirm that I am seeing the same results as you: after an hour or so some of the listeners report as being "stopped".

          Looking further into this, I think this is actually correct behaviour.

          The following seems to be happening.

          Each queue has multiple sessions, each with one listener on it. When there are multiple consumers on a queue, messages are always delivered to the first consumer on the queue unless it is busy, in which case they go to the second, then the third, and so on.

          Therefore if the consumers tend to process them faster than they arrive on the queue then the last consumer on the queue is unlikely to get any to process.
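
          A minimal simulation of this "first consumer that is not busy" rule is sketched below. It is only an illustration, not the JBoss Messaging routing code: the class name, the fixed arrival interval and the fixed processing time are all assumptions.

```java
import java.util.Arrays;

// Hypothetical model of "deliver to the first consumer that is not busy".
class FirstAvailableRouting {

    /**
     * One message arrives every arrivalMs; each consumer needs processMs per
     * message. A message goes to the lowest-numbered idle consumer, or, if all
     * are busy, to the consumer that becomes free soonest.
     * Returns the number of messages each consumer handled.
     */
    static int[] simulate(int consumers, int messages, long arrivalMs, long processMs) {
        long[] busyUntil = new long[consumers];
        int[] handled = new int[consumers];
        for (int m = 0; m < messages; m++) {
            long now = m * arrivalMs;
            int chosen = -1;
            for (int c = 0; c < consumers; c++) {
                if (busyUntil[c] <= now) {
                    chosen = c;
                    break;
                }
            }
            if (chosen < 0) {
                // Everyone is busy: wait for the consumer that frees up first
                chosen = 0;
                for (int c = 1; c < consumers; c++) {
                    if (busyUntil[c] < busyUntil[chosen]) {
                        chosen = c;
                    }
                }
            }
            long start = Math.max(now, busyUntil[chosen]);
            busyUntil[chosen] = start + processMs;
            handled[chosen]++;
        }
        return handled;
    }

    public static void main(String[] args) {
        // 5 consumers at 250 ms per message, one message arriving every 100 ms
        System.out.println(Arrays.toString(simulate(5, 1000, 100, 250)));
    }
}
```

          With these assumed numbers three consumers are enough to keep up with the arrival rate, so the fourth and fifth never receive a message.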

          I shall run the experiment overnight to make sure, but I don't expect the consumers to stop altogether (if they do that certainly is a problem).

          There is a task to provide different routing policies (so you can do round robin, random etc) for consumers, but this wouldn't give you any better throughput in your case.

          JBossMQ has a similar behaviour (see http://wiki.jboss.org/wiki/Wiki.jsp?page=JBossMQReceiverImpl)

          • 17. Re: Removing a MessageListener doesn't remove its associated
            davidrh

            Hi Tim,

            I think this would be the correct behaviour if there were not enough items on the queue for all threads to process. We've noticed that messaging has an affinity for the first thread when the load is light. In this case, though, we've checked the queue using the JMX console and there are lots of messages waiting to be processed on the queue that is reported as stopped. In fact, because some of the listeners have stopped the queue is actually starting to back up, as there are not as many threads working it any more.

            These stopped listeners never seem to recover - we've left them overnight in our application but they never come back.

            Just out of interest, how do you enable round robin dispatching to the listeners? This would make it easier for us to determine if every listener is doing its fair share, or whether some are slacking off.

            Thanks,

            David

            • 18. Re: Removing a MessageListener doesn't remove its associated
              timfox

              I've left it running now for about 12 hours on my machine. It reports:

              publish.request.4 has stopped
              core.track.4 has stopped

              So only two listeners have stopped (same as after 1 hour).

              If I look at the queues in jmx-console, I have:

              publish.request: 527 messages
              core.track: 540 messages
              core.internal.publication: 11113 messages

              This makes sense, since core.internal.publication's listeners are the slow ones - this seems to be pretty much a steady state.

              With this number of messages in publish.request and core.track, it should easily be possible for one listener to get starved on the slow queues, since messages are prefetched to consumers in chunks (default chunk size of 150).
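
              As a rough illustration of that arithmetic, the sketch below hands the queued messages out in order, up to one chunk per consumer. This is a simplified model and not the actual prefetch code; the class and method names are hypothetical, and the 5 consumers per queue are taken from the test harness.

```java
import java.util.Arrays;

// Hypothetical, simplified model of chunked prefetch.
class PrefetchChunks {

    /** Gives each consumer, in order, up to `chunk` of the `queued` messages. */
    static int[] distribute(int queued, int consumers, int chunk) {
        int[] buffered = new int[consumers];
        int remaining = queued;
        for (int c = 0; c < consumers && remaining > 0; c++) {
            buffered[c] = Math.min(chunk, remaining);
            remaining -= buffered[c];
        }
        return buffered;
    }

    public static void main(String[] args) {
        // Queue depths from jmx-console, 5 listeners each, chunk size 150
        System.out.println("publish.request: " + Arrays.toString(distribute(527, 5, 150)));
        System.out.println("core.track:      " + Arrays.toString(distribute(540, 5, 150)));
    }
}
```

              Under this model both 527 and 540 messages run out before the fifth consumer (index 4) gets a chunk, which is consistent with publish.request.4 and core.track.4 being the two listeners reported as stopped.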

              So I am happy with what I am seeing.

              Your code base is different to mine though and might account for the difference in behaviour.

              Right now the routing policy is not pluggable, but there is a JIRA task for it.

              • 19. Re: Removing a MessageListener doesn't remove its associated
                davidrh

                If you manually throw another chunk of messages (say 1000) onto the publish.request and core.track queues, do they become unstuck? If it is the chunk size that is causing the problem, then when there are sufficient messages in the queue for everyone to get a chunk, all listeners should start again. In the test that I ran this morning, I had thousands of messages available in the stuck queues.

                As an aside, do you know when CR4 will be out, as we may be able to work around this by going back to stopping and starting the listeners periodically.

                Thanks,

                David

                • 20. Re: Removing a MessageListener doesn't remove its associated
                  davidrh

                  Just thinking about it, the chunking behaviour is not what we see when things are running normally. If we generate more load than the queue listeners can handle, the load is distributed evenly between the listeners, even when there are fewer than (150 * number of listeners) outstanding messages.

                  For example, we can have 5 listeners on the queue and 100 messages outstanding and they will be distributed evenly between the 5 listeners. It is only when the number of outstanding messages on the queue is less than 5 that we see affinity with the first listeners.

                  • 21. Re: Removing a MessageListener doesn't remove its associated
                    timfox

                     

                    "davidrh" wrote:
                    If you manually throw another chunk of messages (say 1000) onto the publish.request and core.track queues, do they become unstuck?

                    Good question.

                    I have sent 10000 more messages to each of those queues, and the listeners do indeed become unstuck :)

                    "davidrh" wrote:
                    As an aside, do you know when CR4 will be out,

                    Ovidiu is cutting the release later today hopefully, so probably sometime end of Friday (US time).

                    "davidrh" wrote:
                    as we may be able to work around this by going back to stopping and starting the listeners periodically.

                    You shouldn't need to do that; if there is a problem here we will fix it.


                    • 22. Re: Removing a MessageListener doesn't remove its associated
                      timfox

                      The prefetch (chunking) was implemented after CR3 so you don't have this.

                      My best recommendation now is to try CR4 early next week and see if this solves your problems.

                      • 23. Re: Removing a MessageListener doesn't remove its associated
                        timfox

                        Actually I'm tempted to implement the round robin routing policy and make it default so as to make the message distribution amongst multiple competing consumers a bit fairer in the case of batching.
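
                        For readers unfamiliar with the idea, a round robin policy can be sketched in a few lines. This is a generic illustration with hypothetical names, not the routing policy interface or implementation in JBoss Messaging.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical round robin router: each call hands back the next consumer in turn.
class RoundRobinRouter<T> {
    private final List<T> consumers;
    private int next = 0;

    RoundRobinRouter(List<T> consumers) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("need at least one consumer");
        }
        this.consumers = new ArrayList<T>(consumers);
    }

    /** Returns the consumer that should receive the next message. */
    synchronized T route() {
        T chosen = consumers.get(next);
        next = (next + 1) % consumers.size();
        return chosen;
    }

    public static void main(String[] args) {
        RoundRobinRouter<String> router = new RoundRobinRouter<String>(
                Arrays.asList("core.track.0", "core.track.1", "core.track.2"));
        for (int i = 0; i < 6; i++) {
            System.out.println("message " + i + " -> " + router.route());
        }
    }
}
```

                        Unlike "first consumer that is not busy", every consumer gets a turn regardless of how quickly the earlier ones finish.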

                        • 24. Re: Removing a MessageListener doesn't remove its associated
                          davidrh

                          I'd vote for round robin. As you can probably gather from the simulated timings in our test harness, we use the queues to defer some expensive processing and throttle our application to produce dependable response times. Some of our processing averages around 200 - 250ms per message, so a chunk of 150 messages to one receiver represents around 30 - 40 seconds worth of processing. Round robin would allow us to spread this more reliably between the available receivers.

                          Having said that, it's not hugely important as we tune our number of listeners for average daily load. Peak load may be 10 times the average, but as long as we get through a day's worth of messages in a day it doesn't matter much.

                          Is the chunk size configurable in CR4?

                          • 25. Re: Removing a MessageListener doesn't remove its associated
                            davidrh

                            Round robin will also make it more likely that the messages are consumed in close to the original sequence. If you allocate a chunk of 150 messages to the first receiver and the next 150 to the second receiver (I think this is what you mean by chunking?) then message 151 will be received at the same time as message 1. If you were in line at the bank with 150 people in front of you, you'd think it was your lucky day!
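
                            The ordering effect can be put into numbers with a small sketch. It assumes 2 receivers, a chunk of 150, 300 queued messages, and 250 ms of processing per message; the class and method names are hypothetical, and the chunked model is a guess at what chunking means here, not the actual implementation.

```java
// Hypothetical comparison of when each message starts being processed.
class DeliveryOrder {

    /**
     * Chunked: message i (0-based) sits in chunk i / chunk, one chunk per
     * receiver, and waits for the (i % chunk) messages ahead of it in that chunk.
     */
    static long chunkedStartMs(int i, int chunk, long processMs) {
        return (long) (i % chunk) * processMs;
    }

    /**
     * Round robin: message i goes to receiver i % receivers and waits for the
     * (i / receivers) messages ahead of it on that receiver.
     */
    static long roundRobinStartMs(int i, int receivers, long processMs) {
        return (long) (i / receivers) * processMs;
    }

    public static void main(String[] args) {
        int chunk = 150;
        int receivers = 2;
        long processMs = 250;
        int[] samples = {0, 149, 150, 299};
        for (int i : samples) {
            System.out.println("message " + (i + 1)
                    + ": chunked start " + chunkedStartMs(i, chunk, processMs) + " ms"
                    + ", round robin start " + roundRobinStartMs(i, receivers, processMs) + " ms");
        }
    }
}
```

                            Under these assumptions message 151 starts at 0 ms with chunking, the same moment as message 1 and about 37 seconds before message 150, whereas with round robin the start times never decrease as the message number increases.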

                            • 26. Re: Removing a MessageListener doesn't remove its associated
                              timfox

                              Agreed.

                              I have implemented a round robin strategy and I'm testing it now.

                              So far no consumers have starved (been running for 1.5 hours).

                              If this works ok I'll make it the default for queues and subscriptions in CR4.

                              • 27. Re: Removing a MessageListener doesn't remove its associated
                                davidrh

                                We've been running with CR4 for 6 hours now under load and so far so good. We've had no listeners stop receiving messages and we're getting a nice even distribution of messages across the listeners.

                                Thanks for your help Tim in getting this resolved.

                                David
