13 Replies Latest reply on Oct 18, 2006 9:48 AM by timfox

    Messages Stuck in Queue

    guinsu

      I'm trying to switch from SwiftMQ over to JBoss Messaging. I've been having a problem where messages get stuck in a queue whose consumers are using selectors. I have read about the issues involved with selectors and speed, however this is not the case here. I have 4 consumers with a selector like: (serverID="+serverID+" OR serverID IS NULL). Messages with the server Id set will get stuck in the queue until I restart the consumer it was sent to. Stopping the producer has no effect, even if I let the app and the JMS server sit idle it will never pull the message out. While it is very random, most messages with server id set go through. It happens very quickly after starting my app up, usually within minutes. The client is not deadlock, nor did the receiving thread die. Using the Jconsole I can see the thread waiting on a jms.receive(). Any idea whats going on? I do not have a small sample program that shows this behavior but I can provide whatever relevant config info is needed.

        • 1. Re: Messages Stuck in Queue
          clebert.suconic

          I will try to create a testcase.

          But can you provide more information?

          Can you provide a code snipped of your selector?

          • 2. Re: Messages Stuck in Queue
            guinsu

            Ok, I tried a test class and everything worked fine. While writing it I did run into a problem with MessageConsumer.receive(0). My old JMS server would block indefinitely until there was something to receive. It did not appear as if JBoss was doing this and I switched my test code to a MessageListener. This could be my problem, I will change the production code and see.

            • 3. Re: Messages Stuck in Queue
              guinsu

              Well, after messing with my program and my test case I still can't make this happen in isolation. It seems like my MessageConsumer stops receiving any messages where the server Id is not null. The consumer isn't getting closed since it still receives the rest of its messages. But I have to restart my app to get those missed messages. I've tried swtching to a message listener and going over all my code, nothing is really jumping out at me.

              • 4. Re: Messages Stuck in Queue
                guinsu

                Ok, I figured it out, not sure if this was stupidity on my part or just me expecting the system to behave a certain way.

                Basically I am using JMS with an indexing service and each client switched between two selectors, one looked like : (serverID="+serverID+" OR serverID IS NULL) and the other: (serverID="+serverID). My problem was both were open at the same time, so even though I wasn't receiving on the 2nd one, messages still got stuck. I do not know if this is correct behavior or not, but now I close and open each receiver when I am switching back and forth.

                • 5. Re: Messages Stuck in Queue
                  clebert.suconic

                  Messageconsumer is associated with your Session, so.. the behavior you were getting is correct. (I mean.. it was correct that you got an error :-) )


                  anyway... By the time I was coming to write a post I saw your message.

                  I was going to post the testcase that I got, and didn't get any failure. Just for reference now (as you got your problem solved), I will paste the testcase here:

                  Cheers,

                  Clebert

                   public void testSelectiveOnServerId() throws Exception
                   {
                   Connection conn = cf.createConnection();
                   conn.start();
                  
                   Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                   MessageProducer prod = session.createProducer(queue);
                   prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                  
                   int serverId=((ConnectionState)(((ClientConnectionDelegate)((JBossConnection)conn).getDelegate()).getState())).getServerID();
                  
                   String selector = "serverID = " + serverId;
                   MessageConsumer serverConsumer = session.createConsumer(queue, selector);
                   conn.start();
                  
                   Message redMessage = session.createMessage();
                   redMessage.setStringProperty("color", "red");
                  
                   Message blueMessage = session.createMessage();
                   blueMessage.setStringProperty("color", "blue");
                   blueMessage.setIntProperty("serverID",serverId);
                  
                   prod.send(redMessage);
                   prod.send(blueMessage);
                  
                   Message rec = serverConsumer.receive(3000);
                   assertNotNull(rec);
                   assertNull(serverConsumer.receive(1000));
                   assertEquals(blueMessage.getJMSMessageID(), rec.getJMSMessageID());
                   assertEquals("blue", rec.getStringProperty("color"));
                  
                  
                   serverConsumer.close();
                  
                   session.close();
                   }
                  
                  


                  • 6. Re: Messages Stuck in Queue
                    guinsu

                    Hmm...I thought I had it solved, but then I came in this morning to having 96,000 items stuck in the queue. Stopping and restarting my client apps isn't pulling anything out. Also, using the admin "list messages" option isn't showing any messages, either with selectors like "serverID is null" or "serverID =1" or with no selector. Are there cases where list messages fails? I'll have to write a quick test with no selector to see what happens when I try to drain off those stuck messages. First I will stop and restart JBoss.

                    • 7. Re: Messages Stuck in Queue
                      timfox

                      Can you explain what you are doing and what you are seeing in more detail?

                      I am currently trying (and failing) to understand what your setup is :)

                      When you say messages "stuck in queue" - do you mean "stuck in the consumer"?

                      If you could just walk through in detail what you are doing I will try and help you out.

                      • 8. Re: Messages Stuck in Queue
                        guinsu

                        Sure, let me try to explain it better. I'm using JMS as part of an indexing system, so items are processed and put in a queue for indexing. If an item has never been indexed before the serverID property of the messages is set to null, because I don't care which server it ends up on, just that it goes somewhere. If an item has been previously indexed but it was updated then the serverID field is set so the correct server will grab the item and update it. The indexers are set up with a selector to get either items with the id set to their server ID or to null.

                        When I say "stuck" I mean I go into the web admin for JBoss, go to jboss.messaging.destination, then the queue I am looking at. MessageCount is a non-zero number when the processing side of the app is idle. The indexing side is still running and waiting on messageconsumer.receiver(0), however the JMS server never sends any of these messages. Before, stopping and restarting the indexing app would kick old messages back out, but then once I restarted procoessing side more would get stuck. Now though the stuck messages aren't coming out, and JBoss gives out of memory errors when it starts up, so it looks like I need to figure out how to totally delete the queue and start over.

                        • 9. Re: Messages Stuck in Queue
                          timfox

                          Do you have more than one message consumer on each session?

                          • 10. Re: Messages Stuck in Queue
                            timfox

                            Actually if you could post the relevant parts of your consumption code that would be useful

                            • 11. Re: Messages Stuck in Queue
                              guinsu

                              At one point I did have 2 consumers on a session, but since my post yesterday I only have 1.

                              I'll try to post the code, but I have written some wrappers around things so it might be a mess to post...

                              • 12. Re: Messages Stuck in Queue
                                guinsu

                                Ok, I've been running various tests and it seems that once items get stuck in the jboss queue the only way to fix it is to stop and restart the queue.
                                When items got stuck, I shut down all my producers and consumers and ran the following program, it never received a single item from the queue. I have also switch my back end database from the default HSQL to MySQL, JBoss behaved the same way with both DBs. Also, I have noticed the queue size on the stuck queue would be wildly innaccurate at various times. From these problems and the fact that this simple program receives nothing the problem appears to be in JBoss.

                                import javax.jms.*;
                                
                                import java.util.concurrent.atomic.AtomicBoolean;
                                import javax.naming.Context;
                                import javax.naming.InitialContext;
                                import org.apache.log4j.Logger;
                                import org.apache.log4j.PropertyConfigurator;
                                
                                public class JMSDrainTest
                                {
                                 private static final Logger log = Logger.getLogger(JMSDrainTest.class.getName());
                                
                                 /**
                                 * Starts up many send and recieve threads.
                                 *
                                 * @param args a String[]
                                 *
                                 */
                                 public static void main(String[] args)
                                 {
                                 System.setProperty("org.apache.commons.logging.Log", "org.apache.commons.logging.impl.SimpleLog");
                                 System.setProperty("org.apache.commons.logging.simplelog.showdatetime", "true");
                                
                                 String log_file=args[0];
                                 try
                                 {
                                 PropertyConfigurator.configure(JMSLargeFileTest.class.getResource("/"+log_file));
                                 AtomicBoolean done = new AtomicBoolean(false);
                                 String clientID = "IndexDrainer-"+System.currentTimeMillis();
                                 Connection connection;
                                 Context ic;
                                 ic = new InitialContext();
                                 log.info("Looking up ConnectionFactory");
                                 ConnectionFactory connectionFactory = (ConnectionFactory) ic.lookup("/ConnectionFactory");
                                 clientID = clientID.replaceAll("\\.", "-");
                                
                                 log.info("Creating connection");
                                 connection = connectionFactory.createConnection();
                                 connection.setClientID(clientID);
                                 log.info("Starting connection");
                                 connection.start();
                                
                                 String name="/queue/Index";
                                 log.info("Creating session");
                                 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                                 log.info("Looking up destination");
                                 Destination destination = (Destination)ic.lookup(name);
                                 log.info("Creating consumer");
                                 MessageConsumer consumer = session.createConsumer(destination, null, false);
                                
                                 log.info("Start Receive");
                                 while(!done.get())
                                 {
                                 consumer.receive();
                                 log.info("Receiving");
                                 }
                                 }
                                 catch (JMSException e)
                                 {
                                 log.error(e.toString(), e);
                                 }
                                 catch (Exception e)
                                 {
                                 log.error(e.toString(), e);
                                 }
                                 catch (Error e)
                                 {
                                 log.error(e.toString(), e);
                                 }
                                
                                 }
                                }
                                
                                


                                Output:

                                18 Oct 2006 09:38:03 INFO : Looking up ConnectionFactory
                                18 Oct 2006 09:38:04 INFO : Creating connection
                                18 Oct 2006 09:38:05 INFO : Starting connection
                                18 Oct 2006 09:38:05 INFO : Creating session
                                18 Oct 2006 09:38:06 INFO : Looking up destination
                                18 Oct 2006 09:38:06 INFO : Creating consumer
                                18 Oct 2006 09:38:06 INFO : Start Receive

                                Then it just sits.

                                • 13. Re: Messages Stuck in Queue
                                  timfox

                                  Please can you complete your test case with the code that actually adds the messages to the queue.

                                  Run by itself this code will "stall" since there will be no messages in the queue.