Need HornetQ bridge clarifications
crytek Dec 6, 2012 11:16 AMHello,
I've made some hq bridge tests and it seems that it behaves different depending when the queue consumers are registered and connections started.
Conditions:
-2 cluster nodes (node1 and node2), using broadcast-discovery groups.
a.) On node1:
- define a simple jms queue: SourceQ1 (cli command: /subsystem=messaging/hornetq-server=default/jms-queue=SourceQ1/:add(entries=["java:jboss/exported/jms/queue/SourceQ1"]))
- define a simple hq core bridge name "my-simple-bridge" which uses the cluster discovery group in order to divert messages from SourceQ1 from this node to the TargetQ1 on node2:
(Cli Command : /subsystem=messaging/hornetq-server=default/bridge=my-simple-bridge:add(queue-name=jms.queue.SourceQ1, forwarding-address=jms.queue.TargetQ1, retry-interval=10000, ha=false, discovery-group-name=dg-group1, use-duplicate-detection=true, user=AppUser, password=tint252, failover-on-server-shutdown=true)
b.) On node2:
- define a simple jms queue: TargetQ1 (cli command: /subsystem=messaging/hornetq-server=default/jms-queue=TargetQ1/:add(entries=["java:jboss/exported/jms/queue/TargetQ1"]))
Scenario1:
ConnectionFactory connFactory1 = (ConnectionFactory) initialContext0
.lookup("jms/RemoteConnectionFactory");
Queue sourceQueue = (Queue) initialContext0.lookup("jms/queue/SourceQ1");
ConnectionFactory connFactory2 = (ConnectionFactory) initialContext1
.lookup("jms/RemoteConnectionFactory");
Queue targetQueue = (Queue) initialContext1.lookup("jms/queue/TargetQ1");
connection1 = connFactory1.createConnection(APPLICATIONREALM_USER,
APPLICATIONREALM_USER_PWD);
connection2 = connFactory2.createConnection(APPLICATIONREALM_USER,
APPLICATIONREALM_USER_PWD);
Session session1 = (Session) connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session2 = (Session) connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection1.start();
connection2.start();
MessageProducer producer = session1.createProducer(sourceQueue);
MessageConsumer consumer1 = session1.createConsumer(sourceQueue);
MessageConsumer consumer2 = session2.createConsumer(targetQueue);
TextMessage txtMsg = null;
int sentMsgsNo = 10;
for (int i = 0; i < sentMsgsNo; i++) {
txtMsg = session1.createTextMessage("Message walks on bridge...");
producer.send(txtMsg);
}
System.out.println("Producer sent messages.");
System.out.println("Wait for bridge to divert messages to destination queue ...");
TextMessage dummyResponse = null;
int consumer1MsgsNo = 0;
int consumer2MsgsNo = 0;
while ((dummyResponse = (TextMessage) consumer1.receive(4000)) != null) {
System.out.println("SourceQueue consumer received message: "
+ dummyResponse.getText());
consumer1MsgsNo++;
}
while ((dummyResponse = (TextMessage) consumer2.receive(4000)) != null) {
System.out.println("DestinationQueue consumer received message: "
+ dummyResponse.getText());
consumer2MsgsNo++;
}
In this scenario, if consumer1 is registered and connection1 started before sending the messages, then at the end of the test, consumer1 will receive 5 messages, and consumer2 5 messages.
But in scenario2,
ConnectionFactory connFactory1 = (ConnectionFactory) initialContext0
.lookup("jms/RemoteConnectionFactory");
Queue sourceQueue = (Queue) initialContext0.lookup("jms/queue/SourceQ1");
ConnectionFactory connFactory2 = (ConnectionFactory) initialContext1
.lookup("jms/RemoteConnectionFactory");
Queue targetQueue = (Queue) initialContext1.lookup("jms/queue/TargetQ1");
connection1 = connFactory1.createConnection(APPLICATIONREALM_USER,
APPLICATIONREALM_USER_PWD);
connection2 = connFactory2.createConnection(APPLICATIONREALM_USER,
APPLICATIONREALM_USER_PWD);
Session session1 = (Session) connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session2 = (Session) connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection1.start();
connection2.start();
MessageProducer producer = session1.createProducer(sourceQueue);
MessageConsumer consumer2 = session2.createConsumer(targetQueue);
TextMessage txtMsg = null;
int sentMsgsNo = 10;
for (int i = 0; i < sentMsgsNo; i++) {
txtMsg = session1.createTextMessage("Message walks on bridge...");
producer.send(txtMsg);
}
System.out.println("Producer sent messages.");
System.out.println("Wait for bridge to divert messages to destination queue ...");
MessageConsumer consumer1 = session1.createConsumer(sourceQueue);
TextMessage dummyResponse = null;
int consumer1MsgsNo = 0;
int consumer2MsgsNo = 0;
while ((dummyResponse = (TextMessage) consumer1.receive(4000)) != null) {
System.out.println("SourceQueue consumer received message: "
+ dummyResponse.getText());
consumer1MsgsNo++;
}
while ((dummyResponse = (TextMessage) consumer2.receive(4000)) != null) {
System.out.println("DestinationQueue consumer received message: "
+ dummyResponse.getText());
consumer2MsgsNo++;
}
, where consumer1 is registered after the messages where sent, the test ends with 10 messages on consumer2 and 0 on consumer1.
I cannot find out why this is happening. It seems that messages are load-balanced through the consumers. Is this the normal behaviour ? The bridge is NOT unidirectional, but bidirectional ? (although the bridge is defined only on node1 and its definition says that the forwarding address is only TargetQ1 on node2)
Thanks,
Regards