3 Replies Latest reply on May 9, 2013 4:30 PM by maybenow

    message queueing up on producer side and none on the consumer queue

    maybenow

      we have a simple case, use hornetq 2.2.14 to send message to multipl subcriber.

       

       

      we have the seesion created as the suggestion http://stackoverflow.com/questions/6452505/hornetq-messages-still-remaining-in-queue-after-consuming-using-core-api

      or https://community.jboss.org/thread/168449

       

      our case is quite similar to the above example.

       

      when we using jconsole to monitory the consumer queue, we see the message count and delivering count are all 0, after fire over 10k message

       

      but on producer queue, the message count is the same as we message we send total. and around that time the send is blocked

       

      the jstack info is as following

       

      "http-0.0.0.0-8680-20" daemon prio=10 tid=0x00007f62bc022800 nid=0xf8d6 waiting on condition [0x00007f625bf7c000]

         java.lang.Thread.State: WAITING (parking)

              at sun.misc.Unsafe.park(Native Method)

              - parking to wait for  <0x00000006157b00a0> (a java.util.concurrent.Semaphore$NonfairSync)

              at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)

              at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)

              at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)

              at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)

              at java.util.concurrent.Semaphore.acquire(Semaphore.java:441)

              at org.hornetq.core.client.impl.ClientProducerCreditsImpl.acquireCredits(ClientProducerCreditsImpl.java:81)

              at org.hornetq.core.client.impl.ClientProducerImpl.doSend(ClientProducerImpl.java:305)

              at org.hornetq.core.client.impl.ClientProducerImpl.send(ClientProducerImpl.java:135)

              at com.apple.iad.ims.server.replconnector.ReplConnectorHornetQImpl.send(ReplConnectorHornetQImpl.java:150)

              at com.apple.iad.ims.server.api.DeleteCampaign.deleteCampaignImpl(DeleteCampaign.java:61)

              at com.apple.iad.ims.server.api.DeleteCampaign.deleteCampaign(DeleteCampaign.java:44)

              at sun.reflect.GeneratedMethodAccessor137.invoke(Unknown Source)

              at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)

              at java.lang.reflect.Method.invoke(Method.java:597)

              at com.sun.jersey.spi.container.JavaMethodInvokerFactory$1.invoke(JavaMethodInvokerFactory.java:60)

              at com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider$ResponseOutInvoker._dispatch(AbstractResourceMethodDispatchProvider.java:205)

              at com.sun.jersey.server.impl.model.method.dispatch.ResourceJavaMethodDispatcher.dispatch(ResourceJavaMethodDispatcher.java:75)

              at com.sun.jersey.server.impl.uri.rules.HttpMethodRule.accept(HttpMethodRule.java:302)

              at com.sun.jersey.server.impl.uri.rules.ResourceClassRule.accept(ResourceClassRule.java:108)

              at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147)

              at com.sun.jersey.server.impl.uri.rules.RootResourceClassesRule.accept(RootResourceClassesRule.java:84)

              at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1480)

              at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1411)

              at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1360)

              at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1350)

              at com.sun.jersey.spi.container.servlet.WebComponent.service(WebComponent.java:416)

              at com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:538)

       

      we have the message.ack call after received the message and reuse the session, producer and consumer as suggested in the document.

       

      any idea, why this is caused? the code and configuration is posted below

       

       

      Thanks a lot

        • 1. Re: message queueing up on producer side and none on the consumer queue
          maybenow

          try to make the case more clear.

           

          we are running our application on 4 jboss svr, one as producer and other 3 as consumer, and try to replicate the message from producer to consumer,

           

          here is the code we are using for create session and messagehandler

           

          public class ReplConnectorHornetQImpl implements ReplConnector {

             

              private static final Logger LOG = Logger.getLogger(ReplConnectorHornetQImpl.class);

              private static final MessageHandler msgHandler = new MessageHandlerImpl();

              private static final int RECONNECT_ATTEMPTS = 600;

              private static final long RETRY_INTERVAL = 100l;

              private static final int BUFFER_SIZE = 10240;

              private ReplServiceConfig replConfig;

              private ClientSession session;

              private ClientProducer producer;

              private ClientConsumer consumer;

           

              private static class LazyReplConnectorHolder

              {

                  private static final ReplConnectorHornetQImpl instance = new ReplConnectorHornetQImpl();

              }

           

              private ReplConnectorHornetQImpl() {}

           

           

              public static ReplConnectorHornetQImpl getInstance()

              {

                  return LazyReplConnectorHolder.instance;

              }

           

              @Override

              public void init() throws Exception

              {

           

                  final SimpleString queueName = new SimpleString("queuename" + "-" + System.getProperty("java.rmi.server.hostname") + "-" + System.getProperty("jboss.server.name"));

                  LOG.info("init: queueName = " + queueName.toString());

                 

                  Map connectionParams = new HashMap();

                  connectionParams.put("host", IP_ADDRESS);

                  connectionParams.put("port", PORT));

                  TransportConfiguration config = new TransportConfiguration("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory", connectionParams);

                  ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration[] {

                          config

                  });

                  locator.setReconnectAttempts(RECONNECT_ATTEMPTS);

                  locator.setRetryInterval(RETRY_INTERVAL);

                 

                  String role = (config.isWriter()) ? "writer" : "reader";

                  LOG.debug("Connecting to " + locator.toString() + " as " + role);

                 

                  try {

                  ClientSessionFactory factory = locator.createSessionFactory();

                  session = factory.createSession(true, true,0);

                 

                 

                  if(!session.queueQuery(queueName).isExists())

                  {

                      session.createQueue("topicname", "queuename", false);

                      LOG.info((new StringBuilder("Initialized queue because queue '")).append(queueName).append("' has not been setup in the server").toString());

                  }     

                 

                  if(config.isWriter()) {

                      producer = session.createProducer(replConfig.getTopicName());

                  } else {

                      consumer = session.createConsumer(queueName);

                      consumer.setMessageHandler(msgHandler);

                  }

                  session.start();

                  } catch (Exception e) {

                      HornetQFailureBeanImpl.getInstance().setStateToFalse();

                      throw new Exception(e);

                  }

              }

           

              public void cleanup()

              {

                  try

                  {

                      session.close();

                      producer.close();

                      consumer.close();

                  }

                  catch(Exception e)

                  {

                      LOG.error((new StringBuilder("cleanup: ")).append(e).toString());

                  }

              }

           

              public boolean send(Object toSend)

              {

                  if(!replConfig.isEnable())

                      return true;

                 

                  // If this is not a writer config, this should not have been called

                  if(!replConfig.isWriter())

                  {

                      LOG.error("send: called for a non-writer configuration");

                      return false;

                  }

                 

                  // Create json string

                  ObjectMapper mapper = new ObjectMapper();

                  String jsonStr;

                  try {

                      jsonStr = mapper.writeValueAsString(toSend);

                  } catch (Exception e) {

           

                      LOG.error("send: caught exception " + e);

                      return false;

                  }

                 

                  LOG.debug("send: called to send " + jsonStr);

           

                  ClientMessage message = session.createMessage(false);

                  message.getBodyBuffer().writeString(jsonStr);

                  try

                  {

                      producer.send(message);

                      LOG.debug((new StringBuilder("send: producer sent message ")).append(message.toString()).toString());

           

                  }

                  catch(HornetQException e)

                  {

           

                      LOG.error((new StringBuilder("send ")).append(e).toString());

                      return false;

                  }

                 

                  if (producer.isClosed()) {

                      // check if the session is closed after  reconnect

                      HornetQFailureBeanImpl.getInstance().setStateToFalse();

                      HornetQFailureBeanImpl.getInstance().incrementCount();

                      LOG.error((new StringBuilder("send fail: ")).append(message).toString());

                      return false;

                  }

                  return true;

              }

           

          }

           

           

          public class MessageHandlerImpl implements MessageHandler {

              private static final Logger LOG = Logger.getLogger(MessageHandlerImpl.class);

             

              @Override

              public void onMessage(ClientMessage message) {

                  String jsonStr = message.getBodyBuffer().readString();

                  LOG.debug("received message: " + jsonStr);

                 

                  // Map to ReplWrapper object

                  ObjectMapper mapper = new ObjectMapper();

                  JsonNode jsonObj = null;

                  try {            

                      jsonObj = mapper.readValue(jsonStr, JsonNode.class);

                      @SuppressWarnings("rawtypes")

                      Class c = Class.forName(jsonObj.get("method").getTextValue()); // Reserve line repl callback

                 

                      LOG.debug("onMessage: callback method on " + c.getName());

                     

                      ReplCallback callback = (ReplCallback)(c.newInstance());

                      callback.onMessage(jsonObj);

                      message.acknowledge();

                     

                  } catch (JsonParseException e) {

                      LOG.error("onMessage: could not parse " + jsonStr + e);

                      return;

                  } catch (JsonMappingException e) {

                      LOG.error("onMessage: could not map " + jsonStr + e);

                      return;

                  } catch (IOException e) {

                      LOG.error("onMessage: error receiving " + jsonStr + e);

                      return;

                  } catch (ClassNotFoundException e) {

                      LOG.error("onMessage: error receiving " + jsonStr + e);

                      return;

                  } catch (Exception e) {

                      LOG.error("onMessage: error processing " + jsonStr + e);

                      return;

                  }

              }

           

          }

           

           

          the config for hornetq is listed as following

          <configuration xmlns="urn:hornetq"

                         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

                         xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">

           

             <paging-directory>${data.dir:../data}/paging</paging-directory>

           

             <bindings-directory>${data.dir:../data}/bindings</bindings-directory>

           

             <journal-directory>${data.dir:../data}/journal</journal-directory>

           

             <journal-min-files>10</journal-min-files>

           

             <large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory>

           

             <connectors>

                <connector name="netty">

                   <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>

                   <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>

                   <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>

                </connector>

           

                <connector name="netty-throughput">

                   <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>

                   <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>

                   <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>

                   <param key="batch-delay" value="50"/>

                </connector>

             </connectors>

           

             <acceptors>

                <acceptor name="netty">

                   <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>

                   <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>

                   <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>

                </acceptor>

           

           

                <acceptor name="netty-throughput">

                   <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>

                   <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>

                   <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>

                   <param key="batch-delay" value="50"/>

                   <param key="direct-deliver" value="false"/>

                </acceptor>

             </acceptors>

           

             <security-enabled>false</security-enabled>

             <security-settings>

                <security-setting match="#">

                   <permission type="createNonDurableQueue" roles="guest"/>

                   <permission type="deleteNonDurableQueue" roles="guest"/>

                   <permission type="consume" roles="guest"/>

                   <permission type="send" roles="guest"/>

                </security-setting>

             </security-settings>

           

            <address-settings>

                <!--default for catch all-->

                <address-setting match="#">

                   <dead-letter-address>jms.queue.DLQ</dead-letter-address>

                   <expiry-address>jms.queue.ExpiryQueue</expiry-address>

                   <redelivery-delay>0</redelivery-delay>

                   <max-size-bytes>10485760</max-size-bytes>

                   <message-counter-history-day-limit>10</message-counter-history-day-limit>

                   <address-full-policy>BLOCK</address-full-policy>

                </address-setting>

             </address-settings>

           

          </configuration>

          • 2. Re: message queueing up on producer side and none on the consumer queue
            clebert.suconic

            You have it set to block, so it will block if you don't consume messages.

            • 3. Re: message queueing up on producer side and none on the consumer queue
              maybenow

              sorry, pretty new to hornetq, not sure what do you mean by "consume".

               

              the message is send to consumer and consumed by the MessageHandlerImpl which I listed above, after that we did a   message.acknowledge();

               

              all the consumer queue message count is 0 after 10k message delivered and the "messaged added" is equal to producer producing message, so

              I thought all the messages are consumed already.

               

              is not this the full cycle. or there is any step to "consume" the message?