11 Replies Latest reply on Apr 24, 2010 1:45 PM by zenzei2k

    Accesing HornetQ topic remotely

    zenzei2k


      Hi again mates! I have replaced JbossMQ with HornetQ 2.0 GA in a Jboss 4.2.3 server.
      Trying to test it remotely, but I'm having some problems. My configuration is this

       

      hornetq-configuration

      {code:xml}

         <connectors>
            <connector name="netty">
               <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
               <param key="host"  value="${jboss.bind.address}"/>
               <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
            </connector>

       

            <connector name="in-vm">
               <factory-class>org.hornetq.core.remoting.impl.invm.InVMConnectorFactory</factory-class>
            </connector>

       

         </connectors>

       

         <acceptors>  
            <acceptor name="netty">
               <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
               <param key="host"  value="${hornetq.remoting.netty.host:0.0.0.0}"/>
               <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
            </acceptor>

       

            <acceptor name="in-vm">
              <factory-class>org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory</factory-class>
              <param key="server-id" value="0"/>
            </acceptor>

       

         </acceptors>

      {code}

       

      hornetq-jms

      {code:xml}

         <connection-factory name="NettyConnectionFactory">
            <connectors>
               <connector-ref connector-name="netty"/>
            </connectors>
            <entries>
               <entry name="/ConnectionFactory"/>
               <entry name="/XAConnectionFactory"/>
            </entries>
            <pre-acknowledge>true</pre-acknowledge>
         </connection-factory>

      {code}


      jms-ds

      {code:xml}

      <connection-factories>
         <!--
          JMS Stuff
         -->

       

         <mbean code="org.jboss.jms.jndi.JMSProviderLoader" name="hornetq:service=JMSProviderLoader,name=JMSProvider">
            <attribute name="ProviderName">DefaultJMSProvider</attribute>
            <attribute name="ProviderAdapterClass">org.jboss.jms.jndi.JNDIProviderAdapter</attribute>
            <attribute name="FactoryRef">java:/XAConnectionFactory</attribute>
            <attribute name="QueueFactoryRef">java:/XAConnectionFactory</attribute>
            <attribute name="TopicFactoryRef">java:/XAConnectionFactory</attribute>
         </mbean>
         <!--
          JMS XA Resource adapter, use this to get transacted JMS in beans
         -->
         <tx-connection-factory>
            <jndi-name>JmsXA</jndi-name>
            <xa-transaction/>
            <rar-name>jms-ra.rar</rar-name>
            <connection-definition>org.hornetq.ra.HornetQRAConnectionFactory</connection-definition>
            <config-property name="SessionDefaultType" type="java.lang.String">javax.jms.Topic</config-property>
            <config-property name="JmsProviderAdapterJNDI" type="java.lang.String">java:/DefaultJMSProvider</config-property>
            <max-pool-size>20</max-pool-size>
            <security-domain-and-application>JmsXARealm</security-domain-and-application>
         </tx-connection-factory>
      </connection-factories>

      {/code}

       

      The jboss.bind.address is specified at start with the -b switch of Jboss (-b 192.168.1.124). From a Jboss Bean I'm sending some messages and I have a client MessageConsumer to receive them. If I test the MessageConsumer in localhost, the test goes ok and receives the messages, but when I test it in a remote machine, no message is receive from server, although the creation of the MessageConsumer goes ok (and doesn't throw any error after creating connection/session/consumer and starting the connection).
      I had specified the jndi.properties as system variables (-Djava.naming.provider.url=jnp://192.168.1.124:1099 -Djava.naming.factory.initial=org.jnp.interfaces.NamingContextFactory -Djava.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces)

       

      Do I'm missing some configuration point?
      Thanks in advance!

        • 1. Re: Accesing HornetQ topic remotely
          clebert.suconic

          Try putting jboss.bind.address on the acceptor as weel:

           

             <acceptors>  
                <acceptor name="netty">
                   <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
                   <param key="host"  value="${jboss.bind.address:0.0.0.0}"/>
                   <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
                </acceptor>

          • 2. Re: Accesing HornetQ topic remotely
            zenzei2k

            Hi Clebert! Thanks for your tip. But I tried with that before, and using also jboss.bind.adress in the NettyAcceptor gives a recursive log showing this when I tried to initialize my MessageProducer

             

            ....

            2010-04-23 14:49:02,000 DEBUG [org.hornetq.integration.transports.netty.NettyConnector] Started Netty Connector version 3.1.5.GA-r1772
            2010-04-23 14:49:05,046 DEBUG [org.hornetq.integration.transports.netty.NettyConnector] Started Netty Connector version 3.1.5.GA-r1772
            2010-04-23 14:49:08,031 DEBUG [org.hornetq.integration.transports.netty.NettyConnector] Started Netty Connector version 3.1.5.GA-r1772
            2010-04-23 14:49:11,046 DEBUG [org.hornetq.integration.transports.netty.NettyConnector] Started Netty Connector version 3.1.5.GA-r1772
            2010-04-23 14:49:14,062 DEBUG [org.hornetq.integration.transports.netty.NettyConnector] Started Netty Connector version 3.1.5.GA-r1772
            2010-04-23 14:49:17,078 DEBUG [org.hornetq.integration.transports.netty.NettyConnector] Started Netty Connector version 3.1.5.GA-r1772

            ....

             

            This message is logged after entering the follow line, and it never pass it (it hangs there), so I suppose its logged inside the   createSession.

            connection.createSession(true, HornetQJMSConstants.PRE_ACKNOWLEDGE);

             

             

            By the way, the initialize method of my MessageProducer is

             

            public void initialize() throws Exception
              {
                ConnectionFactory cf = (ConnectionFactory) jndiTemplate.lookup("java:/JmsXA");

             

                Destination topic = (Destination) jndiTemplate.lookup("topic/testTopic");

             

                log.debug("Creating connection");
                connection = cf.createConnection();
                log.debug("Creating session");
                session = connection.createSession(true, HornetQJMSConstants.PRE_ACKNOWLEDGE);
                log.debug("Creating producer");
                producer = session.createProducer(topic);
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                producer.setDisableMessageID(true);
                producer.setDisableMessageTimestamp(true);
                producer.setTimeToLive(5000);
               
                connection.start();
              }

             

             

            Do you think there is any problem with my initialize method?

            I would try know with another acknowledgeMode to see if that's the problem.

            • 3. Re: Accesing HornetQ topic remotely
              clebert.suconic

              ConnectionFactory cf = (ConnectionFactory) jndiTemplate.lookup("java:/JmsXA");

               

               

              This is the ConnectionFactory for the JCA CF. I'm actually surprised you could have access to java:/ on the remote VM. Perhaps you didn't explain some piece of the puzzle in your architecture.

               

               

              You're supposed to use a regular connection factory remotely. Unless you configure a JCA ConnectionFactory inside another container (talking to a remote box).

              • 4. Re: Accesing HornetQ topic remotely
                clebert.suconic

                I bet your remote box is another JBoss instance.

                 

                java:/JmsXA is the JCA Connection factory usually connected to the local InVM JMS Server.

                 

                You could however reconfigure JmsXA to talk to a remote server if you change the properties on the connector. (There are examples for that on the HornetQ distribution).

                 

                 

                You should either use a regular connection factory by contacting the remote JNDI and looking up the remote CF, or direct instantiating the objects with the proper connector attributes.

                 

                Or.. you should configure the JmsXA in your remote container to connect to the other container.

                 

                 

                I'm assuming your client is inside a JBoss Application Server, otherwise you would get NameNotFound when you looked up for the JmsXA.

                • 5. Re: Accesing HornetQ topic remotely
                  zenzei2k

                  Sorry Clebert, i'm using MessageProducer inside the container, and MessageConsumer in the client (standalone client, outside Jboss).

                   

                  Inside the container I use the java:/JmsXA (with the initialize method I show before), and in the MessageConsumer I use the ConnectionFactory. The initialize method of the MessageProducer as I post above is

                  public void initialize() throws Exception
                    {
                       ConnectionFactory cf = (ConnectionFactory)  jndiTemplate.lookup("java:/JmsXA");

                   

                      Destination topic =  (Destination) jndiTemplate.lookup("topic/testTopic");

                   

                       log.debug("Creating connection");
                      connection =  cf.createConnection();
                      log.debug("Creating session");
                       session = connection.createSession(true,  HornetQJMSConstants.PRE_ACKNOWLEDGE);
                      log.debug("Creating  producer");
                      producer = session.createProducer(topic);
                       producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                       producer.setDisableMessageID(true);
                       producer.setDisableMessageTimestamp(true);
                       producer.setTimeToLive(5000);
                     
                      connection.start();
                    }

                   

                  And the initialize method of the MessageConsumer is

                   

                  public void initialize() throws Exception
                    {
                      ConnectionFactory cf = (ConnectionFactory) jndiTemplate.lookup("ConnectionFactory");

                   

                      Destination destination = (Destination) jndiTemplate.lookup("topic/testTopic");

                   

                      log.debug("Creating connection");
                      connection = cf.createConnection();
                     
                      if (this.clientID != null)
                        connection.setClientID(this.clientID);

                   

                      log.debug("Creating session");
                      session = connection.createSession(false, HornetQJMSConstants.PRE_ACKNOWLEDGE);
                      log.debug("Creating consumer");
                      consumer = session.createConsumer(destination);
                      consumer.setMessageListener(this);

                   

                    }

                   

                  What I was trying to achieve is receiving the messages send by Jboss (with HornetQ) from a remote client .

                  If I use the connectors as shown in the first post, I can connect with the client test (outside Jboss) from localhost and from remote machines, but I only receive messages in localhost.

                  If I use jboss.bind.adress in NettyAcceptor as you suggest, the MessageProducer doesn't manage to start, it hangs in the line connection.createSession(false, HornetQJMSConstants.PRE_ACKNOWLEDGE) showing the recursive line in the log.

                  DEBUG  [org.hornetq.integration.transports.netty.NettyConnector] Started Netty  Connector version 3.1.5.GA-r1772

                   

                  I also try with AUTO_ACKNOWLEDGE mode and doesn't start either.

                   

                  Thanks again and sorry for not being clear at the first time.

                  • 6. Re: Accesing HornetQ topic remotely
                    zenzei2k

                    Some more info. Debugging the HornetQ sources found that the recursive log start in line 837 of HornetQRASessionFactoryImpl, where there is a call to org.jboss.resource.connectionamanager.BaseConnectionManager2$ConnectionManagerProxy.allocateConnection.

                    Maybe the problem is in Jboss?

                    • 7. Re: Accesing HornetQ topic remotely
                      clebert.suconic

                      You're only calling connection.start at the producer side. You won't start receiving messages until you called .start at the consumer's side.

                      • 8. Re: Accesing HornetQ topic remotely
                        zenzei2k

                        Sorry, I make the call to start after the intialize method (I make it separately, as well as stop, to call them as needed).

                         

                        public void start() 
                          {
                            try {
                              connection.start();
                            } catch (Exception e) {    
                              log.error("Error on connection start" , e);    
                            }  
                          }

                         

                        Remember I said that the client works locally but not remotely. I don't post the full class to not make the post so bigger, but now I   realized I should do it at first, my mistake

                         

                        {code}

                        public class HornetQMessageReceiver implements MessageListener, ExceptionListener {

                         

                          protected Logger log = LoggerFactory.getLogger(this.getClass());
                         
                          private JndiTemplate jndiTemplate;

                         

                          private MessageConsumer consumer;

                         

                          private Connection connection;
                         
                          private Session session;

                         

                          private EventHandler eventHandler;
                         
                          private String clientID;

                         

                          public void initialize() throws Exception
                          {
                            ConnectionFactory cf = (ConnectionFactory) jndiTemplate.lookup("ConnectionFactory");

                         

                            Destination destination = (Destination) jndiTemplate.lookup("topic/testTopic");

                         

                            log.debug("Creating connection");
                            connection = cf.createConnection();
                           
                            if (this.clientID != null)
                              connection.setClientID(this.clientID);

                         

                            log.debug("Creating session");
                            session = connection.createSession(false, HornetQJMSConstants.PRE_ACKNOWLEDGE);
                            log.debug("Creating consumer");
                            consumer = session.createConsumer(destination);
                            consumer.setMessageListener(this);

                          }

                         

                          public void start() 
                          {
                            try {
                              connection.start();
                            } catch (Exception e) {    
                              log.error("Error on connection start" , e);    
                            }  
                          }

                         

                          public void stop() 
                          {
                            try {
                              connection.stop();
                            } catch (Exception e) {    
                              log.error("Error on connection stop" , e);    
                            }  
                          }

                         

                          public void destroy()
                          {
                            try {
                              log.debug("Destroying producer");
                              if (connection != null) {
                                connection.close();
                                connection = null;
                              }
                            } catch (JMSException e) {
                              log.error("Error closing connection ", e);
                            }
                          }

                         

                          @Override
                          public void onMessage(Message message)
                          {
                            try {
                              ObjectMessage objectMessage = (ObjectMessage) message;

                              log.info("Message received {}.", objectMessage.getObject());

                         

                            } catch (Exception e) {    
                              log.error("Error on message processing", e);    
                            }  
                          }

                         

                          @Override
                          public void onException(JMSException exception)
                          {
                            log.error("Connection failure has been detected on a the client. (ErrorCode: " + exception.getErrorCode() + ", Message: "+exception.getMessage()+")", exception);
                            log.info("Trying to reconnect");   

                         

                            int seconds = 10;
                           
                            boolean reconnected = false;
                            while (!reconnected)
                            {
                              try {  
                               
                                this.stop() ;
                                this.destroy();
                               
                                log.info("The old resources have been closed.");
                               
                                this.initialize() ;      
                                this.start() ; 
                               
                                reconnected = true;
                               
                              } catch (Throwable e) {
                                log.error("Reconnection error", e);
                                log.info("Trying reconnection in "+seconds+" sec.");   
                                try {
                                  Thread.sleep(seconds*1000);
                                } catch (InterruptedException e1) {
                                  log.error("Thread sleep error", e1);       
                                }
                              }
                            }
                           
                            log.info("Succesfull reconnection. The new resources have been created.");
                          }

                         

                          public void setJndiTemplate(JndiTemplate jndiTemplate)
                          {
                            this.jndiTemplate = jndiTemplate;
                          }

                         

                          public void setClientID(String clientID)
                          {
                            this.clientID = clientID;
                          }
                        }

                        {code}

                         

                        Another test I made was tried setting in my MessageConsumer (inside  container)  XAConnectionFactory insted of java:/JmsXA. Using it, the  MessageProducer  starts ok, but no message is receive remotely in the  MessageConsumer (same as first post). I  don't know what the difference  could be, I always use java:/JmsXA inside  container because of the  automatic pooling.

                        • 9. Re: Accesing HornetQ topic remotely
                          clebert.suconic

                          I think this will certainly be some simple mistake you're making. Maybe you're pointing out to a wrong JNDI? Maybe you're not setting your JNDI.properties, and not downloading the right CF?

                           

                           

                          If you think there's a bug.. you should create a simple setup.. post it here and we can then verify if there's anything wrong while running inside JBoss AS 4.2. Allthought I don't think it will be the case as this is such a simple use case.  (Well. I never say never :-) .. so if you can replicate it as a bug in a simple setup.. i can try it here instead of guessing).

                          • 10. Re: Accesing HornetQ topic remotely
                            zenzei2k

                            Thanks Clebert, I also think that is a simple case, and it's my fault in some point, but i don't get where is the error

                            So I will try to upload the simple test.

                             

                            I was also thinking about what you say of setting the  NettyAcceptorFactory to jboss.bind.address. Would't that mean that  HornetQ would be accessible only from that IP, and not remotely?

                            • 11. Re: Accesing HornetQ topic remotely
                              zenzei2k

                              Hi Clebert! Finally after debugging the source code I discover that  the problem was not setting the ConnectionParameters property in  jms-ds.xml. That was the reason of the recusive log of Netty starting  the connector, cause it had -1 reconnectAtempts, and as localhost was  not found (remember the -b XX.XX.XX.XX I use to start Jboss), it was  trying to reconnect (maybe HornetQ could warn about trying to  reconnect?).

                               

                              <tx-connection-factory>
                                     <jndi-name>JmsXA</jndi-name>
                                     <xa-transaction/>
                                     <rar-name>jms-ra.rar</rar-name>
                                      <connection-definition>org.hornetq.ra.HornetQRAConnectionFactory</connection-definition>
                                     <config-property name="SessionDefaultType"  type="java.lang.String">javax.jms.Topic</config-property>
                                     <config-property name="JmsProviderAdapterJNDI"  type="java.lang.String">java:/DefaultJMSProvider</config-property>
                                     <config-property name="ConnectionParameters"  type="java.lang.String">host=${jboss.bind.address}</config-property>
                                     <max-pool-size>20</max-pool-size>
                                      <security-domain-and-application>JmsXARealm</security-domain-and-application>
                                  </tx-connection-factory>

                               

                              Now it works!

                               

                              By the way, I saw that if I have a local MDB in the server, it gives the same  error of recusive log trying to reconncect because localhost is not  found, cause it's trying to use the HornetQResourceAdapter that has

                              <config-property>
                                        <description>The transport configuration. These values must be in  the form of key=val;key=val;</description>
                                        <config-property-name>ConnectionParameters</config-property-name>
                                         <config-property-type>java.lang.String</config-property-type>
                                        <config-property-value>host=localhost;port=5445</config-property-value>
                                     </config-property>

                               

                              Do you know how to replace the host parameter dynamically in ra.xml? I try to use the variable jboss.bind.address there, but it didn't work,

                               

                              13:33:32,864 ERROR [NettyConnector] Failed to create netty connection
                              java.net.UnknownHostException: ${jboss.bind.address}

                               

                              I could use <config-property-value>host=0.0.0.0 ;port=5445</config-property-value> but I don't know if its right to do that.