11 Replies Latest reply on Jun 23, 2011 11:45 AM by S ks

    Exception in hornetq server when consumer comes back

    S ks Newbie

      When I kill my consumer using "kill -9 pid" and then send some messages to the HornetQ server, those messages are stored because paging is on. Our queues are durable.

       

      When the consumer is brought back up, all the messages which were sent earlier to the HornetQ server reach the consumer, but in the HornetQ server logs I see the exception below. The version we are using is 2.2.2.Final.

       

      ConnectionTTL is very high in our case: 24 hours. (We have a compelling use case on the consumer side and cannot afford connection timeouts.) We are using the HornetQ core APIs for producers and consumers.

       

      Can I know why this exception happens?

       

      Could this be what is happening: when the consumer comes back, and the old connection or session is still alive, does the HornetQ server try to look for those messages in the old session?

       

      SEVERE: Caught unexpected exception
      java.lang.IllegalStateException: 821045869 Could not find reference on consumerID=0, messageId = 2147483726 queue = billableConsumerQueue closed = false
          at org.hornetq.core.server.impl.ServerConsumerImpl.acknowledge(ServerConsumerImpl.java:559)
          at org.hornetq.core.server.impl.ServerSessionImpl.acknowledge(ServerSessionImpl.java:553)
          at org.hornetq.core.protocol.core.ServerSessionPacketHandler.handlePacket(ServerSessionPacketHandler.java:268)
          at org.hornetq.core.protocol.core.impl.ChannelImpl.handlePacket(ChannelImpl.java:474)
          at org.hornetq.core.protocol.core.impl.RemotingConnectionImpl.doBufferReceived(RemotingConnectionImpl.java:496)
          at org.hornetq.core.protocol.core.impl.RemotingConnectionImpl.bufferReceived(RemotingConnectionImpl.java:457)
          at org.hornetq.core.remoting.server.impl.RemotingServiceImpl$DelegatingBufferHandler.bufferReceived(RemotingServiceImpl.java:459)
          at org.hornetq.core.remoting.impl.netty.HornetQChannelHandler.messageReceived(HornetQChannelHandler.java:73)
          at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:100)
          at org.jboss.netty.channel.StaticChannelPipeline.sendUpstream(StaticChannelPipeline.java:362)
          at org.jboss.netty.channel.StaticChannelPipeline$StaticChannelHandlerContext.sendUpstream(StaticChannelPipeline.java:514)
          at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:287)
          at org.hornetq.core.remoting.impl.netty.HornetQFrameDecoder2.decode(HornetQFrameDecoder2.java:169)
          at org.hornetq.core.remoting.impl.netty.HornetQFrameDecoder2.messageReceived(HornetQFrameDecoder2.java:134)
          at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)
          at org.jboss.netty.channel.StaticChannelPipeline.sendUpstream(StaticChannelPipeline.java:362)
          at org.jboss.netty.channel.StaticChannelPipeline.sendUpstream(StaticChannelPipeline.java:357)
          at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
          at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
          at org.jboss.netty.channel.socket.oio.OioWorker.run(OioWorker.java:90)
          at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
          at org.jboss.netty.util.internal.IoWorkerRunnable.run(IoWorkerRunnable.java:46)
          at org.jboss.netty.util.VirtualExecutorService$ChildExecutorRunnable.run(VirtualExecutorService.java:181)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
          at java.lang.Thread.run(Thread.java:636)

        • 1. Re: Exception in hornetq server when consumer comes back
          Andy Taylor Master

          When I kill my consumer using "kill -9 pid" and then send some messages to the HornetQ server, those messages are stored because paging is on. Our queues are durable.

          How do you know paging is kicking in? Or do you mean that the messages are just persisted?

           

          When the consumer is brought back up, all the messages which were sent earlier to the HornetQ server reach the consumer, but in the HornetQ server logs I see the exception below. The version we are using is 2.2.2.Final.

          So, was your consumer not acknowledging the messages the first time?

           

          Could this be what is happening: when the consumer comes back, and the old connection or session is still alive, does the HornetQ server try to look for those messages in the old session?

          Can you explain what you mean by brought back up? Are you just restarting a client app?

           

          I really need more info before I can help you.

          • 2. Re: Exception in hornetq server when consumer comes back
            S ks Newbie

            Q. How do you know paging is kicking in? Or do you mean that the messages are just persisted?

            Answer: Sorry for the confusion. Yes, you are right, I mean the messages are persisted. Our queue is a durable queue.

             

             

            Q. So, was your consumer not acknowledging the messages the first time?

            Answer: Yes. We get lots of messages at the client, and the client forwards them to an external system via web services. This external system is slower than HornetQ, so we queue the messages internally in a Java queue. We ONLY acknowledge a message once we get confirmation from the external system that it has received the data. When we killed our consumer client, some messages were in the Java queue and had not been acknowledged. When we restarted the consumer client we got those messages back, which is what I expected, but in addition I also got the exception mentioned above.

             

            Q. Can you explain what you mean by brought back up? Are you just restarting a client app?

            Answer: Yes, we just restarted the client app.

             

            If you want the code snippet which got executed again, I can send that too.

             

            Below is just the small snippet of code which executes every time the consumer client is brought back up:

             

            serverLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName()));
            sf = serverLocator.createSessionFactory();

            try {
                this.session = sf.createSession(true, true, 1);
                try {
                    this.session.createQueue(HornetQContext.JMS_TOPIC_BILLABLE_AGGREGATED_EVENTS, HornetQContext.BILLABLE_CONSUMER_QUEUE, true);
                } catch (HornetQException hqe) {
                    // We do some stuff
                }
                this.consumer = session.createConsumer(HornetQContext.BILLABLE_CONSUMER_QUEUE);
                session.start();
            } catch (HornetQException e) {
                e.printStackTrace();
                logger.error(" Error while generating session ", e);
            }

            • 3. Re: Exception in hornetq server when consumer comes back
              Andy Taylor Master

              Where and how do you ack the message?

               

              Also remember that although you have a high TTL, the resources will be cleared up if the server detects the client failing, which it will in this case.

              • 4. Re: Exception in hornetq server when consumer comes back
                S ks Newbie

                I ack the message in a separate thread.

                I ack the message by calling message.acknowledge().

                 

                We have a single thread which keeps receiving messages; once a message is received it is placed on a Java queue. There are 5 executor threads which pick up messages from this queue. From each message we extract the information we want, then pass it to the external service via a web service. Once the external service acks that it has received the information, we acknowledge the message.

                 

                We essentially do

                message.acknowledge(); This is in a separate thread, though, from the thread which created the session.
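
The pipeline described above (one receiver, a Java hand-off queue, 5 workers, ack only after the external system confirms) can be sketched roughly as follows. This is a stdlib-only model, not HornetQ code: `FakeMessage` and `sendToExternalSystem` are hypothetical stand-ins for the real client message and the web service call, and the sketch assumes the external call always succeeds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Stdlib-only model of the receive -> hand-off -> ack pipeline; no HornetQ classes are used. */
final class AckAfterConfirmSketch {

    /** Hypothetical stand-in for a received message; counts how often it was acknowledged. */
    static final class FakeMessage {
        final int id;
        final AtomicInteger ackCount = new AtomicInteger();
        FakeMessage(int id) { this.id = id; }
        void acknowledge() { ackCount.incrementAndGet(); }
    }

    /** Hypothetical stand-in for the slow external web service; true means "data received". */
    static boolean sendToExternalSystem(FakeMessage m) {
        return true; // assumption: the external system always confirms
    }

    /** Pushes messageCount messages through workerCount workers and returns them for inspection. */
    static List<FakeMessage> run(int messageCount, int workerCount) {
        BlockingQueue<FakeMessage> handOff = new LinkedBlockingQueue<>();
        List<FakeMessage> all = new ArrayList<>();
        for (int i = 0; i < messageCount; i++) {
            FakeMessage m = new FakeMessage(i);
            all.add(m);
            handOff.add(m); // in the real client this is done by the single receiving thread
        }
        ExecutorService workers = Executors.newFixedThreadPool(workerCount);
        for (int w = 0; w < workerCount; w++) {
            workers.execute(() -> {
                FakeMessage m;
                // poll() returns null once the hand-off queue is drained, which ends this worker
                while ((m = handOff.poll()) != null) {
                    if (sendToExternalSystem(m)) {
                        m.acknowledge(); // ack only after the external system confirms
                    }
                }
            });
        }
        workers.shutdown();
        try {
            if (!workers.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return all;
    }

    public static void main(String[] args) {
        for (FakeMessage m : run(20, 5)) {
            System.out.println("message " + m.id + " acked " + m.ackCount.get() + " time(s)");
        }
    }
}
```

In this model, anything still sitting in the hand-off queue when the process is killed has not been acknowledged, which matches the redelivery after restart described earlier in the thread.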

                • 5. Re: Exception in hornetq server when consumer comes back
                  Andy Taylor Master

                  Is your Java queue persistent, and do you remove the message from the queue after it has been acked? What may be happening is that you are killing the client at the point it acks the message.

                  • 6. Re: Exception in hornetq server when consumer comes back
                    Leos Bitto Novice

                    rahul singh wrote:

                     

                    We have a single thread which keeps receiving messages; once a message is received it is placed on a Java queue. There are 5 executor threads which pick up messages from this queue.

                     

                    I believe that if you got rid of that Java queue and received the messages from the HornetQ queue directly in your 5 threads (acknowledging them to HornetQ after you have processed them), your application would be much simpler. Having multiple consumers processing the messages from the same queue is a standard pattern which works well. I don't see the reason for complicating the message processing with one more step (your Java queue).

                    • 7. Re: Exception in hornetq server when consumer comes back
                      Andy Taylor Master

                      I believe that if you got rid of that Java queue and received the messages from the HornetQ queue directly in your 5 threads (acknowledging them to HornetQ after you have processed them), your application would be much simpler. Having multiple consumers processing the messages from the same queue is a standard pattern which works well. I don't see the reason for complicating the message processing with one more step (your Java queue).

                      +1

                      • 8. Re: Exception in hornetq server when consumer comes back
                        S ks Newbie

                        Thank you for the input. I will indeed do that.

                        But coming back to the issue above: I do make sure that messages are deleted from the queue. In fact I call take, which essentially removes the head of the queue. Any idea why the above exception might be happening?

                         

                        Regarding your suggestion above, if I understand correctly, this is what you are suggesting.

                         

                        I have a class which does this:

                            serverLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName()));
                            ClientSessionFactory sf = serverLocator.createSessionFactory();

                         

                        And all the individual threads use the above class to get the ClientSessionFactory instance and then do the following:

                            session = sf.createSession(true, true, 1);
                            consumer = session.createConsumer(QUEUENAME);
                            session.start();

                         

                        Right? So each thread will have its own session and consumer, as a session is unique to a consumer, right? And can I also create 50 of them?

                        • 9. Re: Exception in hornetq server when consumer comes back
                          S ks Newbie

                          Also, I have a question.

                           

                          If I have 5 consumers listening to the same queue, wouldn't all of them get the same messages?

                          If I have message M1, and 5 threads have consumers listening to the same topic or the same queue, wouldn't they all get the same M1?

                           

                          What I want is essentially this: if there are messages M1, M2, M3, M4, M5, thread1 should get M1, thread2 M2, thread3 M3, thread4 M4, thread5 M5. Can this be done?

                          • 10. Re: Exception in hornetq server when consumer comes back
                            Leos Bitto Novice

                            rahul singh wrote:

                             

                            If I have 5 consumers listening to the same queue, wouldn't all of them get the same messages?

                            If I have message M1, and 5 threads have consumers listening to the same topic or the same queue, wouldn't they all get the same M1?

                             

                            What I want is essentially this: if there are messages M1, M2, M3, M4, M5, thread1 should get M1, thread2 M2, thread3 M3, thread4 M4, thread5 M5. Can this be done?

                             

                            Well, you are demonstrating that you are missing basic knowledge of messaging. So I would suggest focusing on studying the principles of JMS (not that the proprietary HornetQ core API would be wrong, but there is much more information about JMS available) and you will see. What you are specifically looking for is the difference between a queue and a topic.
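
As a rough illustration of the queue/topic difference mentioned above, here is a minimal stdlib-only simulation. It uses no JMS or HornetQ classes; `deliverAsQueue` and `deliverAsTopic` are hypothetical names invented for this sketch. With queue semantics the consumers compete and each message is handed to exactly one of them; with topic semantics every subscriber receives its own copy of every message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Stdlib-only simulation of queue vs. topic delivery; not broker code. */
final class QueueVsTopicSketch {

    /** Queue semantics: consumers compete, so each message reaches exactly one consumer. */
    static List<List<String>> deliverAsQueue(List<String> messages, int consumerCount) {
        ConcurrentLinkedQueue<String> shared = new ConcurrentLinkedQueue<>(messages);
        List<List<String>> received = new ArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(consumerCount);
        for (int c = 0; c < consumerCount; c++) {
            List<String> mine = new ArrayList<>(); // written only by this consumer's thread
            received.add(mine);
            pool.execute(() -> {
                String m;
                // poll() removes the head atomically, so no two consumers see the same message
                while ((m = shared.poll()) != null) {
                    mine.add(m);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    /** Topic semantics: every subscriber gets its own copy of every message. */
    static List<List<String>> deliverAsTopic(List<String> messages, int subscriberCount) {
        List<List<String>> received = new ArrayList<>();
        for (int s = 0; s < subscriberCount; s++) {
            received.add(new ArrayList<>(messages));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("M1", "M2", "M3", "M4", "M5");
        System.out.println("queue: " + deliverAsQueue(messages, 5));
        System.out.println("topic: " + deliverAsTopic(messages, 5));
    }
}
```

Which consumer ends up with which message is not deterministic in this simulation; it only shows that, on a queue, no message is delivered twice. A fixed M1-to-thread1, M2-to-thread2 assignment is a separate matter that depends on how the broker distributes messages.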

                             

                            Regarding your programming, you might consider using one of the existing JMS containers, which would let you implement just one interface (javax.jms.MessageListener) with one method (onMessage) and not care about the rest (threading, reconnecting in case of failures, etc.). Nice containers are, for example, SimpleMessageListenerContainer or DefaultMessageListenerContainer from Spring, but you might prefer others instead.

                            • 11. Re: Exception in hornetq server when consumer comes back
                              S ks Newbie

                              Well, indeed, Mr Leos Bitto, I am new to JMS. And my bad: because of certain constraints I didn't get a chance to read the documentation completely. Instead of making remarks, it would have been great if you had pointed me to a specific document. Anyway, I have read it now.

                               

                              Anyway, any idea why the above exception happened, given that I actually deleted the messages from the Java queue, so messages are not being resent? I mean, duplicate acknowledgement is not happening. Any idea why this could happen?