0 Replies Latest reply on Dec 6, 2012 11:16 AM by crytek

    Need HornetQ bridge clarifications

    crytek

      Hello,

       

      I've run some HornetQ bridge tests, and the bridge seems to behave differently depending on when the queue consumers are registered and the connections are started.

       

      Conditions:

      - 2 cluster nodes (node1 and node2), using broadcast and discovery groups.

       

      a.) On node1:

           - define a simple jms queue:  SourceQ1  (cli command:  /subsystem=messaging/hornetq-server=default/jms-queue=SourceQ1/:add(entries=["java:jboss/exported/jms/queue/SourceQ1"]))

           - define a simple HornetQ core bridge named "my-simple-bridge", which uses the cluster discovery group to forward messages from SourceQ1 on this node to TargetQ1 on node2:

                (cli command:  /subsystem=messaging/hornetq-server=default/bridge=my-simple-bridge:add(queue-name=jms.queue.SourceQ1, forwarding-address=jms.queue.TargetQ1, retry-interval=10000, ha=false, discovery-group-name=dg-group1, use-duplicate-detection=true, user=AppUser, password=tint252, failover-on-server-shutdown=true))

       

      b.) On node2:

           - define a simple jms queue:  TargetQ1  (cli command:  /subsystem=messaging/hornetq-server=default/jms-queue=TargetQ1/:add(entries=["java:jboss/exported/jms/queue/TargetQ1"]))

       

      Scenario1:

       

      // Look up the connection factory and queue on each node
      // (initialContext0 -> node1, initialContext1 -> node2).
      ConnectionFactory connFactory1 = (ConnectionFactory) initialContext0
              .lookup("jms/RemoteConnectionFactory");
      Queue sourceQueue = (Queue) initialContext0.lookup("jms/queue/SourceQ1");

      ConnectionFactory connFactory2 = (ConnectionFactory) initialContext1
              .lookup("jms/RemoteConnectionFactory");
      Queue targetQueue = (Queue) initialContext1.lookup("jms/queue/TargetQ1");

      connection1 = connFactory1.createConnection(APPLICATIONREALM_USER,
              APPLICATIONREALM_USER_PWD);
      connection2 = connFactory2.createConnection(APPLICATIONREALM_USER,
              APPLICATIONREALM_USER_PWD);

      Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
      Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);

      connection1.start();
      connection2.start();

      // consumer1 is registered on SourceQ1 BEFORE any message is sent.
      MessageProducer producer = session1.createProducer(sourceQueue);
      MessageConsumer consumer1 = session1.createConsumer(sourceQueue);
      MessageConsumer consumer2 = session2.createConsumer(targetQueue);

      TextMessage txtMsg = null;
      int sentMsgsNo = 10;

      for (int i = 0; i < sentMsgsNo; i++) {
          txtMsg = session1.createTextMessage("Message walks on bridge...");
          producer.send(txtMsg);
      }
      System.out.println("Producer sent messages.");

      System.out.println("Wait for bridge to divert messages to destination queue ...");

      TextMessage dummyResponse = null;
      int consumer1MsgsNo = 0;
      int consumer2MsgsNo = 0;

      // Drain SourceQ1 first, then TargetQ1; each loop stops after 4 s without a message.
      while ((dummyResponse = (TextMessage) consumer1.receive(4000)) != null) {
          System.out.println("SourceQueue consumer received message: "
                  + dummyResponse.getText());
          consumer1MsgsNo++;
      }

      while ((dummyResponse = (TextMessage) consumer2.receive(4000)) != null) {
          System.out.println("DestinationQueue consumer received message: "
                  + dummyResponse.getText());
          consumer2MsgsNo++;
      }

       

       

      In this scenario, where consumer1 is registered and connection1 is started before the messages are sent, consumer1 receives 5 messages and consumer2 receives 5 messages by the end of the test.

       

      But in scenario2,

       

       

      // Look up the connection factory and queue on each node
      // (initialContext0 -> node1, initialContext1 -> node2).
      ConnectionFactory connFactory1 = (ConnectionFactory) initialContext0
              .lookup("jms/RemoteConnectionFactory");
      Queue sourceQueue = (Queue) initialContext0.lookup("jms/queue/SourceQ1");

      ConnectionFactory connFactory2 = (ConnectionFactory) initialContext1
              .lookup("jms/RemoteConnectionFactory");
      Queue targetQueue = (Queue) initialContext1.lookup("jms/queue/TargetQ1");

      connection1 = connFactory1.createConnection(APPLICATIONREALM_USER,
              APPLICATIONREALM_USER_PWD);
      connection2 = connFactory2.createConnection(APPLICATIONREALM_USER,
              APPLICATIONREALM_USER_PWD);

      Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
      Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);

      connection1.start();
      connection2.start();

      // Only the producer and the TargetQ1 consumer exist while messages are sent.
      MessageProducer producer = session1.createProducer(sourceQueue);
      MessageConsumer consumer2 = session2.createConsumer(targetQueue);

      TextMessage txtMsg = null;
      int sentMsgsNo = 10;

      for (int i = 0; i < sentMsgsNo; i++) {
          txtMsg = session1.createTextMessage("Message walks on bridge...");
          producer.send(txtMsg);
      }
      System.out.println("Producer sent messages.");

      System.out.println("Wait for bridge to divert messages to destination queue ...");

      // consumer1 is registered on SourceQ1 only AFTER all messages have been sent.
      MessageConsumer consumer1 = session1.createConsumer(sourceQueue);

      TextMessage dummyResponse = null;
      int consumer1MsgsNo = 0;
      int consumer2MsgsNo = 0;

      // Drain SourceQ1 first, then TargetQ1; each loop stops after 4 s without a message.
      while ((dummyResponse = (TextMessage) consumer1.receive(4000)) != null) {
          System.out.println("SourceQueue consumer received message: "
                  + dummyResponse.getText());
          consumer1MsgsNo++;
      }

      while ((dummyResponse = (TextMessage) consumer2.receive(4000)) != null) {
          System.out.println("DestinationQueue consumer received message: "
                  + dummyResponse.getText());
          consumer2MsgsNo++;
      }

       

      In this scenario, where consumer1 is registered only after the messages were sent, the test ends with 10 messages on consumer2 and 0 on consumer1.

       

      I cannot find out why this is happening. It seems that messages are load-balanced across the consumers. Is this the normal behaviour? Is the bridge not unidirectional, but bidirectional? (The bridge is defined only on node1, and its definition says that the forwarding address is only TargetQ1 on node2.)
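
      To make the load-balancing guess concrete, here is a small stand-alone model in plain Java. It is not HornetQ code and uses no JMS API. It assumes two things that the tests above do not prove: that the bridge attaches to SourceQ1 as one more consumer, and that the queue hands each message to its attached consumers in round-robin order as soon as it arrives. The names BridgeRoundRobinModel, ModelQueue and ModelConsumer are hypothetical and exist only for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class BridgeRoundRobinModel {

    /** Hypothetical stand-in for anything attached to the source queue. */
    static final class ModelConsumer {
        final String name;
        int received = 0;

        ModelConsumer(String name) {
            this.name = name;
        }
    }

    /** Toy queue: delivers each message at once, round-robin, to the attached consumers. */
    static final class ModelQueue {
        private final List<ModelConsumer> consumers = new ArrayList<>();
        private final Deque<String> backlog = new ArrayDeque<>();
        private int next = 0;

        void attach(ModelConsumer consumer) {
            consumers.add(consumer);
            // Anything that arrived while nobody was attached is delivered now.
            while (!backlog.isEmpty()) {
                deliver(backlog.poll());
            }
        }

        void send(String message) {
            if (consumers.isEmpty()) {
                backlog.add(message);
            } else {
                deliver(message);
            }
        }

        private void deliver(String message) {
            ModelConsumer target = consumers.get(next % consumers.size());
            next++;
            target.received++;
        }
    }

    /**
     * Returns {messages seen by consumer1, messages taken by the bridge}.
     * The bridge is assumed to be attached to the source queue from the start.
     */
    static int[] run(boolean consumer1BeforeSend, int messages) {
        ModelQueue sourceQ1 = new ModelQueue();
        ModelConsumer bridge = new ModelConsumer("my-simple-bridge");
        ModelConsumer consumer1 = new ModelConsumer("consumer1");

        sourceQ1.attach(bridge);
        if (consumer1BeforeSend) {
            sourceQ1.attach(consumer1);   // Scenario1: registered before sending
        }
        for (int i = 0; i < messages; i++) {
            sourceQ1.send("Message walks on bridge...");
        }
        if (!consumer1BeforeSend) {
            sourceQ1.attach(consumer1);   // Scenario2: registered after sending
        }
        return new int[] { consumer1.received, bridge.received };
    }

    public static void main(String[] args) {
        int[] s1 = run(true, 10);
        int[] s2 = run(false, 10);
        System.out.println("Scenario1: consumer1=" + s1[0] + ", bridge=" + s1[1]);
        System.out.println("Scenario2: consumer1=" + s2[0] + ", bridge=" + s2[1]);
    }
}
```

      Under these assumptions the model gives 5/5 for Scenario1 and 0/10 for Scenario2, which matches the counts observed above if every message taken by the bridge ends up on TargetQ1. That would mean the bridge is still one-way and is simply competing with consumer1 for messages on SourceQ1, but this is only a model and not a confirmation of how HornetQ behaves.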

       

      Thanks,

      Regards