0 Replies Latest reply on Aug 31, 2011 9:48 PM by Adam Malter

    Stalled Queue Consumer on Windows(NIO) with Large Message and ConsumerWindow

    Adam Malter Newbie

      Hi All,

       

      We use HornetQ as our development JMS provider, and I'm trying to get everyone comfortable enough to promote it to production. However, we're running into trouble when we really start to bang on it.

       

      This is the case:

       

      HornetQ is running inside our old, customized JBoss instance. (I won't even mention the version number; suffice it to say we've rewritten enough that I worry my case is not generic or easily reproducible.)

       

      Anyway - this is what I'm seeing..

       

      Server config is:

       

      {code}

               Configuration config = new ConfigurationImpl();

       

               config.setPersistenceEnabled(true);

               config.setJournalDirectory(FileUtil.buildPath(rootData, "journal").getPath());

               config.setBindingsDirectory(FileUtil.buildPath(rootData, "bindings").getPath());

               config.setPagingDirectory(FileUtil.buildPath(rootData, "paging").getPath());

               config.setLargeMessagesDirectory(FileUtil.buildPath(rootData, "lobs").getPath());

               config.setPersistDeliveryCountBeforeDelivery(true); //required for redelivery count to work on rollbacks

       

               if (SystemUtils.IS_OS_WINDOWS) {

                  config.setJournalType(JournalType.NIO);

               }

       

               //HornetQ requires cluster user and pw or spits out warnings

               config.setClustered(false);

               config.setClusterUser(user);

               config.setClusterPassword(password);

               config.setSecurityEnabled(false);

       

               //Simple Q Service for now - only in VM transport

               config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));

               config.setCreateJournalDir(true);

               config.setCreateBindingsDir(true);

       

      {code}

       

      Contents of hornetq-version.properties from our jar file:

       

      {code}

      hornetq.version.versionName=HQ_2_2_5_FINAL_AS7

      hornetq.version.majorVersion=2

      hornetq.version.minorVersion=2

      hornetq.version.microVersion=5

      hornetq.version.incrementingVersion=121

      hornetq.version.versionSuffix=Final

      hornetq.version.versionTag=Final

      hornetq.netty.version=3.2.3.Final-r${buildNumber}

      hornetq.version.compatibleVersionList=121

      {code}

       

      We're running in XA mode with durable queues and no selectors or funny priorities. Additionally, the issue only occurs when I leave the consumer window size at its default. If I call setConsumerWindowSize(0), the issue disappears.

       

      So, inside a single transaction, I send about 10 messages of ~512 KB each to the same destination. The destination receives them okay, its consumer starts to process them, and then an application-level error is triggered (in this case I'm triggering it on purpose to test our resubmit handling).

       

      Our application logic is that we try to process a message a preset number of times (let's say 3) before moving it to dead letter. The hang usually happens on the very first rollback, but not always. Essentially, the consumer/MDB picks up a message, rolls it back, starts a new tx, tries to pick up another one, and then hangs with the following stack:

       

       

      {code}

      "AuditMDB[000]" - Thread t@113

         java.lang.Thread.State: TIMED_WAITING

                at java.lang.Object.wait(Native Method)

                - waiting on <1a647ea> (a org.hornetq.core.client.impl.LargeMessageControllerImpl)

                at org.hornetq.core.client.impl.LargeMessageControllerImpl.waitCompletion(LargeMessageControllerImpl.java:304)

                at org.hornetq.core.client.impl.LargeMessageControllerImpl.saveBuffer(LargeMessageControllerImpl.java:283)

                at org.hornetq.core.client.impl.ClientLargeMessageImpl.checkBuffer(ClientLargeMessageImpl.java:201)

                at org.hornetq.core.client.impl.ClientLargeMessageImpl.getBodyBuffer(ClientLargeMessageImpl.java:101)

                at org.hornetq.jms.client.HornetQBytesMessage.getBuffer(HornetQBytesMessage.java:451)

                at org.hornetq.jms.client.HornetQBytesMessage.readBytes(HornetQBytesMessage.java:248)

                at com.tradecard.server.util.BytesMessageInputStream.read(BytesMessageInputStream.java:68)

                at com.tradecard.framework.util.ChunkedByteBuffer.append(ChunkedByteBuffer.java:344)

                at com.tradecard.framework.util.ChunkedByteBuffer.append(ChunkedByteBuffer.java:332)

                at com.tradecard.server.mdb.HornetQContainerInvoker$HornetQServerSession.run(HornetQContainerInvoker.java:287)

                at java.lang.Thread.run(Thread.java:662)

       

       

         Locked ownable synchronizers:

                - None

       

      {code}
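
      For context, the retry rule described above (try a message a fixed number of times, then dead-letter it) boils down to something like the sketch below. This is not our actual code: the class name RedeliveryPolicy, the Action enum and the onFailure method are made up for illustration, and I'm assuming the delivery count is 1-based, the way JMSXDeliveryCount is.

```java
/**
 * Hypothetical sketch of a "retry N times, then dead-letter" rule.
 * All names here are illustrative; they are not HornetQ or JMS APIs.
 */
final class RedeliveryPolicy {

    /** What to do with a message whose processing just failed. */
    enum Action { RETRY, DEAD_LETTER }

    private final int maxAttempts;

    RedeliveryPolicy(int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.maxAttempts = maxAttempts;
    }

    /**
     * @param deliveryCount 1-based count of deliveries so far, including the
     *                      one that just failed (assumed to mirror JMSXDeliveryCount)
     * @return RETRY while attempts remain (roll back so the message is
     *         redelivered), DEAD_LETTER once the limit is reached
     */
    Action onFailure(int deliveryCount) {
        return deliveryCount >= maxAttempts ? Action.DEAD_LETTER : Action.RETRY;
    }
}
```

      With maxAttempts set to 3, the first and second failures lead to a rollback and redelivery, and the third failure sends the message to dead letter. The hang I'm describing shows up on the redelivery path, usually right after the first rollback.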

       

      The somewhat relevant portions of our Hornet Invoker are:

       

      {code}

                        xaResource = ((XAQueueSession)session).getXAResource();

                        txManager.begin();

                        tx = txManager.getTransaction();

                        tx.enlistResource(xaResource);

       

                        pooledSession.syncRegister();

                        tx.registerSynchronization(pooledSession);

                        //cache the session/senders for QueueSender to use

                        TransactionCache.put(TransactionCache.MQXA_SESSION, pooledSession);

                        txid = TransactionToolbox.getGlobalTransactionId();

       

                        Message requestMessage = null;

                        requestMessage = receiver.receive(2000);

       

                        if (requestMessage != null && !quit) {

                           Message message = null;

                           try {

                              BytesMessage bytesMessage = (BytesMessage)requestMessage;

                              BytesMessageInputStream stream = new BytesMessageInputStream(bytesMessage);

                              ChunkedByteBuffer buffer = new ChunkedByteBuffer(10000, 100);

                              buffer.append(stream);   // <--- hanging here

      ....

      {code}

       

      There doesn't seem to be anyone owning the lock. Eventually I get timeout warnings from my TxManager and have to bounce the server. As soon as I bounce it, it picks up another message, processes it, and quickly runs into the error again.

       

      Additionally, I can see a number of files pending in my lobs directory.
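
      On the lock observation: a thread parked in Object.wait(timeout) releases the monitor while it waits, so a TIMED_WAITING thread with no visible lock owner is what you would expect if the notification simply never arrives. The standalone sketch below shows just that JVM behaviour. It is plain Java, not HornetQ code, and TimedWaitDemo and observeWaiter are names made up for the illustration.

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical demo: a timed wait() frees the monitor while the thread waits. */
final class TimedWaitDemo {

    /**
     * Starts a daemon thread that waits on {@code lock} for a notification
     * that never comes, then reports that thread's state as seen from a
     * caller that has itself acquired the same monitor.
     */
    static Thread.State observeWaiter(final Object lock, final long waitMillis)
            throws InterruptedException {
        final CountDownLatch entered = new CountDownLatch(1);
        Thread waiter = new Thread(new Runnable() {
            public void run() {
                synchronized (lock) {
                    entered.countDown();
                    try {
                        lock.wait(waitMillis); // nobody ever calls notify()
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        });
        waiter.setDaemon(true);
        waiter.start();
        entered.await();
        // We can only get past this line once the waiter has entered wait()
        // and thereby released the monitor.
        synchronized (lock) {
            return waiter.getState();
        }
    }
}
```

      Calling TimedWaitDemo.observeWaiter(new Object(), 10000) should report TIMED_WAITING even though the caller holds the monitor at that moment. That matches my dump: the thread is "waiting on" the LargeMessageControllerImpl rather than blocked on it, which suggests the completion signal for the large message is never delivered rather than that some other thread is hogging a lock.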

       

      All of our custom infrastructure makes it difficult to extract this into a postable test case. However, if anyone thinks it's worthwhile and this doesn't look like anything straightforward, I'll make the attempt.

       

      The idea for setting the consumer window size to zero came from the following closed bug - https://issues.jboss.org/browse/HORNETQ-111

       

      Thanks for your time and any help!