
    Problem in HornetQ RA activation

    st.bolli

      Hello,

      I am running into trouble with a standalone, symmetric-cluster HornetQ 2.1.2 configuration.

      An MDB deployed on a JBoss 5.1.0.GA AS and configured to listen to a topic on the remote HornetQ cluster cannot subscribe at JBoss startup.

      I do not think it is a deployment-order problem, because I:

      1. start an "empty" JBoss

      2. start HornetQ and wait until it registers in HAJNDI

      3. deploy the MDB

       

      Thank you in advance for any hints.

       

      In the JBoss log, I repeatedly see the following lines:

       

      12:14:55,244 INFO  [HornetQActivation] awaiting topic/queue creation OpCoUpdatesTopic
      12:14:57,570 INFO  [HornetQActivation] Attempting to reconnect org.hornetq.ra.inflow.HornetQActivationSpec(ra=org.hornetq.ra.HornetQResourceAdapter@1ae4173 destination=OpCoUpdatesTopic destinationType=javax.jms.Topic ack=Dups-ok-acknowledge durable=false clientID=null user=null maxSession=15)

       

      As a workaround, if I touch the MDB jar file, it subscribes correctly.

       

      After some analysis of the code, I realized that the following exception is thrown and handled internally:

       

      HornetQException[errorCode=100 message=Queue 02a470c3-a179-4579-b13d-2734e0243612 does not exist]
              at org.hornetq.core.protocol.core.impl.ChannelImpl.sendBlocking(ChannelImpl.java:287)
              at org.hornetq.core.client.impl.ClientSessionImpl.internalCreateConsumer(ClientSessionImpl.java:1556)
              at org.hornetq.core.client.impl.ClientSessionImpl.createConsumer(ClientSessionImpl.java:447)
              at org.hornetq.core.client.impl.ClientSessionImpl.createConsumer(ClientSessionImpl.java:392)
              at org.hornetq.core.client.impl.DelegatingSession.createConsumer(DelegatingSession.java:201)
              at org.hornetq.ra.inflow.HornetQMessageHandler.setup(HornetQMessageHandler.java:165)
              at org.hornetq.ra.inflow.HornetQActivation.setup(HornetQActivation.java:291)
              at org.hornetq.ra.inflow.HornetQActivation.handleFailure(HornetQActivation.java:540)
              at org.hornetq.ra.inflow.HornetQActivation$SetupActivation.run(HornetQActivation.java:578)
              at org.jboss.resource.work.WorkWrapper.execute(WorkWrapper.java:205)
      

       

      The point in the code where the exception is thrown is where HornetQMessageHandler tries to create a consumer for the activation's temporary queue that it has just created (see the code snippet below).

       

       

      SimpleString queueName;
      if (activation.isTopic())
      {
         if (activation.getTopicTemporaryQueue() == null)
         {
            queueName = new SimpleString(UUID.randomUUID().toString());
            session.createQueue(activation.getAddress(), queueName, selectorString, false);   // Temp queue creation is successful
            activation.setTopicTemporaryQueue(queueName);
         }
         else
         {
            queueName = activation.getTopicTemporaryQueue();
         }
      }
      else
      {
         queueName = activation.getAddress();
      }
      consumer = session.createConsumer(queueName, selectorString);   // Here the exception is thrown!
      

       

       

      Below is the MDB I used for tests. Note that I tried setting the destination activation config property to either "/topic/OpCoUpdatesTopic" or simply "OpCoUpdatesTopic", but the result is the same.

       

      package com.test.diagnostics;

      import javax.ejb.ActivationConfigProperty;
      import javax.ejb.MessageDriven;
      import javax.ejb.TransactionManagement;
      import javax.ejb.TransactionManagementType;
      import javax.jms.Message;
      import javax.jms.MessageListener;

      import org.apache.log4j.Logger;
      import org.jboss.ejb3.annotation.ResourceAdapter;

      /**
       * Message-Driven Bean implementation class for: TestTopicMDB
       */
      @MessageDriven(activationConfig = {
            @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"),
            @ActivationConfigProperty(propertyName = "destination", propertyValue = "/topic/OpCoUpdatesTopic"),
            @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Dups-ok-acknowledge") })
      @TransactionManagement(value = TransactionManagementType.BEAN)
      @ResourceAdapter("hornetq-ra.rar")
      public class TestTopicMDB implements MessageListener {

         Logger log = Logger.getLogger(TestTopicMDB.class);

         /**
          * @see MessageListener#onMessage(Message)
          */
         public void onMessage(Message message) {
            log.info("Message received: " + message.toString());
         }

      }
      

       

      The hornetq-configuration.xml on node 1:

       

      <configuration xmlns="urn:hornetq"
                     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                     xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">

         <clustered>true</clustered>

         <cluster-user>myClusterAdmin</cluster-user>
         <cluster-password>myClusterPwd</cluster-password>

         <paging-directory>${data.dir:../../serverA0/test3/data}/paging</paging-directory>

         <bindings-directory>${data.dir:../../serverA0/test3/data}/bindings</bindings-directory>

         <journal-directory>${data.dir:../../serverA0/test3/data}/journal</journal-directory>

         <journal-min-files>10</journal-min-files>

         <large-messages-directory>${data.dir:../../serverA0/test3/data}/large-messages</large-messages-directory>

         <!--<backup-connector-ref connector-name="backup-connector"/>-->

         <connectors>
            <connector name="netty-connector">
               <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
               <param key="host" value="172.16.8.32"/>
               <param key="port" value="5445"/>
            </connector>

            <!--<connector name="backup-connector">
               <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
               <param key="host" value="172.16.8.33"/>
               <param key="port" value="5445"/>
            </connector>-->
         </connectors>

         <acceptors>
            <acceptor>
               <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
               <param key="host" value="172.16.8.32"/>
               <param key="port" value="5445"/>
            </acceptor>
         </acceptors>

         <broadcast-groups>
            <broadcast-group>
               <local-bind-address>172.16.8.32</local-bind-address>
               <local-bind-port>5432</local-bind-port>
               <group-address>231.7.7.7</group-address>
               <group-port>9876</group-port>
               <broadcast-period>5000</broadcast-period>
               <connector-ref connector-name="netty-connector"/><!--backup-connector-name="backup-connector"-->
            </broadcast-group>
         </broadcast-groups>

         <discovery-groups>
            <discovery-group name="dg-group1">
               <!--<local-bind-address>172.16.8.32</local-bind-address>-->
               <group-address>231.7.7.7</group-address>
               <group-port>9876</group-port>
               <refresh-timeout>10000</refresh-timeout>
            </discovery-group>
         </discovery-groups>

         <cluster-connections>
            <!--
            <cluster-connection>
               <address>jms</address>
               <discovery-group-ref discovery-group-name="dg-group1"/>
            </cluster-connection>
            -->
            <cluster-connection>
               <address>jms</address>
               <retry-interval>500</retry-interval>
               <use-duplicate-detection>true</use-duplicate-detection>
               <forward-when-no-consumers>false</forward-when-no-consumers>
               <max-hops>1</max-hops>
               <discovery-group-ref discovery-group-name="dg-group1"/>
            </cluster-connection>
         </cluster-connections>

         <security-settings>
            <security-setting match="#">
               <permission type="createNonDurableQueue" roles="guest"/>
               <permission type="deleteNonDurableQueue" roles="guest"/>
               <permission type="consume" roles="guest"/>
               <permission type="send" roles="guest"/>
            </security-setting>
         </security-settings>

         <address-settings>
            <!--default for catch all-->
            <address-setting match="#">
               <dead-letter-address>jms.queue.DLQ</dead-letter-address>
               <expiry-address>jms.queue.ExpiryQueue</expiry-address>
               <redelivery-delay>0</redelivery-delay>
               <max-size-bytes>10485760</max-size-bytes>
               <message-counter-history-day-limit>10</message-counter-history-day-limit>
               <address-full-policy>BLOCK</address-full-policy>
            </address-setting>
         </address-settings>

      </configuration>
      

       

      The hornetq-configuration.xml on node 2:

       

      <configuration xmlns="urn:hornetq"
                     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                     xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">

         <clustered>true</clustered>

         <cluster-user>myClusterAdmin</cluster-user>
         <cluster-password>myClusterPwd</cluster-password>

         <paging-directory>${data.dir:../../serverB1/test3/data}/paging</paging-directory>

         <bindings-directory>${data.dir:../../serverB1/test3/data}/bindings</bindings-directory>

         <journal-directory>${data.dir:../../serverB1/test3/data}/journal</journal-directory>

         <journal-min-files>10</journal-min-files>

         <large-messages-directory>${data.dir:../../serverB1/test3/data}/large-messages</large-messages-directory>

         <!--<backup-connector-ref connector-name="backup-connector"/>-->

         <connectors>
            <connector name="netty-connector">
               <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
               <param key="host" value="172.16.8.33"/>
               <param key="port" value="6445"/>
            </connector>

            <!--<connector>
               <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
               <param key="host" value="172.16.8.32"/>
               <param key="port" value="6445"/>
            </connector>-->
         </connectors>

         <acceptors>
            <acceptor>
               <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
               <param key="host" value="172.16.8.33"/>
               <param key="port" value="6445"/>
            </acceptor>
         </acceptors>

         <broadcast-groups>
            <broadcast-group>
               <local-bind-address>172.16.8.33</local-bind-address>
               <local-bind-port>5433</local-bind-port>
               <group-address>231.7.7.7</group-address>
               <group-port>9876</group-port>
               <broadcast-period>5000</broadcast-period>
               <connector-ref connector-name="netty-connector"/>
            </broadcast-group>
         </broadcast-groups>

         <discovery-groups>
            <discovery-group name="dg-group1">
               <!--<local-bind-address>172.16.8.33</local-bind-address>-->
               <group-address>231.7.7.7</group-address>
               <group-port>9876</group-port>
               <refresh-timeout>10000</refresh-timeout>
            </discovery-group>
         </discovery-groups>

         <cluster-connections>
            <!--
            <cluster-connection>
               <address>jms</address>
               <discovery-group-ref discovery-group-name="dg-group1"/>
            </cluster-connection>
            -->
            <cluster-connection>
               <address>jms</address>
               <retry-interval>500</retry-interval>
               <use-duplicate-detection>true</use-duplicate-detection>
               <forward-when-no-consumers>false</forward-when-no-consumers>
               <max-hops>1</max-hops>
               <discovery-group-ref discovery-group-name="dg-group1"/>
            </cluster-connection>
         </cluster-connections>

         <security-settings>
            <security-setting match="#">
               <permission type="createNonDurableQueue" roles="guest"/>
               <permission type="deleteNonDurableQueue" roles="guest"/>
               <permission type="consume" roles="guest"/>
               <permission type="send" roles="guest"/>
            </security-setting>
         </security-settings>

         <address-settings>
            <!--default for catch all-->
            <address-setting match="#">
               <dead-letter-address>jms.queue.DLQ</dead-letter-address>
               <expiry-address>jms.queue.ExpiryQueue</expiry-address>
               <redelivery-delay>0</redelivery-delay>
               <max-size-bytes>10485760</max-size-bytes>
               <message-counter-history-day-limit>10</message-counter-history-day-limit>
               <address-full-policy>BLOCK</address-full-policy>
            </address-setting>
         </address-settings>

      </configuration>
      

       

      The hornetq-jms.xml file (the same on both nodes):

      <configuration xmlns="urn:hornetq"
                     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                     xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">

         <connection-factory name="NettyConnectionFactory">
            <discovery-group-ref discovery-group-name="dg-group1"/>
            <connectors>
               <connector-ref connector-name="netty-connector"/>
               <!--backup-connector-name="backup-connector"-->
            </connectors>
            <entries>
               <entry name="/ConnectionFactory"/>
               <entry name="/XAConnectionFactory"/>
            </entries>
         </connection-factory>

         <queue name="DLQ">
            <entry name="/queue/DLQ"/>
         </queue>
         <queue name="ExpiryQueue">
            <entry name="/queue/ExpiryQueue"/>
         </queue>

         <topic name="OpCoUpdatesTopic">
            <entry name="/topic/OpCoUpdatesTopic"/>
         </topic>

      </configuration>
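
      For reference, a minimal standalone JMS subscriber can be used to check that the topic is reachable through JNDI outside the resource adapter. This is only a sketch: the JNDI properties and the jnp://localhost:1099 provider URL are assumptions for a default JBoss 5.1 naming setup and have to be adapted to the actual (HA)JNDI host and port.

      import java.util.Properties;

      import javax.jms.Connection;
      import javax.jms.ConnectionFactory;
      import javax.jms.Message;
      import javax.jms.MessageListener;
      import javax.jms.Session;
      import javax.jms.Topic;
      import javax.naming.InitialContext;

      public class TopicSubscriberCheck {

         public static void main(String[] args) throws Exception {
            // Assumed JNDI settings for a default JBoss 5.1 installation; adapt host and port as needed.
            Properties env = new Properties();
            env.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
            env.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
            env.put("java.naming.provider.url", "jnp://localhost:1099");

            InitialContext ctx = new InitialContext(env);
            ConnectionFactory cf = (ConnectionFactory) ctx.lookup("/ConnectionFactory");
            Topic topic = (Topic) ctx.lookup("/topic/OpCoUpdatesTopic");

            Connection connection = cf.createConnection();
            Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);

            // Non-durable subscriber, same acknowledge mode as the MDB.
            session.createConsumer(topic).setMessageListener(new MessageListener() {
               public void onMessage(Message message) {
                  System.out.println("Message received: " + message);
               }
            });

            connection.start();
            Thread.sleep(60000);   // listen for one minute, then exit
            connection.close();
         }
      }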
      
      
        • 1. Re: Problem in HornetQ RA activation
          lud

          Hello,

          I am working with st.bolli on this issue. Is there anyone who can help us? Thank you in advance!

          After some analysis, I saw that the thread listening to the discovery group updates the failover array of servers before all the sessions have been created inside the HornetQActivation.setup() method.

          As a result, the following two calls in HornetQMessageHandler are directed to two different servers in the cluster:

          session.createQueue(activation.getAddress(), queueName, selectorString,  false);

          and:

          consumer = session.createConsumer(queueName, selectorString);

           

          The createConsumer call does not find the queue because it was created on the other server.
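
          To make the race concrete: the same two steps only work when they are served by a single node. The sketch below (an illustration only, assuming the 2.1.2 core client API; host and port are taken from node 1's acceptor, and jms.topic.OpCoUpdatesTopic is the core address behind the JMS topic) connects directly to one server without discovery, so both calls necessarily reach the same node, which is exactly what the resource adapter does not guarantee once the failover array changes between them.

          import java.util.HashMap;
          import java.util.Map;
          import java.util.UUID;

          import org.hornetq.api.core.TransportConfiguration;
          import org.hornetq.api.core.client.ClientConsumer;
          import org.hornetq.api.core.client.ClientSession;
          import org.hornetq.api.core.client.ClientSessionFactory;
          import org.hornetq.api.core.client.HornetQClient;

          public class SingleNodeSubscriptionCheck {

             public static void main(String[] args) throws Exception {
                // Connect directly to node 1, without discovery, so every call hits the same server.
                Map<String, Object> params = new HashMap<String, Object>();
                params.put("host", "172.16.8.32");
                params.put("port", 5445);

                ClientSessionFactory factory = HornetQClient.createClientSessionFactory(
                      new TransportConfiguration("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory", params));
                ClientSession session = factory.createSession(false, true, true);

                // The same two steps the RA performs for a non-durable topic subscription.
                String queueName = UUID.randomUUID().toString();
                session.createQueue("jms.topic.OpCoUpdatesTopic", queueName, null, false);
                ClientConsumer consumer = session.createConsumer(queueName);

                System.out.println("Consumer created against a single node: " + consumer);

                consumer.close();
                session.close();
             }
          }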

           

          My questions are:

           

          1) Is this a HornetQ RA bug, i.e. should the failover array not be updated until all the sessions have been created by HornetQActivation?

          2) Is this a configuration mistake, i.e. should the queue be found on both servers?

           

          Thank you in advance for any help

          L.