1 Reply Latest reply on May 18, 2011 3:45 PM by Clebert Suconic

    Validation of hornetq usage

    Todd West Newbie



      We are running hornetq 2.2.2 and running into some very strange issues which I hope are because of invalid usage on our part and not something more serious. I was hoping that someone who knows how it should be used could verify that our usage is valid and if not, how we might improve it. Here is the basic setup:


      • We have an endpoint that accepts items and enqueues them into Queue1
      • On startup of the app we spin up 8 threads that all read from Queue1, do some work on the item and then ack the message
        • We have an object pool that contains 8 custom objects (made up of a session, producer and consumer object)
        • When each thread is initialized it creates a new session (autocommit sends & acks = true) and then creates the consumer and producer and throws it in the pool
        • Whenever a thread needs to consume something from the queue or write something to another queue it grabs an object from the pool, uses it and puts it back
        • Each worker has an onMessage method to handle incoming items and do work on them


      So I guess my question is, does this seem valid to you? We are seeing messages get consumed at a pretty slow rate (like 10-20/sec max) and after what seems like a random period of time that it's working it will just break and start throwing out messages like:


      java.lang.IllegalStateException: Cannot send a packet while channel is doing failover

              at org.hornetq.core.protocol.core.impl.ChannelImpl.send(ChannelImpl.java:184)

              at org.hornetq.core.protocol.core.impl.ChannelImpl.sendBatched(ChannelImpl.java:147)

              at org.hornetq.core.client.impl.ClientProducerImpl.doSend(ClientProducerImpl.java:291)

              at org.hornetq.core.client.impl.ClientProducerImpl.send(ClientProducerImpl.java:135)




      WARNING: Error on checkDLQ

      java.lang.IllegalStateException: Cannot find add info 6618056

              at org.hornetq.core.journal.impl.JournalImpl.appendUpdateRecord(JournalImpl.java:908)

              at org.hornetq.core.persistence.impl.journal.JournalStorageManager.updateScheduledDeliveryTime(JournalStorageManager.java:551)

              at org.hornetq.core.server.impl.QueueImpl.checkRedelivery(QueueImpl.java:1614)

              at org.hornetq.core.server.impl.QueueImpl$RefsOperation.afterRollback(QueueImpl.java:1995)

              at org.hornetq.core.transaction.impl.TransactionImpl.afterRollback(TransactionImpl.java:469)

              at org.hornetq.core.transaction.impl.TransactionImpl$3.done(TransactionImpl.java:329)

              at org.hornetq.core.persistence.impl.journal.OperationContextImpl.executeOnCompletion(OperationContextImpl.java:188)

              at org.hornetq.core.persistence.impl.journal.JournalStorageManager.afterCompleteOperations(JournalStorageManager.java:429)

              at org.hornetq.core.transaction.impl.TransactionImpl.rollback(TransactionImpl.java:317)

              at org.hornetq.core.server.impl.ServerConsumerImpl.close(ServerConsumerImpl.java:343)

              at org.hornetq.core.server.impl.ServerSessionImpl.closeConsumer(ServerSessionImpl.java:963)

              at org.hornetq.core.protocol.core.ServerSessionPacketHandler.handlePacket(ServerSessionPacketHandler.java:430)

              at org.hornetq.core.protocol.core.impl.ChannelImpl.handlePacket(ChannelImpl.java:474)

              at org.hornetq.core.protocol.core.impl.RemotingConnectionImpl.doBufferReceived(RemotingConnectionImpl.java:496)

              at org.hornetq.core.protocol.core.impl.RemotingConnectionImpl.bufferReceived(RemotingConnectionImpl.java:457)

              at org.hornetq.core.remoting.server.impl.RemotingServiceImpl$DelegatingBufferHandler.bufferReceived(RemotingServiceImpl.java:459)

              at org.hornetq.core.remoting.impl.invm.InVMConnection$1.run(InVMConnection.java:137)




      We also have some code that is checking multiple queue depths using this code:

      public long getQueueDepth(String queueName) {

              ClientSession session = null;

              try {

                  session = nonBlockingSessionFactory.createSession(false, false, false);


                  return session.queueQuery(new SimpleString(queueName)).getMessageCount();


              catch (Exception e) {

                  // no-op


              finally {

                  if (session != null) {

                      try {



                      catch (HornetQException e) {

                          return -1;




              return -1;




      I have seen some strange behavior where this won't return quickly (and sometimes won't return at all) and I'm wondering if this is somehow causing the queues to get in a weird state where they stop processing messages (which we have also seen as well).



      Any information or guidance you can give would be fantastic. Please let me know if you need any clarification as well. Thanks!