6 Replies Latest reply on Jul 11, 2009 4:05 AM by jamesc

    Durable consumer already connected on consumer restart ?

    jamesc

      Hi, we are seeing this problem on FUSE 5.3.0.0 / RHEL4 / java-1.5.0-sun-1.5.0.15.

       

       

      We have a recurring problem with durable STOMP consumers. From time to time we need to restart them, usually after problems in the broker have shown up in the logs. When we restart the durable consumer, we get this stack trace:

       

      2009-07-01 11:07:37,749 WARN  TransportConnection            - Failed to add Connection

      javax.jms.InvalidClientIDException: Broker: cern-prod-gridmsg101.cern.ch - Client: 3b290372-d363-4dda-8771-1086d52ff06e already connected from /128.142.183.22:51026

              at org.apache.activemq.broker.region.RegionBroker.addConnection(RegionBroker.java:209)

              at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:82)

              at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:82)

              at org.apache.activemq.advisory.AdvisoryBroker.addConnection(AdvisoryBroker.java:77)

              at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:82)

              at org.apache.activemq.broker.MutableBrokerFilter.addConnection(MutableBrokerFilter.java:89)

              at org.apache.activemq.broker.MutableBrokerFilter.addConnection(MutableBrokerFilter.java:89)

              at org.apache.activemq.broker.TransportConnection.processAddConnection(TransportConnection.java:686)

              at org.apache.activemq.broker.jmx.ManagedTransportConnection.processAddConnection(ManagedTransportConnection.java:86)

              at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:134)

              at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:308)

              at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:182)

              at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)

              at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:78)

              at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:135)

              at org.apache.activemq.transport.stomp.ProtocolConverter.onStompConnect(ProtocolConverter.java:491)

              at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:187)

              at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:67)

              at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)

              at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)

              at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)

              at java.lang.Thread.run(Thread.java:595)

       

      It seems the broker still thinks the old connection is alive and won't let us reconnect. We'd like to know how to get the broker to time out the old connection, if it is still registered. Currently our only workaround is to wipe the data files completely and restart the broker from scratch, which isn't great for us.
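      For context, the trace above fails inside ProtocolConverter.onStompConnect, i.e. while the broker handles the client's STOMP CONNECT frame, and the InvalidClientIDException says the client ID in that frame is still registered to an existing connection. The thread does not show the client code, so the following is only a minimal sketch, assuming these consumers set their client ID with ActiveMQ's `client-id` CONNECT header. The class and method names are hypothetical, and the code only renders the frame text; it opens no socket.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: renders STOMP 1.0 frames as text; it does not open a socket.
public class StompConnectFrame {

    // A STOMP frame is: command line, "name:value" header lines,
    // an empty line, the body, and a terminating NUL byte.
    static String frame(String command, Map<String, String> headers, String body) {
        StringBuilder sb = new StringBuilder(command).append('\n');
        for (Map.Entry<String, String> h : headers.entrySet()) {
            sb.append(h.getKey()).append(':').append(h.getValue()).append('\n');
        }
        return sb.append('\n').append(body).append('\0').toString();
    }

    // CONNECT frame for a durable consumer (assumes the client-id header).
    // The broker registers the connection under this client ID; reusing the ID
    // while the broker still holds the old connection is what the log reports
    // as "already connected".
    static String connect(String clientId) {
        Map<String, String> headers = new LinkedHashMap<String, String>();
        headers.put("client-id", clientId);
        return frame("CONNECT", headers, "");
    }

    public static void main(String[] args) {
        // Client ID copied from the broker log; the NUL terminator is printed as ^@.
        String f = connect("3b290372-d363-4dda-8771-1086d52ff06e");
        System.out.print(f.replace("\0", "^@"));
        System.out.println();
    }
}
```

      A restarted consumer that sends the same client ID hits exactly this check, which is why the stale connection has to go away on the broker side before the reconnect can succeed.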

        • 1. Re: Durable consumer already connected on consumer restart ?
          lvisnick

          Jamesc,

             I am working on a test case and will update the thread when I have results.

           

          Lorinda

          • 2. Re: Durable consumer already connected on consumer restart ?
            jamesc

            Hi Lorinda,

             

            I have some debug information from another crash we had last night, in case it helps.

             

            The scenario is a durable STOMP consumer reading from a topic that receives ~40K messages per hour. I create a new durable consumer, and after it has read ~17,000 messages from the topic we see the following on the client side:

             

            java.lang.RuntimeException: java.lang.IllegalStateException: Message id ID:gridmsg101.cern.ch-51802-1246440493711-2:10528:1:1:1 could not be recovered from the data store - already dispatched

             

            java.lang.RuntimeException: java.lang.RuntimeException: java.lang.IllegalStateException: Message id ID:gridmsg101.cern.ch-51802-1246440493711-2:10528:1:1:1 could not be recovered from the data store - already dispatched

                    at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.reset(AbstractStoreCursor.java:104)

                    at org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor.reset(StoreDurableSubscriberCursor.java:225)

                    at org.apache.activemq.broker.region.PrefetchSubscription.dispatchPending(PrefetchSubscription.java:560)

                    at org.apache.activemq.broker.region.DurableTopicSubscription.activate(DurableTopicSubscription.java:130)

                    at org.apache.activemq.broker.region.TopicRegion.addConsumer(TopicRegion.java:105)

                    at org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:376)

                    at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)

                    at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)

                    at org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:85)

                    at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)

                    at org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:93)

                    at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:546)

                    at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:349)

                    at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:308)

                    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:182)

                    at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)

                    at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:78)

                    at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:135)

                    at org.apache.activemq.transport.stomp.ProtocolConverter.onStompSubscribe(ProtocolConverter.java:414)

                    at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:183)

                    at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:67)

                    at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)

                    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)

                    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)

                    at java.lang.Thread.run(Thread.java:595)

            Caused by: java.lang.RuntimeException: java.lang.IllegalStateException: Message id ID:gridmsg101.cern.ch-51802-1246440493711-2:10528:1:1:1 could not be recovered from the data store - already dispatched

                    at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.fillBatch(AbstractStoreCursor.java:239)

                    at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.reset(AbstractStoreCursor.java:101)

                    ... 24 more

            Caused by: java.lang.IllegalStateException: Message id ID:gridmsg101.cern.ch-51802-1246440493711-2:10528:1:1:1 could not be recovered from the data store - already dispatched

                    at org.apache.activemq.store.amq.RecoveryListenerAdapter.recoverMessageReference(RecoveryListenerAdapter.java:58)

                    at org.apache.activemq.store.kahadaptor.KahaReferenceStore.recoverReference(KahaReferenceStore.java:82)

                    at org.apache.activemq.store.kahadaptor.KahaTopicReferenceStore.recoverNextMessages(KahaTopicReferenceStore.java:287)

                    at org.apache.activemq.store.amq.AMQTopicMessageStore.recoverNextMessages(AMQTopicMessageStore.java:59)

                    at org.apache.activemq.broker.region.cursors.TopicStorePrefetch.doFillBatch(TopicStorePrefetch.java:91)

                    at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.fillBatch(AbstractStoreCursor.java:236)

                    ... 25 more

             

             

            Then, when the consumer restarts, the broker rejects it because the durable consumer is still considered connected:

             

            message: Durable consumer is in use for client: SWAT Importer - prod and subscriptionName: EGEE WN

            javax.jms.JMSException: Durable consumer is in use for client: SWAT Importer - prod and subscriptionName: EGEE WN

                    at org.apache.activemq.broker.region.TopicRegion.addConsumer(TopicRegion.java:75)

                    at org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:376)

                    at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)

                    at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)

                    at org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:85)

                    at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)

                    at org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:93)

                    at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:546)

                    at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:349)

                    at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:308)

                    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:182)

                    at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)

                    at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:78)

                    at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:135)

                    at org.apache.activemq.transport.stomp.ProtocolConverter.onStompSubscribe(ProtocolConverter.java:414)

                    at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:183)

                    at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:67)

                    at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)

                    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)

                    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)

                    at java.lang.Thread.run(Thread.java:595)
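            This second trace comes from ProtocolConverter.onStompSubscribe, so here it is the SUBSCRIBE frame, not the CONNECT, that is rejected: the broker still has an active consumer registered for client "SWAT Importer - prod" and subscription "EGEE WN". As a rough sketch only, assuming these consumers use ActiveMQ's `activemq.subscriptionName` SUBSCRIBE header (the thread does not show client code) and using a made-up topic name, the frame looks like this. The class and method names are hypothetical, and nothing is sent over the network.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: renders the SUBSCRIBE frame text only; no network I/O.
public class StompDurableSubscribe {

    static String frame(String command, Map<String, String> headers) {
        StringBuilder sb = new StringBuilder(command).append('\n');
        for (Map.Entry<String, String> h : headers.entrySet()) {
            sb.append(h.getKey()).append(':').append(h.getValue()).append('\n');
        }
        // Empty line ends the headers; SUBSCRIBE carries no body; NUL ends the frame.
        return sb.append('\n').append('\0').toString();
    }

    // Durable topic subscription (assumes the activemq.subscriptionName header).
    // Together with the client-id sent on CONNECT, the name identifies the durable
    // subscription, and only one live consumer may hold it at a time, hence
    // "Durable consumer is in use" while the stale consumer is still registered.
    static String subscribeDurable(String topic, String subscriptionName) {
        Map<String, String> headers = new LinkedHashMap<String, String>();
        headers.put("destination", topic);
        headers.put("activemq.subscriptionName", subscriptionName);
        return frame("SUBSCRIBE", headers);
    }

    public static void main(String[] args) {
        // "/topic/EXAMPLE" is a placeholder; "EGEE WN" is the subscription name from the log.
        String f = subscribeDurable("/topic/EXAMPLE", "EGEE WN");
        System.out.print(f.replace("\0", "^@"));
        System.out.println();
    }
}
```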

             

             

            In the web console we see that the original consumer is still 'connected' (e.g. the consumer count rises to 2). Also, from this point on, any consumer that connects to the topic gets neither messages nor errors, and the server log fills up with this error:

             

            2009-07-03 09:57:24,514 ERROR AbstractStoreCursor            - Failed to fill batch

            java.lang.RuntimeException: java.lang.IllegalStateException: Message id ID:gridmsg101.cern.ch-51802-1246440493711-2:10528:1:1:1 could not be recovered from the data store - already dispatched

                    at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.fillBatch(AbstractStoreCursor.java:239)

                    at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.reset(AbstractStoreCursor.java:101)

                    at org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor.reset(StoreDurableSubscriberCursor.java:225)

                    at org.apache.activemq.broker.region.PrefetchSubscription.dispatchPending(PrefetchSubscription.java:560)

                    at org.apache.activemq.broker.region.PrefetchSubscription.add(PrefetchSubscription.java:154)

                    at org.apache.activemq.broker.region.DurableTopicSubscription.add(DurableTopicSubscription.java:197)

                    at org.apache.activemq.broker.region.policy.SimpleDispatchPolicy.dispatch(SimpleDispatchPolicy.java:49)

                    at org.apache.activemq.broker.region.Topic.dispatch(Topic.java:568)

                    at org.apache.activemq.broker.region.Topic.doMessageSend(Topic.java:426)

                    at org.apache.activemq.broker.region.Topic.send(Topic.java:364)

                    at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:354)

                    at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:438)

                    at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:224)

                    at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:126)

                    at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:95)

                    at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:133)

                    at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:455)

                    at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:639)

                    at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:308)

                    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:182)

                    at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)

                    at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:78)

                    at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:135)

                    at org.apache.activemq.transport.stomp.ProtocolConverter.onStompSend(ProtocolConverter.java:247)

                    at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:173)

                    at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:67)

                    at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)

                    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)

                    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)

                    at java.lang.Thread.run(Thread.java:595)

            Caused by: java.lang.IllegalStateException: Message id ID:gridmsg101.cern.ch-51802-1246440493711-2:10528:1:1:1 could not be recovered from the data store - already dispatched

                    at org.apache.activemq.store.amq.RecoveryListenerAdapter.recoverMessageReference(RecoveryListenerAdapter.java:58)

                    at org.apache.activemq.store.kahadaptor.KahaReferenceStore.recoverReference(KahaReferenceStore.java:82)

                    at org.apache.activemq.store.kahadaptor.KahaTopicReferenceStore.recoverNextMessages(KahaTopicReferenceStore.java:287)

                    at org.apache.activemq.store.amq.AMQTopicMessageStore.recoverNextMessages(AMQTopicMessageStore.java:59)

                    at org.apache.activemq.broker.region.cursors.TopicStorePrefetch.doFillBatch(TopicStorePrefetch.java:91)

                    at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.fillBatch(AbstractStoreCursor.java:236)

                    ... 29 more

             

             

            cheers,

             

            James.

            • 3. Re: Durable consumer already connected on consumer restart ?
              lvisnick

              James,

                I have been away for a long weekend. Which problem would you like me to focus on? The original exception that causes your consumer to die (I'm not sure I will be able to reproduce that one; do you know whether your consumers always die with that bad-cursor error)? The reconnect error? Or all the subsequent connect failures?

               

              I suspect that once I can reproduce the reconnect error, I will also see the subsequent connect/no-messages problem, but so far I don't have a test case that shows the problem. I am still working on it.

               

              Lorinda

              • 4. Re: Durable consumer already connected on consumer restart ?
                jamesc

                Hi Lorinda,

                 

                I agree the initial exception will be harder to track down: it's very non-deterministic for us, and sometimes we can consume millions of messages without problems. While it's bad that this happens and we should try to get to the bottom of it, what really hurts us is that we can't reconnect the consumer and have to wipe out the store before continuing.

                 

                So it's probably best to track down the reconnect problem first, and we can work on finding a more reproducible test case for the initial cursor exception. As for your question about the initial cursor exception: yes, we see this exact exception regularly in the logs.

                 

                thanks for your help.

                 

                James.

                • 5. Re: Durable consumer already connected on consumer restart ?
                  lvisnick

                  James,

                     I was finally able to complete a test case that reproduces your reported problem, but only when running 5.3.0.0.

                   

                  If I download and use the latest 5.3 release, 5.3.0.3, the problem does not appear.

                   

                  Restated: with 5.3.0.3, a durable STOMP client can be disconnected (via Ctrl-C) and then reconnect with no problem. I did not try the other releases between 5.3 and 5.3.0.3.

                   

                  (For what it's worth, this problem did not occur in 5.2.0.2, the only other release I tested.)

                   

                  So, if you upgrade to the latest release you should be all set.

                  Thank you,

                  Lorinda

                  • 6. Re: Durable consumer already connected on consumer restart ?
                    jamesc

                    Lorinda,

                     

                    Thanks for your work on this case. Our test brokers are already on 5.3.0.3, so we should be able to roll the production ones forward quickly to solve this problem.

                     

                    James.