7 Replies Latest reply on Sep 11, 2006 8:34 PM by ovidiu.feodorov

    Closing Consumer Causes Hang

    toddjtidwell

      So I've run into an odd problem the last couple of days. I've been through my code and I've re-installed JBoss and Messaging several times to try and eliminate it. I can't find a solid cause.

      Here's what's happening: We have a non-durable consumer that is listening to a topic. When it receives a message with a specific int property, it knows it's supposed to stop listening and shut itself down.

      It establishes its connections as follows:

      import javax.jms.*;
      import javax.naming.InitialContext;

      InitialContext initialContext = new InitialContext();
      ConnectionFactory connectionFactory = (ConnectionFactory) initialContext.lookup("XAConnectionFactory");
      Connection connection = connectionFactory.createConnection();
      // Non-transacted session; messages are acknowledged explicitly in onMessage()
      Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
      Destination destination = (Destination) initialContext.lookup("OurTopic");
      // Only messages matching the selector are delivered to this consumer
      MessageConsumer consumer = session.createConsumer(destination, "selectorProp = 1");
      consumer.setMessageListener(this);
      connection.start();
      


      Then for the onMessage method we have

      public void onMessage(Message message)
      {
          try
          {
              message.acknowledge();

              int closeProp = message.getIntProperty("closeProp");

              if (closeProp == -1)
              {
                  try { connection.stop(); } catch (Exception e) {}
                  try { consumer.close(); } catch (Exception e) {}
                  try { session.close(); } catch (Exception e) {}
                  try { connection.close(); } catch (Exception e) {}
              }
          }
          catch (JMSException e)
          {
              e.printStackTrace(System.err);
          }
      }
      


      Now, here's the odd thing: it just hangs on the consumer.close() call. This used to work. I'm not sure what's changed; I've gone back over it 50 times now and can't find anything I changed that would have broken it.

      I've found that if I close everything from another thread, outside the onMessage method (like a timer thread or something that just watches the state of the consumer), it works fine. It only hangs when I close it all down in onMessage.

      Is this frowned upon or should this work? Any thoughts on what might be causing it to lock?
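
The workaround described above, closing from a thread other than the one running onMessage(), could be sketched roughly as follows. This is a minimal sketch in plain Java, not JBoss Messaging code: DeferredCloser, requestClose() and awaitClosed() are hypothetical names, and the close action is passed in as a Runnable so that the example has no JMS dependency. In a real listener the Runnable would presumably wrap connection.close() and handle JMSException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper: runs a close action on its own thread, at most once. */
class DeferredCloser {
    private final Runnable closeAction;
    private final AtomicBoolean requested = new AtomicBoolean(false);
    private final CountDownLatch closed = new CountDownLatch(1);

    DeferredCloser(Runnable closeAction) {
        this.closeAction = closeAction;
    }

    /** Intended to be called from onMessage(): starts the close and returns immediately. */
    void requestClose() {
        if (!requested.compareAndSet(false, true)) {
            return; // a close has already been requested
        }
        Thread t = new Thread(() -> {
            try {
                closeAction.run();
            } finally {
                closed.countDown(); // signal completion even if the action throws
            }
        }, "deferred-closer");
        t.start();
    }

    /** Waits up to timeoutMillis for the close action to finish; false on timeout or interrupt. */
    boolean awaitClosed(long timeoutMillis) {
        try {
            return closed.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With a helper like this, onMessage() would call requestClose() and return, so the delivery thread is never blocked waiting on its own callback.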





        • 1. Re: Closing Consumer Causes Hang
          clebert.suconic

          Can you post a stack trace from a thread dump?

          (kill -3 <pid> if you are on Linux, or Ctrl-Break if you are on Windows)
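
If sending a signal to the JVM is inconvenient, a rough equivalent can be produced from inside the process. The sketch below uses Thread.getAllStackTraces(), which is part of the standard library since Java 5; the class name ThreadDumpSketch is made up for illustration. Unlike a kill -3 dump it does not show which monitors are locked or waited on, so the signal-based dump remains the better choice for diagnosing a hang.

```java
import java.util.Map;

/** Hypothetical helper that prints a simplified thread dump from inside the JVM. */
class ThreadDumpSketch {

    /** Builds a text dump with the name, state and stack frames of every live thread. */
    static String dump() {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            out.append('"').append(thread.getName()).append("\" state=")
               .append(thread.getState()).append('\n');
            for (StackTraceElement frame : entry.getValue()) {
                out.append("    at ").append(frame).append('\n');
            }
            out.append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.err.print(dump());
    }
}
```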

          • 2. Re: Closing Consumer Causes Hang
            toddjtidwell

            Sure, here ya go... My objects are the jms.test package stuff.

            Full thread dump Java HotSpot(TM) Client VM (1.5.0_07-b03 mixed mode, sharing):
            
            "Thread-4" prio=6 tid=0x0301dc30 nid=0x148 in Object.wait() [0x034ef000..0x034efa68]
             at java.lang.Object.wait(Native Method)
             - waiting on <0x22b99a38> (a org.jboss.messaging.util.Future)
             at java.lang.Object.wait(Unknown Source)
             at org.jboss.messaging.util.Future.getResult(Future.java:51)
             - locked <0x22b99a38> (a org.jboss.messaging.util.Future)
             at org.jboss.jms.client.remoting.MessageCallbackHandler.waitForOnMessageToComplete(MessageCallbackHandler.java:357)
             at org.jboss.jms.client.remoting.MessageCallbackHandler.close(MessageCallbackHandler.java:312)
             at org.jboss.jms.client.container.ConsumerAspect.handleClosing(ConsumerAspect.java:116)
             at org.jboss.aop.advice.org.jboss.jms.client.container.ConsumerAspect24.invoke(ConsumerAspect24.java)
             at org.jboss.jms.client.delegate.ClientConsumerDelegate$closing_4945873952494833124.invokeNext(ClientConsumerDelegate$closing_4945873952494833124.java)
             at org.jboss.jms.client.container.ClosedInterceptor.invoke(ClosedInterceptor.java:134)
             at org.jboss.aop.advice.PerInstanceInterceptor.invoke(PerInstanceInterceptor.java:117)
             at org.jboss.jms.client.delegate.ClientConsumerDelegate$closing_4945873952494833124.invokeNext(ClientConsumerDelegate$closing_4945873952494833124.java)
             at org.jboss.jms.client.container.ExceptionInterceptor.invoke(ExceptionInterceptor.java:69)
             at org.jboss.jms.client.delegate.ClientConsumerDelegate$closing_4945873952494833124.invokeNext(ClientConsumerDelegate$closing_4945873952494833124.java)
             at org.jboss.jms.client.container.ClientLogInterceptor.invoke(ClientLogInterceptor.java:107)
             at org.jboss.jms.client.delegate.ClientConsumerDelegate$closing_4945873952494833124.invokeNext(ClientConsumerDelegate$closing_4945873952494833124.java)
             at org.jboss.jms.client.delegate.ClientConsumerDelegate.closing(ClientConsumerDelegate.java)
             at org.jboss.jms.client.JBossMessageConsumer.close(JBossMessageConsumer.java:96)
             at messaging.DestinationHandler.disconnect(DestinationHandler.java:233)
             at jms.test.TestConsumer.closeConnection(TestConsumer.java:58)
             at jms.test.TestConsumer.onMessage(TestConsumer.java:85)
             at org.jboss.jms.client.remoting.MessageCallbackHandler.callOnMessage(MessageCallbackHandler.java:87)
             at org.jboss.jms.client.remoting.MessageCallbackHandler$ListenerRunner.run(MessageCallbackHandler.java:712)
             at EDU.oswego.cs.dl.util.concurrent.QueuedExecutor$RunLoop.run(QueuedExecutor.java:89)
             at java.lang.Thread.run(Unknown Source)
            
            "SocketServerInvokerThread-10.1.3.102-0" prio=6 tid=0x00aabe48 nid=0xc78 runnable [0x034af000..0x034afae8]
             at java.net.SocketInputStream.socketRead0(Native Method)
             at java.net.SocketInputStream.read(Unknown Source)
             at java.io.BufferedInputStream.fill(Unknown Source)
             at java.io.BufferedInputStream.read(Unknown Source)
             - locked <0x22bf3d68> (a java.io.BufferedInputStream)
             at java.io.FilterInputStream.read(Unknown Source)
             at org.jboss.serial.io.JBossObjectInputStream.read(JBossObjectInputStream.java:193)
             at org.jboss.remoting.transport.socket.ServerThread.readVersion(ServerThread.java:497)
             at org.jboss.remoting.transport.socket.ServerThread.processInvocation(ServerThread.java:414)
             at org.jboss.remoting.transport.socket.ServerThread.dorun(ServerThread.java:534)
             at org.jboss.remoting.transport.socket.ServerThread.run(ServerThread.java:257)
            
            "DestroyJavaVM" prio=6 tid=0x00035038 nid=0xbdc waiting on condition [0x00000000..0x0007fae8]
            
            "Thread-3" prio=6 tid=0x0301cb28 nid=0x194 waiting on condition [0x0345f000..0x0345fb68]
             at java.lang.Thread.sleep(Native Method)
             at jms.test.PingThread.run(PingThread.java:33)
            
            "GC Daemon" daemon prio=2 tid=0x03014538 nid=0xc94 in Object.wait() [0x033df000..0x033dfbe8]
             at java.lang.Object.wait(Native Method)
             - waiting on <0x230ca078> (a sun.misc.GC$LatencyLock)
             at sun.misc.GC$Daemon.run(Unknown Source)
             - locked <0x230ca078> (a sun.misc.GC$LatencyLock)
            
            "RMI RenewClean-[10.1.3.102:1098]" daemon prio=6 tid=0x03001e18 nid=0xe48 in Object.wait() [0x0339f000..0x0339fce8]
             at java.lang.Object.wait(Native Method)
             - waiting on <0x2323caa8> (a java.lang.ref.ReferenceQueue$Lock)
             at java.lang.ref.ReferenceQueue.remove(Unknown Source)
             - locked <0x2323caa8> (a java.lang.ref.ReferenceQueue$Lock)
             at sun.rmi.transport.DGCClient$EndpointEntry$RenewCleanThread.run(Unknown Source)
             at java.lang.Thread.run(Unknown Source)
            
            "Thread-2" prio=6 tid=0x02dd0bf8 nid=0xff8 waiting on condition [0x0335f000..0x0335fd68]
             at java.lang.Thread.sleep(Native Method)
             at org.jboss.remoting.transport.socket.SocketServerInvoker$ServerSocketRefresh.run(SocketServerInvoker.java:590)
            
            "Timer-0" daemon prio=6 tid=0x02d770c8 nid=0x478 in Object.wait() [0x02edf000..0x02edf9e8]
             at java.lang.Object.wait(Native Method)
             - waiting on <0x231be7f8> (a java.util.TaskQueue)
             at java.util.TimerThread.mainLoop(Unknown Source)
             - locked <0x231be7f8> (a java.util.TaskQueue)
             at java.util.TimerThread.run(Unknown Source)
            
            "SocketServerInvoker#0-3416" prio=6 tid=0x00acf7d8 nid=0x408 runnable [0x02e9f000..0x02e9fa68]
             at java.net.PlainSocketImpl.socketAccept(Native Method)
             at java.net.PlainSocketImpl.accept(Unknown Source)
             - locked <0x231be8b8> (a java.net.SocksSocketImpl)
             at java.net.ServerSocket.implAccept(Unknown Source)
             at java.net.ServerSocket.accept(Unknown Source)
             at org.jboss.remoting.transport.socket.SocketServerInvoker.run(SocketServerInvoker.java:445)
             at java.lang.Thread.run(Unknown Source)
            
            "Low Memory Detector" daemon prio=6 tid=0x00a6f208 nid=0x59c runnable [0x00000000..0x00000000]
            
            "CompilerThread0" daemon prio=10 tid=0x00a6ddd0 nid=0x164 waiting on condition [0x00000000..0x02bcf6cc]
            
            "Signal Dispatcher" daemon prio=10 tid=0x00a6d120 nid=0x278 waiting on condition [0x00000000..0x00000000]
            
            "Finalizer" daemon prio=8 tid=0x00a67f78 nid=0x49c in Object.wait() [0x02b4f000..0x02b4fa68]
             at java.lang.Object.wait(Native Method)
             - waiting on <0x23068d88> (a java.lang.ref.ReferenceQueue$Lock)
             at java.lang.ref.ReferenceQueue.remove(Unknown Source)
             - locked <0x23068d88> (a java.lang.ref.ReferenceQueue$Lock)
             at java.lang.ref.ReferenceQueue.remove(Unknown Source)
             at java.lang.ref.Finalizer$FinalizerThread.run(Unknown Source)
            
            "Reference Handler" daemon prio=10 tid=0x00a67aa0 nid=0x650 in Object.wait() [0x02b0f000..0x02b0fae8]
             at java.lang.Object.wait(Native Method)
             - waiting on <0x23068e08> (a java.lang.ref.Reference$Lock)
             at java.lang.Object.wait(Unknown Source)
             at java.lang.ref.Reference$ReferenceHandler.run(Unknown Source)
             - locked <0x23068e08> (a java.lang.ref.Reference$Lock)
            
            "VM Thread" prio=10 tid=0x00a65a60 nid=0x13c runnable
            
            "VM Periodic Task Thread" prio=10 tid=0x00a70420 nid=0x960 waiting on condition
            


            • 3. Re: Closing Consumer Causes Hang
              clebert.suconic

              You are calling close() from inside the MessageListener.

              When you call close(), it waits for any message that is currently being processed to finish.

              But because you are still inside the MessageListener, you are in a deadlock: onMessage() cannot return until close() returns, and close() is waiting for onMessage() to return.

              at org.jboss.jms.client.remoting.MessageCallbackHandler.waitForOnMessageToComplete(MessageCallbackHandler.java:357)
              at org.jboss.jms.client.remoting.MessageCallbackHandler.close(MessageCallbackHandler.java:312)
              


              • 4. Re: Closing Consumer Causes Hang
                toddjtidwell

                That's what I figured. So now I have the following questions:

                1) The destination is empty, so only the message that prompted the close() is being processed. Why would close() block waiting for that final message to finish processing? Why not just close down?

                2) This used to work under JBoss Messaging and every other JMS server I've ever worked with.

                3) If this is the way it's going to be going forward, how would I accomplish this same behavior? Am I going to have to create a separate thread that watches the state of my consumers and checks some state property to see if the "close" message has been received? That adds a lot of code to what once was a fairly simple thing.


                • 5. Re: Closing Consumer Causes Hang
                  timfox

                  This looks like a bug to me.

                  Clebert seems to have identified the problem - I think a fix should be fairly straightforward.

                  We just need to add a check in the code, something like this:

                  if (currentThread != messageListenerThread)
                  {
                      waitForOnMessageToComplete();
                  }
                  else
                  {
                      // don't wait
                  }
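
To make the effect of such a check concrete, here is a toy model in plain Java. It is only a sketch under stated assumptions: ToyCallbackHandler, deliver() and closeFromListener() are hypothetical names and not the JBoss Messaging classes, and close() takes a timeout so that the demo terminates, whereas the thread dump above suggests the real wait has no timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Toy stand-in for a callback handler; NOT the JBoss Messaging implementation. */
class ToyCallbackHandler {
    private final boolean skipWaitOnListenerThread;
    private volatile Thread listenerThread;   // thread currently inside the listener, if any
    private volatile CountDownLatch inFlight; // released when the current callback returns

    ToyCallbackHandler(boolean skipWaitOnListenerThread) {
        this.skipWaitOnListenerThread = skipWaitOnListenerThread;
    }

    /** Runs the listener on the calling thread and tracks that a callback is in progress. */
    void deliver(Runnable listener) {
        CountDownLatch done = new CountDownLatch(1);
        inFlight = done;
        listenerThread = Thread.currentThread();
        try {
            listener.run();
        } finally {
            listenerThread = null;
            done.countDown();
        }
    }

    /** Waits for an in-progress callback to finish; returns false if the wait timed out. */
    boolean close(long timeoutMillis) {
        CountDownLatch pending = inFlight;
        if (pending == null) {
            return true; // nothing was ever delivered
        }
        if (skipWaitOnListenerThread && Thread.currentThread() == listenerThread) {
            return true; // we are the callback; waiting for ourselves would never finish
        }
        try {
            return pending.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Calls close() from inside the listener and reports whether it completed. */
    static boolean closeFromListener(boolean guard) {
        ToyCallbackHandler handler = new ToyCallbackHandler(guard);
        boolean[] result = new boolean[1];
        handler.deliver(() -> result[0] = handler.close(200));
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println("without guard: closed=" + closeFromListener(false)); // false (timed out)
        System.out.println("with guard:    closed=" + closeFromListener(true));  // true
    }
}
```

Without the guard, close() waits on a latch that only the same thread can release, which mirrors the hang in the thread dump. With the guard, the wait is skipped when the caller is the listener thread itself.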
                  


                  BTW Todd, in your code there's no need to stop the connection and then close the consumer, session and connection. Calling connection.close() alone will automatically stop the connection and close its sessions and consumers.

                  This doesn't affect the validity of the bug though.

                  Also if you are going to immediately acknowledge the message using message.acknowledge() you may as well use auto acknowledge.

                  • 6. Re: Closing Consumer Causes Hang
                    toddjtidwell

                    Tim,

                    Thanks! You're right, under normal circumstances, I would just use auto-acknowledge if I was going to immediately acknowledge the message.

                    However, the test code that I used in my sample was also the same test code I was using to test some message expiration and redelivery behavior, and the acknowledge() call is still in there.

                    As for calling connection.stop(), that's very good to know. I'll definitely take that out. Up until you set me straight, I was always unsure quite where in the progression of the logic to put it.

                    I look forward to the patch for this bug. Thank you all again for doing such a fantastic job and for being so responsive to the needs and questions of your users.

                    • 7. Re: Closing Consumer Causes Hang
                      ovidiu.feodorov

                      I've created a new bug report: http://jira.jboss.org/jira/browse/JBMESSAGING-542 and added a test that currently fails (MiscellaneousTest.testClosingConnectionFromMessageListener())

                      The solution you propose has a drawback, though. If we allow the listener to close the connection within onMessage(), then postDeliver() won't be able to get past the session's ClosedInterceptor, which is now "closed", so we'll get an ERROR "Failed to deliver message" in the logs. It's harmless, but ugly.

                      Little bit more thought required :)