4 Replies · Latest reply on Oct 10, 2013 4:05 PM by clebert.suconic

    HornetQ Clustering - removing consumer which did not handle a message

    safetytrick

      I am getting this message: "removing consumer which did not handle a message" every so often in my cluster. When this occurs, the subscribing node no longer receives messages for that topic. (The complete stack trace is below.)

       

      The issue seems to be caused by one of these two exceptions:

       

      java.util.NoSuchElementException

          at org.hornetq.utils.PriorityLinkedListImpl$PriorityLinkedListIterator.repeat(PriorityLinkedListImpl.java:189)

       

      java.lang.IndexOutOfBoundsException

          at org.jboss.netty.buffer.AbstractChannelBuffer.setIndex(AbstractChannelBuffer.java:67)

          at org.hornetq.core.buffers.impl.ChannelBufferWrapper.setIndex(ChannelBufferWrapper.java:497)

       

      This only occurs on one of my topics; the other topic doesn't seem to hit this bug. The size of messages on the affected topic varies more than on the other, and it can potentially carry larger messages. Could I be hitting a message size limit? Why would this affect only some of the connections even though all of them receive the message?
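
      For what it's worth, here is the kind of throwaway helper (a quick sketch with hypothetical names, not our real producer code) I could use to log the payload size of every publish on this topic and see whether larger messages line up with the failures:

         import java.nio.charset.Charset;

         import javax.jms.JMSException;
         import javax.jms.MessageProducer;
         import javax.jms.Session;
         import javax.jms.TextMessage;
         import javax.jms.Topic;

         // Hypothetical diagnostic helper: logs an approximate body size
         // (UTF-8 text only, headers and properties not counted) before each publish.
         public final class SizeLoggingPublisher {

            private static final Charset UTF8 = Charset.forName("UTF-8");

            public static void publish(Session session, Topic topic, String payload) throws JMSException {
               int approxBytes = payload.getBytes(UTF8).length;
               System.out.println("Publishing to " + topic.getTopicName() + ": ~" + approxBytes + " bytes");

               MessageProducer producer = session.createProducer(topic);
               try {
                  TextMessage message = session.createTextMessage(payload);
                  producer.send(message);
               } finally {
                  producer.close();
               }
            }
         }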

       

       

      06:33:31,715 WARN  [org.hornetq.core.server.impl.QueueImpl] (Thread-7 (HornetQ-server-HornetQServerImpl::serverUUID=1c4eee4d-b2a8-11e1-8e13-00163e1f4f4a-1647531163)) removing consumer which did not handle a message, consumer=ClusterConnectionBridge@6a2bf192 [name=sf.default-cluster-connection.dccaa12a-bfdd-11e1-87f5-00163e7fe440, queue=QueueImpl[name=sf.default-cluster-connection.dccaa12a-bfdd-11e1-87f5-00163e7fe440, postOffice=PostOfficeImpl [server=HornetQServerImpl::serverUUID=1c4eee4d-b2a8-11e1-8e13-00163e1f4f4a]]@7c97cb70 targetConnector=ServerLocatorImpl (identity=(Cluster-connection-bridge::ClusterConnectionBridge@6a2bf192 [name=sf.default-cluster-connection.dccaa12a-bfdd-11e1-87f5-00163e7fe440, queue=QueueImpl[name=sf.default-cluster-connection.dccaa12a-bfdd-11e1-87f5-00163e7fe440, postOffice=PostOfficeImpl [server=HornetQServerImpl::serverUUID=1c4eee4d-b2a8-11e1-8e13-00163e1f4f4a]]@7c97cb70 targetConnector=ServerLocatorImpl [initialConnectors=[org-hornetq-core-remoting-impl-netty-NettyConnectorFactory?port=5445&host=app04-cl01-va-us-server-com], discoveryGroupConfiguration=null]]::ClusterConnectionImpl@1190647206[nodeUUID=1c4eee4d-b2a8-11e1-8e13-00163e1f4f4a, connector=org-hornetq-core-remoting-impl-netty-NettyConnectorFactory?port=5445&host=app01-cl01-server-com, address=jms, server=HornetQServerImpl::serverUUID=1c4eee4d-b2a8-11e1-8e13-00163e1f4f4a])) [initialConnectors=[org-hornetq-core-remoting-impl-netty-NettyConnectorFactory?port=5445&host=app04-cl01-va-us-server-com], discoveryGroupConfiguration=null]], message=Reference[2148752586]:RELIABLE:ServerMessage[messageID=2148752586,priority=4, bodySize=12000,expiration=0, durable=true, address=jms.topic.topic/toplinkSynchronization,properties=TypedProperties[{_HQ_ROUTE_TOsf.default-cluster-connection.dccaa12a-bfdd-11e1-87f5-00163e7fe440=[B@680503f, _HQ_ROUTE_TOsf.default-cluster-connection.1c7cb4c6-b2a8-11e1-8c72-00163e348852=[B@52145b6e, _HQ_ROUTE_TOsf.default-cluster-connection.1c27414b-b2a8-11e1-abb5-00163e025c2a=[B@367ebcdb, _HQ_ROUTE_TOsf.default-cluster-connection.1c2e945e-b2a8-11e1-b77a-00163e37e209=[B@3aa0e9d0, _HQ_ROUTE_TOsf.default-cluster-connection.1c151906-b2a8-11e1-b5af-00163e2bb142=[B@1a5674c2}]]@571177488: java.lang.IndexOutOfBoundsException

          at org.jboss.netty.buffer.AbstractChannelBuffer.setIndex(AbstractChannelBuffer.java:67)

          at org.hornetq.core.buffers.impl.ChannelBufferWrapper.setIndex(ChannelBufferWrapper.java:497)

          at org.hornetq.core.message.impl.MessageImpl.<init>(MessageImpl.java:182)

          at org.hornetq.core.message.impl.MessageImpl.<init>(MessageImpl.java:146)

          at org.hornetq.core.server.impl.ServerMessageImpl.<init>(ServerMessageImpl.java:90)

          at org.hornetq.core.server.impl.ServerMessageImpl.copy(ServerMessageImpl.java:206)

          at org.hornetq.core.server.cluster.impl.ClusterConnectionBridge.beforeForward(ClusterConnectionBridge.java:184)

          at org.hornetq.core.server.cluster.impl.BridgeImpl.handle(BridgeImpl.java:548)

          at org.hornetq.core.server.impl.QueueImpl.handle(QueueImpl.java:2195)

          at org.hornetq.core.server.impl.QueueImpl.deliver(QueueImpl.java:1746)

          at org.hornetq.core.server.impl.QueueImpl.doPoll(QueueImpl.java:1625)

          at org.hornetq.core.server.impl.QueueImpl.access$1300(QueueImpl.java:77)

          at org.hornetq.core.server.impl.QueueImpl$ConcurrentPoller.run(QueueImpl.java:2482)

          at org.hornetq.utils.OrderedExecutorFactory$OrderedExecutor$1.run(OrderedExecutorFactory.java:100)

          at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source) [rt.jar:1.6.0_30]

          at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [rt.jar:1.6.0_30]

          at java.lang.Thread.run(Unknown Source) [rt.jar:1.6.0_30]

       

      06:33:31,715 ERROR [org.hornetq.utils.OrderedExecutorFactory] (Thread-7 (HornetQ-server-HornetQServerImpl::serverUUID=1c4eee4d-b2a8-11e1-8e13-00163e1f4f4a-1647531163)) Caught unexpected Throwable: java.util.NoSuchElementException

          at org.hornetq.utils.PriorityLinkedListImpl$PriorityLinkedListIterator.repeat(PriorityLinkedListImpl.java:189)

          at org.hornetq.core.server.impl.QueueImpl.deliver(QueueImpl.java:1763)

          at org.hornetq.core.server.impl.QueueImpl.doPoll(QueueImpl.java:1625)

          at org.hornetq.core.server.impl.QueueImpl.access$1300(QueueImpl.java:77)

          at org.hornetq.core.server.impl.QueueImpl$ConcurrentPoller.run(QueueImpl.java:2482)

          at org.hornetq.utils.OrderedExecutorFactory$OrderedExecutor$1.run(OrderedExecutorFactory.java:100)

          at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source) [rt.jar:1.6.0_30]

          at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [rt.jar:1.6.0_30]

          at java.lang.Thread.run(Unknown Source) [rt.jar:1.6.0_30]

        • 1. Re: HornetQ Clustering - removing consumer which did not handle a message
          jbertram

          Couple of questions...

          1. What version of AS7 are you using?
          2. What is your cluster topology?
          3. How many subscribers do you have and where are they connected?
          • 2. Re: HornetQ Clustering - removing consumer which did not handle a message
            safetytrick

            AS 7.1.1 running in standalone mode, with 6 nodes using the configuration below:

             

                 <subsystem xmlns="urn:jboss:domain:messaging:1.1">

                        <hornetq-server>

                            <clustered>true</clustered>

                            <persistence-enabled>true</persistence-enabled>

                            <security-enabled>false</security-enabled>

                            <jmx-management-enabled>true</jmx-management-enabled>

                            <message-counter-enabled>true</message-counter-enabled>

                            <shared-store>false</shared-store>

                            <journal-type>ASYNCIO</journal-type>

                            <journal-file-size>10485760</journal-file-size>

                            <journal-min-files>2</journal-min-files>

             

                            <connectors>

                                <netty-connector name="netty" socket-binding="messaging"/>

                                <netty-connector name="netty-throughput" socket-binding="messaging-throughput">

                                    <param key="batch-delay" value="50"/>

                                </netty-connector>

                                <in-vm-connector name="in-vm" server-id="0"/>

                            </connectors>

             

                            <acceptors>

                                <netty-acceptor name="netty" socket-binding="messaging"/>

                                <netty-acceptor name="netty-throughput" socket-binding="messaging-throughput">

                                    <param key="batch-delay" value="50"/>

                                    <param key="direct-deliver" value="false"/>

                                </netty-acceptor>

                                <in-vm-acceptor name="in-vm" server-id="0"/>

                            </acceptors>

             

                            <broadcast-groups>

                                <broadcast-group name="default-broadcast-group">

                                    <group-address>231.7.7.7</group-address>

                                    <group-port>9876</group-port>

                                    <broadcast-period>5000</broadcast-period>

                                    <connector-ref>

                                        netty

                                    </connector-ref>

                                </broadcast-group>

                            </broadcast-groups>

             

                            <discovery-groups>

                                <discovery-group name="default-discovery-group">

                                    <socket-binding>messaging-cluster</socket-binding>

                                    <refresh-timeout>20000</refresh-timeout>

                                </discovery-group>

                            </discovery-groups>

             

                            <cluster-connections>

                                <cluster-connection name="default-cluster-connection">

                                    <address>jms</address>

                                    <connector-ref>netty</connector-ref>

                                    <retry-interval>500</retry-interval>

                                    <forward-when-no-consumers>true</forward-when-no-consumers>

                                    <max-hops>1</max-hops>

                                    <discovery-group-ref discovery-group-name="default-discovery-group"/>

                                </cluster-connection>

                            </cluster-connections>

             

                            <security-settings>

                                <security-setting match="#">

                                    <permission type="send" roles="guest"/>

                                    <permission type="consume" roles="guest"/>

                                    <permission type="createNonDurableQueue" roles="guest"/>

                                    <permission type="deleteNonDurableQueue" roles="guest"/>

                                </security-setting>

                            </security-settings>

             

                            <address-settings>

                                <address-setting match="#">

                                    <dead-letter-address>jms.queue.DLQ</dead-letter-address>

                                    <expiry-address>jms.queue.ExpiryQueue</expiry-address>

                                    <redelivery-delay>1000</redelivery-delay>

                                    <max-size-bytes>104857600</max-size-bytes>

                                    <page-size-bytes>10485760</page-size-bytes>

                                    <address-full-policy>PAGE</address-full-policy>

                                    <message-counter-history-day-limit>10</message-counter-history-day-limit>

                                </address-setting>

                            </address-settings>

             

                            <jms-connection-factories>

                                <connection-factory name="InVmConnectionFactory">

                                    <connectors>

                                        <connector-ref connector-name="in-vm"/>

                                    </connectors>

                                    <entries>

                                        <entry name="java:/ConnectionFactory"/>

                                    </entries>

                                </connection-factory>

                                <connection-factory name="RemoteConnectionFactory">

                                    <connectors>

                                        <connector-ref connector-name="netty"/>

                                    </connectors>

                                    <entries>

                                        <entry name="RemoteConnectionFactory"/>

                                        <entry name="java:jboss/exported/jms/RemoteConnectionFactory"/>

                                    </entries>

                                </connection-factory>

                                <pooled-connection-factory name="hornetq-ra">

                                    <transaction mode="xa"/>

                                    <connectors>

                                        <connector-ref connector-name="in-vm"/>

                                    </connectors>

                                    <entries>

                                        <entry name="java:/JmsXA"/>

                                    </entries>

                                </pooled-connection-factory>

                            </jms-connection-factories>

             

                            <jms-destinations>

                                <jms-queue name="testQueue">

                                    <entry name="queue/test"/>

                                    <entry name="java:jboss/exported/jms/queue/test"/>

                                    <durable>false</durable>

                                </jms-queue>

                                <jms-queue name="queue/backgroundJobQueue">

                                    <entry name="java:jboss/exported/jms/queue/backgroundJobQueue"/>

                                    <durable>true</durable>

                                </jms-queue>

                                <jms-queue name="queue/applicationEvent">

                                    <entry name="java:jboss/exported/jms/queue/applicationEvent"/>

                                    <durable>true</durable>

                                </jms-queue>

                                <jms-topic name="testTopic">

                                    <entry name="topic/test"/>

                                    <entry name="java:jboss/exported/jms/topic/test"/>

                                </jms-topic>

                                <jms-topic name="topic/cacheSynchronization">

                                    <entry name="java:jboss/exported/jms/topic/cacheSynchronization"/>

                                </jms-topic>

                                <jms-topic name="topic/toplinkSynchronization">

                                    <entry name="java:jboss/exported/jms/topic/toplinkSynchronization"/>

                                </jms-topic>

                            </jms-destinations>

                        </hornetq-server>

                    </subsystem>

            • 3. Re: HornetQ Clustering - removing consumer which did not handle a message
              safetytrick

              As a workaround we patched MessageImpl's copy constructor to retry the buffer copy (after a small Thread.sleep()) if an IndexOutOfBoundsException was caught. This seems to have resolved the problem; when the issue does occur, the copy succeeds on the first retry (we retry up to 3 times).

               

              These are our changes:

               

                 /*

                  * Copy constructor

                  */

                 protected MessageImpl(final MessageImpl other, TypedProperties properties)

                 {

                    messageID = other.getMessageID();

                    userID = other.getUserID();

                    address = other.getAddress();

                    type = other.getType();

                    durable = other.isDurable();

                    expiration = other.getExpiration();

                    timestamp = other.getTimestamp();

                    priority = other.getPriority();

                    this.properties = new TypedProperties(properties);

               

                    // This MUST be synchronized using the monitor on the other message to prevent it running concurrently

                    // with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to

                    // many subscriptions and bridging to other nodes in a cluster

                    synchronized (other)

                    {

                       bufferValid = other.bufferValid;

                       endOfBodyPosition = other.endOfBodyPosition;

                       endOfMessagePosition = other.endOfMessagePosition;

                       copied = other.copied;

               

                       if (other.buffer != null)

                       {

                            // there is a strange race condition here; if the copy fails, retry up to 3 more times

                           IndexOutOfBoundsException copyFailureException = copyBuffer(other);

                           int i = 0;

                           while(copyFailureException != null && i++ < 3) {

                               log.warn("Retrying message copy: " + i);

                               try {

                                   Thread.sleep(200);

                               } catch (InterruptedException e) { }

                               copyFailureException = copyBuffer(other);

                           }

                           if(copyFailureException != null) {

                               log.fatal("Message copy failed after 3 attempts, message will be discarded.");

                               throw copyFailureException;

                           }

                       }

                    }

                 }

               

                  private IndexOutOfBoundsException copyBuffer(MessageImpl other) {

                      try {

                          createBody(other.buffer.capacity());

               

                          // We need to copy the underlying buffer too, since the different messages thereafter might have different

                          // properties set on them, making their encoding different

                          buffer = other.buffer.copy(0, other.buffer.capacity());

                          buffer.setIndex(other.buffer.readerIndex(), other.buffer.writerIndex());

                          return null;

                      } catch (IndexOutOfBoundsException e) {

                          log.fatal(String.format("other capacity=%s reader=%s writer=%s", other.buffer.capacity(), other.buffer.readerIndex(), other.buffer.writerIndex()));

                          if(buffer != null) {

                              log.fatal(String.format("new buffer capacity=%s reader=%s writer=%s", buffer.capacity(), buffer.readerIndex(), buffer.writerIndex()));

                          }

                          log.fatal(String.format("message id=%s durable=%s type=%s",other.getMessageID(), other.isDurable(), other.buffer.toString()));

               

                          return e;

                      }

                  }

               

              This is definitely not a perfect long-term fix.

               

              Our logging shows these numbers when we run into the bug:

               

              other capacity=12000 reader=0 writer=6007

              new buffer capacity=6000 reader=0 writer=6000

              message id=3367752 durable=true type=org.hornetq.core.buffers.impl.ChannelBufferWrapper@75c005b
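
              Reading those numbers back, it looks like the copy was sized while the source buffer still had a capacity of 6000, but by the time the indices were restored the source had grown to 12000 with 6007 bytes written, so setIndex(0, 6007) lands outside the 6000-byte copy. A small, self-contained sketch (plain java.nio, not HornetQ or Netty code) of that kind of interleaving:

                 import java.nio.ByteBuffer;

                 // Sequential simulation (NOT HornetQ code) of the suspected race:
                 // the copy target is sized from a stale capacity while the source
                 // buffer is grown by a concurrent re-encode.
                 public class StaleCapacityCopy {
                    public static void main(String[] args) {
                       ByteBuffer source = ByteBuffer.allocate(6000);
                       source.position(6000);                  // 6000 bytes "written"

                       int staleCapacity = source.capacity();  // snapshot taken by the copying thread: 6000

                       // In the real race another thread re-encodes the message at this point,
                       // doubling the buffer and appending a few bytes: capacity 12000, write position 6007.
                       ByteBuffer grown = ByteBuffer.allocate(12000);
                       source.flip();
                       grown.put(source);
                       grown.position(6007);
                       source = grown;

                       // The copying thread builds its target from the stale snapshot...
                       ByteBuffer copy = ByteBuffer.allocate(staleCapacity);   // capacity 6000
                       // ...and restores the source's write position onto it, like setIndex():
                       copy.position(source.position());       // 6007 > 6000 -> IllegalArgumentException
                    }
                 }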

              • 4. Re: HornetQ Clustering - removing consumer which did not handle a message
                clebert.suconic

                Just a note for future reference (for anyone who finds this via Google):

                 

                 

                This was an issue and it was fixed here: https://github.com/hornetq/hornetq/pull/1328

                 

                 

                It was a really rare race condition that we have now replicated and fixed.