6 Replies Latest reply on Oct 22, 2010 12:32 AM by rajks

    how to - session creation flags and auto commit acks

    rajks Newbie

      Hi,

       

      I am having trouble using acknowledge() with auto-commit of acks turned off. Any help is appreciated.

       

      I have combined the HornetQ server and the consumer in one JVM process, and I use the Netty connector instead of the in-VM connector so that I can add clustering later. I am using the core (native) API only.

       

      The producer is a separate JVM process used to test producing messages.

       

      I have only one queue and ONE dedicated consumer thread that reads messages from this queue and hands each message to a POOL of worker threads for processing.

       

       

      In the test, the consumer acknowledges alternate messages only.

      Upon shutdown and restart I should then receive only the unacknowledged messages.

       

      But this is not working: I get all the messages, even the acknowledged ones. I saw a similar topic in another discussion and followed its suggestion of setting the ack batch size to 0. Still no luck.

       

      The code snippets

       

       

          // createSession(autoCommitSends, autoCommitAcks, ackBatchSize)
          session = sessionFactory.createSession(false, false, 0);

       

       

          Consumer thread:

       

               for (int i = 0; i < 10000; i++) {
                   msg = clientConsumer.receive();
                   ....
                   boolean ack = (i % 2) == 0;   // acknowledge every other message
                   workerPool.execute(session, msg, ack);
               }

       

       

          Worker thread:

       

                   // print "got msg" ....
                   if (ack) {
                      msg.acknowledge();
                      session.commit();
                   }

       

       

      I also tried calling session.rollback() for the alternate, unacknowledged messages; that does not help either.

                   if (ack) {
                      msg.acknowledge();
                      session.commit();
                   } else {
                      session.rollback();
                   }

       

      This also results in receiving all the previously queued messages upon restart.

       

      I badly need to get this prototype working before I can go further with HornetQ.

       

      -Raj

        • 1. Re: how to - session creation flags and auto commit acks
          Clebert Suconic Master

          I - A session is supposed to be a single-threaded object. You're not supposed to do the acknowledgement or commit in another thread.

           

          II - Since you created the session with auto-commit-ack=false, you're supposed to call commit at some point.
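The thread-confinement point above can be sketched as follows. This is only an illustration, not HornetQ code: `FakeSession` and `Outcome` are hypothetical stand-ins invented for the example, and the real session and message types are not used. Workers report their result on a queue, and only the thread that created the session calls acknowledge and commit. The sketch shows thread confinement only; it says nothing about the order in which acknowledgements are allowed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

class SingleThreadedSessionSketch {

    /** Hypothetical stand-in for a client session; it rejects calls from other threads. */
    static class FakeSession {
        final List<String> log = new ArrayList<>();
        private final Thread owner = Thread.currentThread();

        void acknowledge(int msgId) { checkOwner(); log.add("ack " + msgId); }
        void commit() { checkOwner(); log.add("commit"); }

        private void checkOwner() {
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException("session used outside its owning thread");
            }
        }
    }

    /** What a worker reports back: which message it handled and whether to ack it. */
    static class Outcome {
        final int msgId;
        final boolean ack;
        Outcome(int msgId, boolean ack) { this.msgId = msgId; this.ack = ack; }
    }

    /** Processes messageCount fake messages and returns the session's call log. */
    static List<String> run(int messageCount) {
        FakeSession session = new FakeSession();          // owned by the calling thread
        ExecutorService workers = Executors.newFixedThreadPool(4);
        BlockingQueue<Outcome> done = new LinkedBlockingQueue<>();
        try {
            for (int i = 0; i < messageCount; i++) {
                final int msgId = i;
                // The worker does the processing but never touches the session.
                workers.execute(() -> done.add(new Outcome(msgId, msgId % 2 == 0)));
            }
            for (int i = 0; i < messageCount; i++) {
                Outcome o = done.take();                  // back on the owning thread
                if (o.ack) {
                    session.acknowledge(o.msgId);
                    session.commit();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            workers.shutdown();
        }
        return session.log;
    }
}
```

Calling `SingleThreadedSessionSketch.run(6)` logs one ack and one commit for each of the messages 0, 2 and 4, all issued from the calling thread.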

          • 2. Re: how to - session creation flags and auto commit acks
            rajks Newbie

            Thanks for the help

             

            I tried acknowledging in the consumer thread itself and it seems to be working.

             

            But I started getting this exception

             

            Oct 21, 2010 6:43:24 PM org.hornetq.core.logging.impl.JULLogDelegate error
            SEVERE: Caught unexpected exception
            java.lang.IllegalStateException: 1593275665 Could not find reference on consumerID=0, messageId = 25 queue = queue.debug closed = false
                at org.hornetq.core.server.impl.ServerConsumerImpl.acknowledge(ServerConsumerImpl.java:512)
                at org.hornetq.core.server.impl.ServerSessionImpl.acknowledge(ServerSessionImpl.java:513)
                at org.hornetq.core.protocol.core.ServerSessionPacketHandler.handlePacket(ServerSessionPacketHandler.java:290)
                at org.hornetq.core.protocol.core.impl.ChannelImpl.handlePacket(ChannelImpl.java:471)
                at org.hornetq.core.protocol.core.impl.RemotingConnectionImpl.doBufferReceived(RemotingConnectionImpl.java:451)
                at org.hornetq.core.protocol.core.impl.RemotingConnectionImpl.bufferReceived(RemotingConnectionImpl.java:412)
                at org.hornetq.core.remoting.server.impl.RemotingServiceImpl$DelegatingBufferHandler.bufferReceived(RemotingServiceImpl.java:459)
                at org.hornetq.core.remoting.impl.netty.HornetQChannelHandler.messageReceived(HornetQChannelHandler.java:67)
                at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:100)
                at org.jboss.netty.channel.StaticChannelPipeline.sendUpstream(StaticChannelPipeline.java:362)
                at org.jboss.netty.channel.StaticChannelPipeline$StaticChannelHandlerContext.sendUpstream(StaticChannelPipeline.java:514)
                at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:287)
                at org.hornetq.core.remoting.impl.netty.HornetQFrameDecoder2.decode(HornetQFrameDecoder2.java:169)
                at org.hornetq.core.remoting.impl.netty.HornetQFrameDecoder2.messageReceived(HornetQFrameDecoder2.java:134)
                at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)
                at org.jboss.netty.channel.StaticChannelPipeline.sendUpstream(StaticChannelPipeline.java:362)
                at org.jboss.netty.channel.StaticChannelPipeline.sendUpstream(StaticChannelPipeline.java:357)
                at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
                at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
                at org.jboss.netty.channel.socket.oio.OioWorker.run(OioWorker.java:90)
                at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
                at org.jboss.netty.util.internal.IoWorkerRunnable.run(IoWorkerRunnable.java:46)
                at org.jboss.netty.util.VirtualExecutorService$ChildExecutorRunnable.run(VirtualExecutorService.java:181)
                at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
                at java.lang.Thread.run(Thread.java:662)
            Oct 21, 2010 6:43:24 PM org.hornetq.core.logging.impl.JULLogDelegate error

             

            Is this a configuration error, or am I doing something wrong?

             

            Thanks

            Raj

            • 4. Re: how to - session creation flags and auto commit acks
              rajks Newbie

              I verified that I am not acknowledging more than once.

               

              The only thing I am doing is receiving the messages: I acknowledge the ODD ID messages and keep the EVEN ID messages in memory to simulate an internal retry condition.

              After 3 in-memory retries I acknowledge all the EVEN ID messages from the same session thread.

               

              This is when I get the exception I posted earlier. I have not restarted the process at all.

               

              What is the correct way to do this?

              • 5. Re: how to - session creation flags and auto commit acks
                Clebert Suconic Master

                Whenever you ACK the last message, you're acknowledging all the previous messages...

                So you are indeed double acking.

                 

                Say you received:

                msg1,
                msg2,
                msg3

                When you do msg3.ack... you are acking 1 and 2 automatically.

                 

                 

                These are the semantics implemented. They are similar to the JMS semantics for ACKing.
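The behaviour described above can be modelled with a small toy class. It is not the server implementation: `CumulativeAckModel` and its methods are hypothetical names chosen for the illustration, and the exception text only imitates the one in the stack trace earlier in the thread. Acknowledging a message removes it and every message delivered before it, so a later acknowledge of one of those earlier messages finds nothing left to acknowledge.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of cumulative acknowledgement; an illustration only. */
class CumulativeAckModel {
    // Messages delivered to the consumer and not yet acknowledged, in delivery order.
    private final Deque<Long> delivering = new ArrayDeque<>();

    void deliver(long messageId) {
        delivering.addLast(messageId);
    }

    /** Acknowledges messageId and everything delivered before it; returns the count acked. */
    int acknowledge(long messageId) {
        if (!delivering.contains(messageId)) {
            throw new IllegalStateException("Could not find reference, messageId = " + messageId);
        }
        int acked = 0;
        long head;
        do {
            head = delivering.removeFirst();
            acked++;
        } while (head != messageId);
        return acked;
    }

    int pending() {
        return delivering.size();
    }
}
```

In this model, after delivering msg1, msg2 and msg3, `acknowledge(3)` acks all three, and a following `acknowledge(2)` throws `IllegalStateException`, which matches the double-ack situation described in the reply.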

                • 6. Re: how to - session creation flags and auto commit acks
                  rajks Newbie

                  Thanks for getting me to this point.

                   

                  Is there any other option available for my use case?

                  If not, do I have to do the ack management in our app layer and batch process it?
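One possible shape for the app-layer ack management asked about here is sketched below. It is an assumption about how such a layer could work, not something proposed in the thread: `AckWatermark` is a hypothetical helper, and it assumes the application numbers messages 0, 1, 2, ... in delivery order. Completions may arrive in any order, and the tracker reports the highest message for which every earlier message is also complete, which is the only one that is safe to acknowledge when acknowledgement is cumulative. A message that is still being retried therefore holds back the acknowledgement of everything delivered after it.

```java
import java.util.TreeSet;

/** Hypothetical app-layer tracker for cumulative acknowledgement. */
class AckWatermark {
    // Completed sequence numbers that cannot be acked yet because of a gap before them.
    private final TreeSet<Long> completed = new TreeSet<>();
    // Lowest sequence number not yet covered by an acknowledgement.
    private long nextToAck = 0;

    /**
     * Marks seq as fully processed. Returns the sequence number that can now be
     * acknowledged cumulatively, or -1 if an earlier message is still outstanding.
     */
    long complete(long seq) {
        completed.add(seq);
        long ackable = -1;
        // Advance across the contiguous run of completed messages, if any.
        while (completed.remove(nextToAck)) {
            ackable = nextToAck;
            nextToAck++;
        }
        return ackable;
    }
}
```

For example, `complete(1)` returns -1 because message 0 is still outstanding, and a following `complete(0)` returns 1, meaning a single acknowledge of message 1 now covers both.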