1 2 Previous Next 23 Replies Latest reply on May 20, 2015 10:14 AM by rjakkula

    Jboss SOA P hornetq core bridge

    rjakkula

      Hi,

       

      I am trying to create a core bridge between two core queues in Jboss SOA-P 5 with the below configuration to forward messages from QueueA to QueueB.

       

      I see that the messages are getting added to QueueA but they are not being forwarded to QueueB. What am I missing here? Can you please guide me?

       

      hornetq-configuration.xml :

       

      <bridges>

           <bridge name="test-bridge">

                <queue-name>jms.queue.QueueA</queue-name>

                <forwarding-address>jms.queue.QueueB</forwarding-address>

                <retry-interval>1000</retry-interval>

                <retry-interval-multiplier>1.0</retry-interval-multiplier>

                <reconnect-attempts>-1</reconnect-attempts>

                <failover-on-server-shutdown>false</failover-on-server-shutdown>

                <use-duplicate-detection>true</use-duplicate-detection>

                <confirmation-window-size>10000000</confirmation-window-size>

                <static-connectors>

                   <connector-ref>netty</connector-ref>

                </static-connectors>         

          </bridge>

      <bridges>


      <queues>

         <queue name="jms.queue.QueueA">

             <address>jms.queue.QueueA</address>

          </queue>

         <queue name="jms.queue.QueueB">

             <address>jms.queue.QueueB</address>

          </queue>

        </queues>

       

            <connector name="netty">

               <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>

               <param key="host"  value="${jboss.esb.bind.address:localhost}"/>

               <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>

            </connector>

       

            <acceptor name="netty">

                <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>

               <param key="host"  value="localhost"/>

               <param key="port"  value="5445"/>

            </acceptor>


      hornetq-jms.xml :


          <queue name="jms.queue.QueueA">

                <entry name="jms.queue.QueueA" />

          </queue>

       

         <queue name="jms.queue.QueueB">

              <entry name="jms.queue.QueueB" />

          </queue>

        • 1. Re: Jboss SOA P hornetq core bridge
          jbertram

          Couple of things:

          • If you're bridge messages between queues on the same server then you should use an in-vm connector rather than a netty connector.
          • The names of the queues in your hornetq-jms.xml should not be explicitly prefixed with "jms.queue."  When a JMS queue is created (e.g. via the XML specified in hornetq-jms.xml) a core queue is created in the background using the "name" of the JMS queue prefixed with "jms.queue."  That is why you reference the queue with "jms.queue.*" when dealing with core resources (e.g. a core bridge).  If you specify the name of the JMS queue as "jms.queue.QueueA" then the core queue created for it will be "jms.queue.jms.queue.QueueA."
          • 2. Re: Jboss SOA P hornetq core bridge
            rjakkula

            Thanks Justin. With the changes you mentioned it works fine using invm. But instead of moving the messages from QueueA, its copying the message to QueueB

             

            If QueueB is on a different node, we need to use Netty I believe? What other changes in the configuration do I need to make to make it work with Netty and remote queue?

             

            Thanks again !

            • 3. Re: Jboss SOA P hornetq core bridge
              jbertram

              With the changes you mentioned it works fine using invm. But instead of moving the messages from QueueA, its copying the message to QueueB

              I'm not clear on these points.  Is it working fine or is there a problem?

               

              If QueueB is on a different node, we need to use Netty I believe? What other changes in the configuration do I need to make to make it work with Netty and remote queue?

              If you want to bridge messages to a queue on a remote instance then yes you will need to use a netty connector.  Changing the connector should be the only bridge modification necessary.

              • 4. Re: Jboss SOA P hornetq core bridge
                rjakkula

                Sorry Justin could not post the the forum as it kept saying no post allowed before 3600 s.

                 

                 

                With the changes you mentioned it works fine using invm. But instead of moving the messages from QueueA, its copying the message to QueueB

                 

                What I meant was the bridge was making a copy of the message in QueueA and placing it in QueueB. The behavior should be to move the message completely from QueueA to QueueB. I still see the old message in QueueA, which should not be the case.

                 

                Is there some setting to control this behavior? Thanks

                • 5. Re: Jboss SOA P hornetq core bridge
                  jbertram

                  I'm not sure how it would be possible for the bridge to create a copy of the message.  Can you provide me with a reproducible test-case?

                  • 6. Re: Jboss SOA P hornetq core bridge
                    rjakkula

                    Justin,

                     

                     

                    Below is my hornetq-configuration.xml, hornetq-jms.xml and the client I used to write to QueueA.

                     

                     

                     

                     

                    After inserting the message into QueueA using the client. I see that the message count in the admin-console went up by 1. At the same time, I see that the message count on QueueB also went up by 1. Ideally the message count on QueueA should be 0 since the message is transferred by the bridge from QueueA to QueueB. When did the same again the count on each queue went up by 1. Please help me on this.

                     

                     

                    <configuration xmlns="urn:hornetq"

                                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

                                   xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">

                       <name>HornetQ.main.config</name>

                       <log-delegate-factory-class-name>org.hornetq.integration.logging.Log4jLogDelegateFactory</log-delegate-factory-class-name>  

                       <bindings-directory>${jboss.server.data.dir}/hornetq/bindings</bindings-directory>

                       <journal-directory>${jboss.server.data.dir}/hornetq/journal</journal-directory>  

                       <journal-min-files>10</journal-min-files>

                       <large-messages-directory>${jboss.server.data.dir}/hornetq/largemessages</large-messages-directory>

                       <paging-directory>${jboss.server.data.dir}/hornetq/paging</paging-directory>

                       <connectors>

                          <connector name="netty">

                             <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>

                             <param key="host"  value="${jboss.esb.bind.address:localhost}"/>

                             <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>

                          </connector>     

                          <connector name="netty-throughput">

                             <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>

                             <param key="host"  value="${jboss.esb.bind.address:localhost}"/>

                             <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>

                             <param key="batch-delay" value="50"/>

                          </connector>

                          <connector name="in-vm">

                             <factory-class>org.hornetq.core.remoting.impl.invm.InVMConnectorFactory</factory-class>

                             <param key="server-id" value="${hornetq.server-id:0}"/>

                          </connector>

                       </connectors>

                       <acceptors>  

                          <acceptor name="netty">

                             <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>

                             <param key="host"  value="localhost"/>

                             <param key="port"  value="5445"/>

                          </acceptor>     

                          <acceptor name="netty-throughput">

                             <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>

                             <param key="host"  value="${jboss.esb.bind.address:localhost}"/>

                             <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>

                             <param key="batch-delay" value="50"/>

                             <param key="direct-deliver" value="false"/>

                          </acceptor>

                          <acceptor name="in-vm">

                            <factory-class>org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory</factory-class>

                            <param key="server-id" value="0"/>

                          </acceptor>

                       </acceptors>

                       <security-settings>

                          <security-setting match="#">

                             <permission type="createNonDurableQueue" roles="guest"/>

                             <permission type="deleteNonDurableQueue" roles="guest"/>

                             <permission type="consume" roles="guest"/>

                             <permission type="send" roles="guest"/>

                          </security-setting>

                       </security-settings>

                       <address-settings>

                          <!--default for catch all-->

                          <address-setting match="#">

                             <dead-letter-address>jms.queue.DLQ</dead-letter-address>

                             <expiry-address>jms.queue.ExpiryQueue</expiry-address>

                             <redelivery-delay>0</redelivery-delay>

                             <max-size-bytes>10485760</max-size-bytes>      

                             <message-counter-history-day-limit>10</message-counter-history-day-limit>

                             <address-full-policy>BLOCK</address-full-policy>

                             <redistribution-delay>60000</redistribution-delay>

                          </address-setting>

                       </address-settings>

                      <bridges>

                         <bridge name="emf-az-dz-bridge">

                              <queue-name>QueueA</queue-name>

                              <forwarding-address>jms.queue.QueueB</forwarding-address>

                              <retry-interval>1000</retry-interval>

                              <retry-interval-multiplier>1.0</retry-interval-multiplier>

                              <reconnect-attempts>-1</reconnect-attempts>

                              <failover-on-server-shutdown>false</failover-on-server-shutdown>

                              <use-duplicate-detection>true</use-duplicate-detection>

                              <confirmation-window-size>10000000</confirmation-window-size>

                              <static-connectors>

                                 <connector-ref>in-vm</connector-ref>

                              </static-connectors>         

                        </bridge>

                      </bridges>

                      <queues>

                       <queue name="QueueA">

                           <address>jms.queue.QueueA</address>

                        </queue>

                      </queues>

                    </configuration>

                     

                     

                    hornetq-jms.xml

                     

                     

                     

                     

                    <configuration xmlns="urn:hornetq"

                                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

                                xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">

                       <connection-factory name="NettyConnectionFactory">

                          <xa>true</xa>

                          <connectors>

                             <connector-ref connector-name="netty"/>

                          </connectors>

                          <entries>

                             <entry name="/ConnectionFactory"/>

                             <entry name="/XAConnectionFactory"/>

                          </entries>

                       </connection-factory>  

                       <connection-factory name="NettyThroughputConnectionFactory">

                          <xa>true</xa>

                        <connectors>

                             <connector-ref connector-name="netty-throughput"/>

                        </connectors>

                      <entries>

                      <entry name="/ThroughputConnectionFactory"/>

                      <entry name="/XAThroughputConnectionFactory"/>

                      </entries>

                      </connection-factory>  

                       <connection-factory name="InVMConnectionFactory">

                          <xa>true</xa>

                          <connectors>

                             <connector-ref connector-name="in-vm"/>

                          </connectors>

                          <entries>

                             <entry name="java:/ConnectionFactory"/>

                             <entry name="java:/XAConnectionFactory"/>

                          </entries>

                       </connection-factory>  

                       <queue name="DLQ">

                          <entry name="/queue/DLQ"/>

                       </queue>  

                       <queue name="ExpiryQueue">

                          <entry name="/queue/ExpiryQueue"/>

                       </queue>

                        <queue name="QueueA">

                      <entry name="jms.queue.QueueA" />

                        </queue>  

                    </configuration>

                     

                     

                    Client:

                     

                     

                     

                     

                    import java.util.HashMap;

                    import java.util.Map;

                     

                     

                     

                     

                    import javax.jms.Connection;

                    import javax.jms.MessageProducer;

                    import javax.jms.Queue;

                    import javax.jms.Session;

                    import javax.jms.TextMessage;

                     

                     

                     

                     

                    import org.hornetq.api.core.TransportConfiguration;

                    import org.hornetq.api.jms.HornetQJMSClient;

                    import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;

                    import org.hornetq.core.remoting.impl.netty.TransportConstants;

                    import org.hornetq.jms.client.HornetQConnectionFactory;

                     

                     

                     

                     

                    public class HornetQClientSender {

                     

                     

                     

                     

                      public static void main(String[] args) throws Exception {

                     

                     

                     

                     

                      Connection connection = null;

                      try {

                     

                     

                     

                     

                      Map<String, Object> connectionParams = new HashMap<String, Object>();

                      connectionParams

                      .put(TransportConstants.HOST_PROP_NAME, "127.0.0.1");

                      connectionParams.put(TransportConstants.PORT_PROP_NAME, 5445);

                      TransportConfiguration transportConfiguration = new TransportConfiguration(

                      NettyConnectorFactory.class.getName(), connectionParams);

                      HornetQConnectionFactory cf = new HornetQConnectionFactory(false,

                      transportConfiguration);

                      connection = cf.createConnection();

                      Session session = connection.createSession(false,

                      Session.AUTO_ACKNOWLEDGE);

                      connection.start();

                      Queue queue = HornetQJMSClient

                      .createQueue("QueueA");

                      MessageProducer producer = session.createProducer(queue);

                      TextMessage message = session

                      .createTextMessage("Text message");

                      producer.send(message);

                      System.out.println("Sent message: " + message.getText());

                     

                     

                     

                     

                      } finally {

                     

                      if (connection != null) {

                      connection.close();

                      }

                      }

                      }

                    }

                    • 7. Re: Jboss SOA P hornetq core bridge
                      jbertram

                      Couple of questions:

                      1. Where is QueueB defined?  I didn't see it in the pasted configuration.
                      2. Are you looking at the "MessageCount" attribute or the "MessagesAdded" attribute?
                      3. What is the "admin-console" that you referenced?

                       

                      Can you supply me with a reproducible test-case?  This would be something I could actually run to reproduce the behavior you're seeing.  I'm not talking about a bunch of pasted code and configuration.

                      • 8. Re: Jboss SOA P hornetq core bridge
                        rjakkula

                        Justin,

                         

                        1. QueueB is is deployed as part of our application in hornet-jms.xml(not core queue) on Jboss server.

                        2 & 3. I am looking at MessageAdded attribute in the admin-console of my local jboss server in the browser

                         

                        I will try to setup two core queues and see if I can share you the full code. Actually the pasted code are the full file contents, only QueueB is missing. Anyway I will provide you full files. I am limited by the 1 hr a post restriction.

                        • 9. Re: Jboss SOA P hornetq core bridge
                          jbertram

                          2 & 3. I am looking at MessageAdded attribute in the admin-console of my local jboss server in the browser

                          I don't think you'll need to provide a reproducer.  This is the expected behavior of the "MessagesAdded" attribute.  It is the total count of all the messages that have been added to the queue since the server started (or since the count was reset).

                          • 10. Re: Jboss SOA P hornetq core bridge
                            rjakkula

                            Sorry for dragging this issue Justin, but both the MessageCount and MessageAdded attributes match. That is messages are waiting to be delivered in QueueA. As soon as I add a message to QueueA, both MessageCount and MessageAdded attributes are up by 1.

                             

                            At the same time QueueB MessageAdded and MessageCount went up by 1. Not sure how it is possible since the if it is delivered to QueueB, then that message should not be still in QueueA. Thanks.

                            • 11. Re: Jboss SOA P hornetq core bridge
                              jbertram

                              If you connect a consumer to QueueA do you get messages?

                              • 12. Re: Jboss SOA P hornetq core bridge
                                rjakkula

                                Yes. I see the same messages that I initially inserted.

                                • 13. Re: Jboss SOA P hornetq core bridge
                                  jbertram

                                  Alright.  Go ahead and provide me with a reproducible test-case and I'll take a closer look.

                                  • 14. Re: Jboss SOA P hornetq core bridge
                                    rjakkula

                                    Justin,

                                     

                                    I uploaded the sender, receiver clients and also the hornetq-jms.xml and hornetq-configuration.xml

                                     

                                    https://developer.jboss.org/servlet/JiveServlet/downloadBody/53332-102-1-172751/Hornetq%20bridge.zip

                                     

                                    Please take a look. If you need the jars to run the client let me know. Thanks.

                                    1 2 Previous Next