18 Replies Latest reply on Aug 28, 2012 10:47 AM by clebert.suconic

    AS 7.1.1 HornetQ Reaper Thread not Reaping

    japearson

      Hi,

       

      I originally posted this on the JBoss AS 7 forum https://community.jboss.org/thread/204241?tstart=30 but since it's more related to HornetQ, I thought I'd post it here as well.

       

      I'm using AS 7.1.1 with stock HornetQ (2.2.13.Final), and I'm having problems expiring messages.

       

      I'm using standalone-full.xml and this is the JMS section of the file, with my "queue/ControlQueue" JMS queue:

              <subsystem xmlns="urn:jboss:domain:messaging:1.1">
                  <hornetq-server>
                      <persistence-enabled>false</persistence-enabled>
                      <security-enabled>false</security-enabled>
                      <message-counter-enabled>true</message-counter-enabled>
                      <message-expiry-scan-period>2000</message-expiry-scan-period>
                      <message-expiry-thread-priority>9</message-expiry-thread-priority>
                      <journal-type>NIO</journal-type>
                      <journal-file-size>102400</journal-file-size>
                      <journal-min-files>2</journal-min-files>
      
      
                      <connectors>
                          <netty-connector name="netty" socket-binding="messaging"/>
                          <netty-connector name="netty-throughput" socket-binding="messaging-throughput">
                              <param key="batch-delay" value="50"/>
                          </netty-connector>
                          <in-vm-connector name="in-vm" server-id="0"/>
                      </connectors>
      
      
                      <acceptors>
                          <netty-acceptor name="netty" socket-binding="messaging"/>
                          <netty-acceptor name="netty-throughput" socket-binding="messaging-throughput">
                              <param key="batch-delay" value="50"/>
                              <param key="direct-deliver" value="false"/>
                          </netty-acceptor>
                          <in-vm-acceptor name="in-vm" server-id="0"/>
                      </acceptors>
      
      
                      <address-settings>
                          <address-setting match="#">
                              <redelivery-delay>0</redelivery-delay>
                              <max-delivery-attempts>1</max-delivery-attempts>
                              <max-size-bytes>10485760</max-size-bytes>
                              <address-full-policy>DROP</address-full-policy>
                              <message-counter-history-day-limit>1</message-counter-history-day-limit>
                          </address-setting>
                      </address-settings>
      
      
                      <jms-connection-factories>
                          <connection-factory name="InVmConnectionFactory">
                              <connectors>
                                  <connector-ref connector-name="in-vm"/>
                              </connectors>
                              <entries>
                                  <entry name="java:/ConnectionFactory"/>
                              </entries>
                          </connection-factory>
                          <connection-factory name="RemoteConnectionFactory">
                              <connectors>
                                  <connector-ref connector-name="netty"/>
                              </connectors>
                              <entries>
                                  <entry name="RemoteConnectionFactory"/>
                                  <entry name="java:jboss/exported/jms/RemoteConnectionFactory"/>
                              </entries>
                          </connection-factory>
                          <pooled-connection-factory name="hornetq-ra">
                              <transaction mode="xa"/>
                              <connectors>
                                  <connector-ref connector-name="in-vm"/>
                              </connectors>
                              <entries>
                                  <entry name="java:/JmsXA"/>
                              </entries>
                          </pooled-connection-factory>
                      </jms-connection-factories>
      
      
                      <jms-destinations>
                          <jms-queue name="ControlQueue">
                              <entry name="queue/ControlQueue"/>
                              <durable>false</durable>
                          </jms-queue>
                      </jms-destinations>
                  </hornetq-server>
              </subsystem>
      
      

       

      The relevant pool snippet:

       

      <strict-max-pool name="control-mdb-strict-max-pool" max-pool-size="1" instance-acquisition-timeout="2" instance-acquisition-timeout-unit="MINUTES"/>
      

       

      My JMS sender looks like this:

       

         @Resource(mappedName = "java:/ConnectionFactory")
         private ConnectionFactory connectionFactory;
      
      
         @Resource(mappedName = "java:/queue/ControlQueue")
         private Queue controlQueue;
      
      
         @Schedule(second = "*/1", minute = "*", hour = "*", persistent = false)
         public void testControlQueue() {
            try {
               Connection connection = this.connectionFactory.createConnection();
      
      
               try {
                  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                  MessageProducer sender = session.createProducer(this.controlQueue);
                  sender.setTimeToLive(2000);
      
                  TextMessage message = session.createTextMessage("Hi! " + new Date());
                  sender.send(message);
               }
               finally {
                  // Clean up any resources.
                  if (connection != null) {
                     connection.close();
                  }
               }
            }
            catch (JMSException jmse) {
               // TODO Auto-generated catch block
               Loggers.business.error("JMS send failed", jmse);
            }
         }
      
      

       

      And my MDB looks like this:

       

      @Pool(value = "control-mdb-strict-max-pool")
      @MessageDriven(activationConfig = {
            @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
            @ActivationConfigProperty(propertyName = "destination", propertyValue = "java:/queue/ControlQueue") })
      public class ControlMessageDrivenBean implements MessageListener {
      
      
         @Override
         public void onMessage(Message message) {
            if (message instanceof TextMessage) {
               TextMessage textMessage = (TextMessage) message;
               try {
                  Loggers.business.info("Received message: [" + textMessage.getText() + "]");
                  Thread.sleep(5000);
               }
               catch (JMSException jmse) {
                  // Log an error rather than throw an exception. There is no point trying to re-process this message (which
                  // will happen if an exception is thrown).
                  Loggers.business.error("Could not retrieve text from message [" + textMessage + "]", jmse);
               }
               catch (InterruptedException e) {
                  // TODO Auto-generated catch block
                  e.printStackTrace();
               }
            }
      
      
         }
      
      
      }
      
      

       

      I downloaded the HornetQ source and put a breakpoint on the queue; when the reaper runs, it seems to think my queue is empty. But I can see from the logging output that it is definitely queuing up my messages: the MDB processes each message one by one and the date in each message increases by 1 second, instead of the messages that were queued up while the MDB was sleeping being expired.

       

      Inside "org.hornetq.core.server.impl.QueueImpl.expireReferences()" the iterator always appears to be empty.  Is there some limitation where a queue using an "in-vm" connector doesn't support expiration?

        • 1. Re: AS 7.1.1 HornetQ Reaper Thread not Reaping
          gaohoward

          I think you need to configure your connection factory's "consumer-window-size" to zero.


          • 2. Re: AS 7.1.1 HornetQ Reaper Thread not Reaping
            japearson

            Hi Yong,

             

            It didn't make a difference.

             

            Reading the documentation makes it sound like it should.

             

            For reference this is where I added it:

             

                            <jms-connection-factories>
                                <connection-factory name="InVmConnectionFactory">
                                    <connectors>
                                        <connector-ref connector-name="in-vm"/>
                                    </connectors>
                                    <entries>
                                        <entry name="java:/ConnectionFactory"/>
                                    </entries>
                                    <consumer-window-size>0</consumer-window-size>
                                </connection-factory>
                                <connection-factory name="RemoteConnectionFactory">
                                    <connectors>
                                        <connector-ref connector-name="netty"/>
                                    </connectors>
                                    <entries>
                                        <entry name="RemoteConnectionFactory"/>
                                        <entry name="java:jboss/exported/jms/RemoteConnectionFactory"/>
                                    </entries>
                                </connection-factory>
                                <pooled-connection-factory name="hornetq-ra">
                                    <transaction mode="xa"/>
                                    <connectors>
                                        <connector-ref connector-name="in-vm"/>
                                    </connectors>
                                    <entries>
                                        <entry name="java:/JmsXA"/>
                                    </entries>
                                </pooled-connection-factory>
                            </jms-connection-factories>
            
            

             

            Any other ideas?

            • 3. Re: AS 7.1.1 HornetQ Reaper Thread not Reaping
              clebert.suconic

              You need to add it on the resource adapter.

              • 4. Re: AS 7.1.1 HornetQ Reaper Thread not Reaping
                clebert.suconic

                MDBs won't use the InVMConnectionFactory. They have their own connection mechanism based on activation-properties.

                • 5. Re: AS 7.1.1 HornetQ Reaper Thread not Reaping
                  japearson

                  Clebert Suconic wrote:

                   

                  You need to add it on the resource adapter.

                  What do I need to add exactly?

                   

                  I had a look at https://docs.jboss.org/author/display/AS71/Resource+adapters and http://docs.jboss.org/ironjacamar/schema/resource-adapters_1_0.xsd and it wasn't clear to me where I needed to add consumer-window-size presuming that is what you meant.

                   

                  Currently the resource-adapters subsystem in my JBoss standalone.xml is empty:

                   

                   

                          <subsystem xmlns="urn:jboss:domain:resource-adapters:1.0"/>
                  

                   

                  My ejb section does contain a reference to the hornetq-ra resource adapter

                   

                   

                          <subsystem xmlns="urn:jboss:domain:ejb3:1.2">
                              <session-bean>
                                  <stateless>
                                      <bean-instance-pool-ref pool-name="slsb-strict-max-pool"/>
                                  </stateless>
                                  <stateful default-access-timeout="5000" cache-ref="simple"/>
                                  <singleton default-access-timeout="5000"/>
                              </session-bean>
                              <mdb>
                                  <resource-adapter-ref resource-adapter-name="hornetq-ra"/>
                                  <bean-instance-pool-ref pool-name="mdb-strict-max-pool"/>
                              </mdb>
                              <pools>
                                  <bean-instance-pools>
                                      <strict-max-pool name="slsb-strict-max-pool" max-pool-size="20" instance-acquisition-timeout="5" instance-acquisition-timeout-unit="MINUTES"/>
                                      <strict-max-pool name="mdb-strict-max-pool" max-pool-size="20" instance-acquisition-timeout="5" instance-acquisition-timeout-unit="MINUTES"/>
                                      <strict-max-pool name="control-mdb-strict-max-pool" max-pool-size="1" instance-acquisition-timeout="2" instance-acquisition-timeout-unit="MINUTES"/>
                                      <strict-max-pool name="polling-mdb-strict-max-pool" max-pool-size="5" instance-acquisition-timeout="2" instance-acquisition-timeout-unit="MINUTES"/>
                                  </bean-instance-pools>
                              </pools>
                              <caches>
                                  <cache name="simple" aliases="NoPassivationCache"/>
                                  <cache name="passivating" passivation-store-ref="file" aliases="SimpleStatefulCache"/>
                              </caches>
                              <passivation-stores>
                                  <file-passivation-store name="file"/>
                              </passivation-stores>
                              <async thread-pool-name="default"/>
                              <timer-service thread-pool-name="default">
                                  <data-store path="timer-service-data" relative-to="jboss.server.data.dir"/>
                              </timer-service>
                              <remote connector-ref="remoting-connector" thread-pool-name="default"/>
                              <thread-pools>
                                  <thread-pool name="default">
                                      <max-threads count="10"/>
                                      <keepalive-time time="100" unit="milliseconds"/>
                                  </thread-pool>
                              </thread-pools>
                              <iiop enable-by-default="false" use-qualified-name="false"/>
                          </subsystem>
                  
                  

                   

                  I also tried adding <consumer-window-size>0</consumer-window-size> to the hornetq-ra pooled connection factory, and I also tried changing

                   

                   

                     @Resource(mappedName = "java:/ConnectionFactory")
                  

                  to

                   

                     @Resource(mappedName = "java:/JmsXA")
                  

                   

                  to match the XML for hornetq-ra, but it didn't make any difference.

                   

                                      <pooled-connection-factory name="hornetq-ra">
                                          <transaction mode="xa"/>
                                          <connectors>
                                              <connector-ref connector-name="in-vm"/>
                                          </connectors>
                                          <entries>
                                              <entry name="java:/JmsXA"/>
                                          </entries>
                                          <consumer-window-size>0</consumer-window-size>
                                      </pooled-connection-factory>
                  
                  

                   

                  What am I missing?

                  • 6. Re: AS 7.1.1 HornetQ Reaper Thread not Reaping
                    clebert.suconic

                    http://docs.jboss.org/hornetq/2.2.14.Final/user-manual/en/html/appserver-integration.html#configuring-mdbs

                     

                     

                    There is an activation property called consumerWindowSize. Set it to 0.

                     

                     

                     

                    I'm not sure where that's done on the application server, but on AS5 it could also be done globally in the ra.xml. Could someone update me here for AS7?

                     

                     

                    Take a look at the docs, and if you can't figure it out, someone here will be able to provide you with an example. (Anyone here, feel free to help Joel; don't wait for us.)
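A minimal sketch of what setting that activation property could look like on the MDB from the original post. It is a configuration fragment that needs the Java EE 6 API and a container to compile and deploy, the `@Pool` annotation and the message handling are left out for brevity, and whether AS 7.1.1 honours the property is not confirmed anywhere in this thread.

```java
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.Message;
import javax.jms.MessageListener;

@MessageDriven(activationConfig = {
      @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
      @ActivationConfigProperty(propertyName = "destination", propertyValue = "java:/queue/ControlQueue"),
      // Suggested in the reply above; untested here. 0 asks the consumer not to buffer messages.
      @ActivationConfigProperty(propertyName = "consumerWindowSize", propertyValue = "0") })
public class ControlMessageDrivenBean implements MessageListener {

   @Override
   public void onMessage(Message message) {
      // Message handling is unchanged from the original post.
   }
}
```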

                    • 7. Re: AS 7.1.1 HornetQ Reaper Thread not Reaping
                      japearson

                      Looks like it's actually a bug in JBoss 7.1.1 where it ignores the consumer window size that gets specified in standalone.xml. https://issues.jboss.org/browse/AS7-4213

                       

                      That makes much more sense. I thought I was going crazy, because I saw examples in the JBoss 7 documentation that confirmed what I was doing was supposed to work.

                       

                      I guess I will have to wait until the next release of JBoss or apply that patch by hand and compile JBoss myself.

                      • 8. Re: AS 7.1.1 HornetQ Reaper Thread not Reaping
                        clebert.suconic

                        No... pooled connection factory is used outbound.... so it's never used on MDBs...

                         

                         

                        You are looking for the former jms-ra. Inbound properties are specified by the activation properties; there's no connection factory used in this case.

                        • 9. Re: AS 7.1.1 HornetQ Reaper Thread not Reaping
                          clebert.suconic

                          Also, to make sure you understand: when the client tries to receive a message from its buffer, that message won't be consumed, as the client will treat it as expired. So the issue here is that you have clientConsumerWindowSize > 0 and your client is holding messages. As you start to consume them, these messages will be expired.
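The buffering described above can be illustrated with a toy model. This is not HornetQ code: `ToyMessage`, `PrefetchModel` and `PrefetchDemo` are hypothetical names, and the window is counted in messages rather than bytes purely to keep the sketch short. It only shows why a server-side expiry scan can find an empty queue while messages sit in the consumer's buffer.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Toy model only: none of these classes are HornetQ APIs.
final class ToyMessage {
    final String body;
    final long expiresAtMillis;

    ToyMessage(String body, long expiresAtMillis) {
        this.body = body;
        this.expiresAtMillis = expiresAtMillis;
    }

    boolean isExpired(long nowMillis) {
        return nowMillis >= expiresAtMillis;
    }
}

final class PrefetchModel {
    final Deque<ToyMessage> serverQueue = new ArrayDeque<>();
    final Deque<ToyMessage> clientBuffer = new ArrayDeque<>();
    private final int windowMessages; // simplification: a message count, not bytes

    PrefetchModel(int windowMessages) {
        this.windowMessages = windowMessages;
    }

    // The server hands messages to the consumer as long as its buffer has room.
    void send(ToyMessage message) {
        serverQueue.addLast(message);
        while (clientBuffer.size() < windowMessages && !serverQueue.isEmpty()) {
            clientBuffer.addLast(serverQueue.pollFirst());
        }
    }

    // Server-side expiry scan: it can only see what is still in the server queue.
    int reap(long nowMillis) {
        int expired = 0;
        for (Iterator<ToyMessage> it = serverQueue.iterator(); it.hasNext();) {
            if (it.next().isExpired(nowMillis)) {
                it.remove();
                expired++;
            }
        }
        return expired;
    }
}

final class PrefetchDemo {
    // Returns { messages reaped on the server, messages left in the client buffer }.
    static int[] run(int windowMessages) {
        PrefetchModel model = new PrefetchModel(windowMessages);
        for (int i = 0; i < 5; i++) {
            // Sent one second apart with a 2 second time to live.
            model.send(new ToyMessage("msg-" + i, i * 1000L + 2000L));
        }
        int reaped = model.reap(10_000L);
        return new int[] { reaped, model.clientBuffer.size() };
    }

    public static void main(String[] args) {
        int[] buffered = run(100);
        int[] unbuffered = run(0);
        System.out.println("window=100 reaped=" + buffered[0] + " buffered=" + buffered[1]);
        System.out.println("window=0 reaped=" + unbuffered[0] + " buffered=" + unbuffered[1]);
    }
}
```

With a window of 100 the scan reaps nothing and all five messages stay in the client buffer; with a window of 0 they stay on the server and all five are reaped.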

                          • 10. Re: AS 7.1.1 HornetQ Reaper Thread not Reaping
                            ataylor

                            Actually, Clebert, the pooled connection factory is what configures the inbound side for MDBs as well; it has an extra <inbound-config> part for doing that.

                            • 11. Re: AS 7.1.1 HornetQ Reaper Thread not Reaping
                              clebert.suconic

                              ok.. it's a bug indeed then?

                              • 12. Re: AS 7.1.1 HornetQ Reaper Thread not Reaping
                                japearson

                                Clebert Suconic wrote:

                                 

                                Also, to make sure you understand: when the client tries to receive a message from its buffer, that message won't be consumed, as the client will treat it as expired. So the issue here is that you have clientConsumerWindowSize > 0 and your client is holding messages. As you start to consume them, these messages will be expired.

                                So I compiled JBoss 7.1.2, but I still have the same problem, which makes me think that maybe I do have some configuration problem after all.

                                 

                                Do you know which part of the HornetQ code is responsible for checking expiration just before the message is consumed? (So I can run a debugger through it.)  Or is my receiving code supposed to check for expiration manually?

                                • 13. Re: AS 7.1.1 HornetQ Reaper Thread not Reaping
                                  clebert.suconic

                                  https://github.com/hornetq/hornetq/blob/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java

                                   

                                   

                                  This will happen in two circumstances:

                                  any client.receive(..) call:

                                      the loop will check the message.isExpired() flag and will loop until it can find a new message

                                  onMessage() calls:

                                      the loop will do the same check (message.isExpired()) and will ignore the call, sending the message back to the server

                                  In both cases the client will call session.expire(session-ID, messageID), which will expire the message on the server's side.
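The receive-side check described above can be sketched roughly as follows. This is a hypothetical stand-in, not the real `ClientConsumerImpl`: `BufferedMessage`, `ToyConsumer` and `ReceiveLoopDemo` are invented names, only the receive path is modelled, and a plain list stands in for the expire call made to the server.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-ins for illustration; not HornetQ classes.
final class BufferedMessage {
    final long id;
    final long expiresAtMillis;

    BufferedMessage(long id, long expiresAtMillis) {
        this.id = id;
        this.expiresAtMillis = expiresAtMillis;
    }

    boolean isExpired(long nowMillis) {
        return nowMillis >= expiresAtMillis;
    }
}

final class ToyConsumer {
    private final Deque<BufferedMessage> buffer = new ArrayDeque<>();
    // Stands in for the expire requests the client would send to the server.
    final List<Long> expiredIdsReportedToServer = new ArrayList<>();

    void deliverToBuffer(BufferedMessage message) {
        buffer.addLast(message);
    }

    // Skips expired messages, reporting each one, until a live message is found.
    // Returns null once the buffer is drained.
    BufferedMessage receive(long nowMillis) {
        BufferedMessage message;
        while ((message = buffer.pollFirst()) != null) {
            if (!message.isExpired(nowMillis)) {
                return message;
            }
            expiredIdsReportedToServer.add(message.id);
        }
        return null;
    }
}

final class ReceiveLoopDemo {
    static ToyConsumer filledConsumer() {
        ToyConsumer consumer = new ToyConsumer();
        consumer.deliverToBuffer(new BufferedMessage(1L, 2000L));
        consumer.deliverToBuffer(new BufferedMessage(2L, 3000L));
        consumer.deliverToBuffer(new BufferedMessage(3L, 9000L));
        return consumer;
    }

    public static void main(String[] args) {
        ToyConsumer consumer = filledConsumer();
        BufferedMessage live = consumer.receive(5000L);
        System.out.println("received id=" + live.id
                + " expired=" + consumer.expiredIdsReportedToServer);
    }
}
```

In this model the expiry check only runs when `receive` is called, so a message sitting in the buffer is not expired until the consumer asks for the next one.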

                                  • 14. Re: AS 7.1.1 HornetQ Reaper Thread not Reaping
                                    clebert.suconic

                                    If your client stays waiting forever (with a wait), the messages in its buffer won't be expired until you start to consume again.
