1 Reply Latest reply on Aug 22, 2012 9:09 PM by Joel Pearson

    AS 7.1.1 HornetQ Reaper Thread not Reaping

    Joel Pearson Newbie

      Hi,

       

      I originally posted this on the JBoss AS 7 forum https://community.jboss.org/thread/204241?tstart=30 but since it's more related to HornetQ, I thought I'd post it here as well.

       

      I'm using AS 7.1.1 with stock HornetQ (2.2.13.Final), and I'm having problems expiring messages.

       

      I'm using standalone-full.xml and this is the JMS section of the file, with my "queue/ControlQueue" JMS queue:

              <subsystem xmlns="urn:jboss:domain:messaging:1.1">
                  <hornetq-server>
                      <persistence-enabled>false</persistence-enabled>
                      <security-enabled>false</security-enabled>
                      <message-counter-enabled>true</message-counter-enabled>
                      <message-expiry-scan-period>2000</message-expiry-scan-period>
                      <message-expiry-thread-priority>9</message-expiry-thread-priority>
                      <journal-type>NIO</journal-type>
                      <journal-file-size>102400</journal-file-size>
                      <journal-min-files>2</journal-min-files>
      
      
                      <connectors>
                          <netty-connector name="netty" socket-binding="messaging"/>
                          <netty-connector name="netty-throughput" socket-binding="messaging-throughput">
                              <param key="batch-delay" value="50"/>
                          </netty-connector>
                          <in-vm-connector name="in-vm" server-id="0"/>
                      </connectors>
      
      
                      <acceptors>
                          <netty-acceptor name="netty" socket-binding="messaging"/>
                          <netty-acceptor name="netty-throughput" socket-binding="messaging-throughput">
                              <param key="batch-delay" value="50"/>
                              <param key="direct-deliver" value="false"/>
                          </netty-acceptor>
                          <in-vm-acceptor name="in-vm" server-id="0"/>
                      </acceptors>
      
      
                      <address-settings>
                          <address-setting match="#">
                              <redelivery-delay>0</redelivery-delay>
                              <max-delivery-attempts>1</max-delivery-attempts>
                              <max-size-bytes>10485760</max-size-bytes>
                              <address-full-policy>DROP</address-full-policy>
                              <message-counter-history-day-limit>1</message-counter-history-day-limit>
                          </address-setting>
                      </address-settings>
      
      
                      <jms-connection-factories>
                          <connection-factory name="InVmConnectionFactory">
                              <connectors>
                                  <connector-ref connector-name="in-vm"/>
                              </connectors>
                              <entries>
                                  <entry name="java:/ConnectionFactory"/>
                              </entries>
                          </connection-factory>
                          <connection-factory name="RemoteConnectionFactory">
                              <connectors>
                                  <connector-ref connector-name="netty"/>
                              </connectors>
                              <entries>
                                  <entry name="RemoteConnectionFactory"/>
                                  <entry name="java:jboss/exported/jms/RemoteConnectionFactory"/>
                              </entries>
                          </connection-factory>
                          <pooled-connection-factory name="hornetq-ra">
                              <transaction mode="xa"/>
                              <connectors>
                                  <connector-ref connector-name="in-vm"/>
                              </connectors>
                              <entries>
                                  <entry name="java:/JmsXA"/>
                              </entries>
                          </pooled-connection-factory>
                      </jms-connection-factories>
      
      
                      <jms-destinations>
                          <jms-queue name="ControlQueue">
                              <entry name="queue/ControlQueue"/>
                              <durable>false</durable>
                          </jms-queue>
                      </jms-destinations>
                  </hornetq-server>
              </subsystem>
      
      

       

      The relevant pool snippet:

       

      <strict-max-pool name="control-mdb-strict-max-pool" max-pool-size="1" instance-acquisition-timeout="2" instance-acquisition-timeout-unit="MINUTES"/>
      

       

      My JMS sender looks like this:

       

         @Resource(mappedName = "java:/ConnectionFactory")
         private ConnectionFactory connectionFactory;
      
      
         @Resource(mappedName = "java:/queue/ControlQueue")
         private Queue controlQueue;
      
      
         @Schedule(second = "*/1", minute = "*", hour = "*", persistent = false)
         public void testControlQueue() {
            try {
               Connection connection = this.connectionFactory.createConnection();
      
      
               try {
                  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                  MessageProducer sender = session.createProducer(this.controlQueue);
                  sender.setTimeToLive(2000);
      
                  TextMessage message = session.createTextMessage("Hi! " + new Date());
                  sender.send(message);
               }
               finally {
                  // Clean up any resources.
                  if (connection != null) {
                     connection.close();
                  }
               }
            }
            catch (JMSException jmse) {
               // TODO Auto-generated catch block
               Loggers.business.error("JMS send failed", jmse);
            }
         }
      
      

       

      And my MDB looks like this:

       

      @Pool(value = "control-mdb-strict-max-pool")
      @MessageDriven(activationConfig = {
            @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
            @ActivationConfigProperty(propertyName = "destination", propertyValue = "java:/queue/ControlQueue") })
      public class ControlMessageDrivenBean implements MessageListener {
      
      
         @Override
         public void onMessage(Message message) {
            if (message instanceof TextMessage) {
               TextMessage textMessage = (TextMessage) message;
               try {
                  Loggers.business.info("Received message: [" + textMessage.getText() + "]");
                  Thread.sleep(5000);
               }
               catch (JMSException jmse) {
                  // Log an error rather than throw an exception. There is no point trying to re-process this message (which
                  // will happen if an exception is thrown).
                  Loggers.business.error("Could not retrieve text from message [" + textMessage + "]", jmse);
               }
               catch (InterruptedException e) {
                  // TODO Auto-generated catch block
                  e.printStackTrace();
               }
            }
      
      
         }
      
      
      }
      
      

       

      I downloaded the hornetQ source and put a break point on the queue and when the reaper runs it seems to think my queue is empty. But I can see from the logging output, that it is definately queuing up my messages, because it processes each message one by one, and the date increases by 1 second, instead of expiring the messages that were queued up while the MDB was sleeping.

       

      Inside "org.hornetq.core.server.impl.QueueImpl.expireReferences()" the iterator always appears to be empty.  Is there some limitation where queue using an "in-vm" queue doesn't support expiration?