    how to - session creation flags and auto commit acks

    rajks



      Having problems with how to use the acknowledge(), auto commit ack. Any help appreciated.


      I have a  combined the hornetq server plus consumer in one JVM process and I use Netty connector instead of in VM connector so that I can later on do clustering. I am using the CORE / native apis only.


      The producer is a separate jvm process to test producing messages


      I have only one queue and ONE dedicated consumer thread to read messages from this queue and process the message in POOL of worker threads.



      The test is consumer will acknowledge alternate messages only.

      And upon shutdown and restart I should get only the unacknowledged messages only.


      But this is not working. I get all the messages even the ones acknowledged. I saw in another discussion similar topic and I tried to follow that using the ack batch size to 0. Still no luck.


      The code snippets



          session = sessionFactory.createSession(false, false, 0);



          Consumer thread:


               for(int i = 0;i < 10,000; i++) {


                  msg = clientConsumer.receive();



                   boolean ack = ( i % 2 ) == 0


                   workerPool.execute(session, msg, ack);




          Worker thread:


                   print got msg ....

                   if (ack) {






      I tried to play around by adding the session.rollback() on the alternate unack messages - that also does not help.

                   if (ack) {



                   } else {




      This is also resulting in receiving all the messages queued earlier upon restart.


      I badly need to get this proto type working to be able to proceed further to use the HQ.



        1. Re: how to - session creation flags and auto commit acks
          Clebert Suconic

          I - A session is supposed to be a single theaded object. you're not supposed to do the acknowledgement or commit in another thread.


          II - Since you created the session with auto-commit-ack=false, you're supposed to call commit at some point.

          2. Re: how to - session creation flags and auto commit acks
            rajks

            Thanks for the help


            I tried to acknowledge in the consumer thread itself and it works it seems to be working.


            But I started getting this exception


            Oct 21, 2010 6:43:24 PM org.hornetq.core.logging.impl.JULLogDelegate error
            SEVERE: Caught unexpected exception
            java.lang.IllegalStateException: 1593275665 Could not find reference on consumerID=0, messageId = 25 queue = queue.debug closed = false
                at org.hornetq.core.server.impl.ServerConsumerImpl.acknowledge(ServerConsumerImpl.java:512)
                at org.hornetq.core.server.impl.ServerSessionImpl.acknowledge(ServerSessionImpl.java:513)
                at org.hornetq.core.protocol.core.ServerSessionPacketHandler.handlePacket(ServerSessionPacketHandler.java:290)
                at org.hornetq.core.protocol.core.impl.ChannelImpl.handlePacket(ChannelImpl.java:471)
                at org.hornetq.core.protocol.core.impl.RemotingConnectionImpl.doBufferReceived(RemotingConnectionImpl.java:451)
                at org.hornetq.core.protocol.core.impl.RemotingConnectionImpl.bufferReceived(RemotingConnectionImpl.java:412)
                at org.hornetq.core.remoting.server.impl.RemotingServiceImpl$DelegatingBufferHandler.bufferReceived(RemotingServiceImpl.java:459)
                at org.hornetq.core.remoting.impl.netty.HornetQChannelHandler.messageReceived(HornetQChannelHandler.java:67)
                at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:100)
                at org.jboss.netty.channel.StaticChannelPipeline.sendUpstream(StaticChannelPipeline.java:362)
                at org.jboss.netty.channel.StaticChannelPipeline$StaticChannelHandlerContext.sendUpstream(StaticChannelPipeline.java:514)
                at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:287)
                at org.hornetq.core.remoting.impl.netty.HornetQFrameDecoder2.decode(HornetQFrameDecoder2.java:169)
                at org.hornetq.core.remoting.impl.netty.HornetQFrameDecoder2.messageReceived(HornetQFrameDecoder2.java:134)
                at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)
                at org.jboss.netty.channel.StaticChannelPipeline.sendUpstream(StaticChannelPipeline.java:362)
                at org.jboss.netty.channel.StaticChannelPipeline.sendUpstream(StaticChannelPipeline.java:357)
                at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
                at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
                at org.jboss.netty.channel.socket.oio.OioWorker.run(OioWorker.java:90)
                at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
                at org.jboss.netty.util.internal.IoWorkerRunnable.run(IoWorkerRunnable.java:46)
                at org.jboss.netty.util.VirtualExecutorService$ChildExecutorRunnable.run(VirtualExecutorService.java:181)
                at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
                at java.lang.Thread.run(Thread.java:662)
            Oct 21, 2010 6:43:24 PM org.hornetq.core.logging.impl.JULLogDelegate error


            Is that any configuration settings error or doing something wrong.




            4. Re: how to - session creation flags and auto commit acks
              rajks

              I verified that I am not acknowleding more than once.


              The only thin I am doing is receiving the messages and I acknowledge ODD ID messages and keep the even messages in memory to similate a retry condition internally.

              Once retried in memory for 3 times I acknowledge all the EVEN ID messages in the same session thread.


              This is when I get the exception I posted earlier. I have not restarted the process at all.


              What should be done correctly here ??

              5. Re: how to - session creation flags and auto commit acks
                Clebert Suconic

                Whenver you ACK the last message, you're acknowledging all the previous messages...


                So, you are double acking indeed.


                say, you received:









                when you do msg3.ack...


                you are acking 1 and 2 automatically.



                This is the semantic implemented. It's similar to JMS semantics on ACKing.

                6. Re: how to - session creation flags and auto commit acks
                  rajks

                  Thanks for getting me to this point.


                  Is there any other option available for my use case ??


                  If not  do I have to do the ack management in our app layer and batch process it ???