JOIN not working..
clebert.suconic Mar 11, 2009 7:17 PM — Unless I made a mistake in the following test case, there is a problem with JOIN (which I will be investigating).
But could you please take a look at this test case, just to make sure I'm not missing anything obvious:
/**
 * Drives the XA calls by hand: two sessions work under the same Xid (the second one
 * started with TMJOIN), each sends one message to its own address, and the Xid is
 * committed through the first session after the second has been closed. A further
 * XA transaction on {@code clientSession} then checks that exactly one message
 * arrived on each address.
 */
public void testSimpleJoin() throws Exception
{
   SimpleString firstAddress = new SimpleString("Address-1");
   SimpleString secondAddress = new SimpleString("Address-2");

   // One queue per address, named after the address itself.
   SimpleString[] addresses = { firstAddress, secondAddress };
   for (SimpleString address : addresses)
   {
      clientSession.createQueue(address, address, true);
   }

   // --- Send phase: both sessions are associated with the same Xid. ---
   Xid sendXid = newXID();

   ClientSession firstSession = sessionFactory.createSession(true, false, false);
   firstSession.start(sendXid, XAResource.TMNOFLAGS);

   ClientSession joinedSession = sessionFactory.createSession(true, false, false);
   joinedSession.start(sendXid, XAResource.TMJOIN);

   ClientProducer firstProducer = firstSession.createProducer(firstAddress);
   ClientProducer joinedProducer = joinedSession.createProducer(secondAddress);

   firstProducer.send(createTextMessage(firstSession, "A"));
   joinedProducer.send(createTextMessage(joinedSession, "B"));

   firstSession.end(sendXid, XAResource.TMSUCCESS);
   joinedSession.end(sendXid, XAResource.TMSUCCESS);

   // Both sessions talk to the same server and the second one JOINED the Xid, so it
   // should be possible to close it and commit (one-phase) through the first session.
   joinedSession.close();
   firstSession.commit(sendXid, true);
   firstSession.close();

   // --- Receive phase: a fresh Xid on the shared clientSession. ---
   Xid receiveXid = newXID();
   clientSession.start(receiveXid, XAResource.TMNOFLAGS);

   ClientConsumer firstConsumer = clientSession.createConsumer(firstAddress);
   ClientConsumer secondConsumer = clientSession.createConsumer(secondAddress);
   clientSession.start();

   // Each address must deliver exactly one message: one receive succeeds, the next finds nothing.
   ClientConsumer[] consumers = { firstConsumer, secondConsumer };
   for (ClientConsumer consumer : consumers)
   {
      ClientMessage received = consumer.receive(1000);
      assertNotNull(received);
      received.acknowledge();
      assertNull(consumer.receiveImmediate());
   }

   clientSession.end(receiveXid, XAResource.TMSUCCESS);
   clientSession.commit(receiveXid, true);
   clientSession.close();
}
And the same test, but using a real TransactionManager instead:
/**
 * Sends one message on each of two sessions enlisted in the same JTA transaction,
 * commits through the TransactionManager, then consumes both messages on
 * {@code clientSession} inside a second JTA transaction.
 *
 * The boolean results of enlistResource/delistResource are asserted so that a failed
 * enlist or delist is reported where it happens instead of surfacing later as a
 * missing message.
 */
public void testSimpleJoinWithTM() throws Exception
{
   SimpleString address1 = new SimpleString("Address-1");
   SimpleString address2 = new SimpleString("Address-2");
   clientSession.createQueue(address1, address1, true);
   clientSession.createQueue(address2, address2, true);

   TransactionManager tm = new TransactionManagerImple();
   try
   {
      // --- Send phase: both sessions are enlisted in the same transaction. ---
      tm.begin();
      Transaction tx = tm.getTransaction();

      ClientSession sessionA = sessionFactory.createSession(true, false, false);
      assertTrue("sessionA was not enlisted", tx.enlistResource(sessionA));
      ClientSession sessionB = sessionFactory.createSession(true, false, false);
      assertTrue("sessionB was not enlisted", tx.enlistResource(sessionB));

      ClientProducer prodA = sessionA.createProducer(address1);
      ClientProducer prodB = sessionB.createProducer(address2);
      prodA.send(createTextMessage(sessionA, "A"));
      prodB.send(createTextMessage(sessionB, "B"));

      assertTrue("sessionA was not delisted", tx.delistResource(sessionA, XAResource.TMSUCCESS));
      assertTrue("sessionB was not delisted", tx.delistResource(sessionB, XAResource.TMSUCCESS));
      tm.commit();

      sessionB.close();
      sessionA.close();

      // --- Receive phase: a second transaction on the shared clientSession. ---
      tm.begin();
      tx = tm.getTransaction();
      assertTrue("clientSession was not enlisted", tx.enlistResource(clientSession));

      ClientConsumer cons1 = clientSession.createConsumer(address1);
      ClientConsumer cons2 = clientSession.createConsumer(address2);
      clientSession.start();

      // Exactly one message is expected on each address.
      ClientMessage msg = cons1.receive(1000);
      assertNotNull(msg);
      msg.acknowledge();
      assertNull(cons1.receiveImmediate());

      msg = cons2.receive(1000);
      assertNotNull(msg);
      msg.acknowledge();
      assertNull(cons2.receiveImmediate());

      assertTrue("clientSession was not delisted", tx.delistResource(clientSession, XAResource.TMSUCCESS));
      tm.commit();
      clientSession.close();
   }
   finally
   {
      // If a step above failed between begin() and commit(), the transaction is still
      // associated with this thread; roll it back so it does not leak into later tests.
      // NOTE(review): a failure inside rollback() would mask the original failure.
      if (tm.getTransaction() != null)
      {
         tm.rollback();
      }
   }
}