issue with creating queues in a cluster
ataylor Sep 28, 2009 6:17 AMThis could be my mis interpretation of how this should work so i am after some clarification.
The scenario is this:
A cluster is created with 2 nodes , say node A and B. A queue testQ is created on both nodes. At this point the binding for address testQ on both nodes will have 2 queues, 1 local queue and one remote.
At this point server A either loses its connection to node B or goes down. At this point the binding for address testQ on node B will have 1 queue, its own local queue.
Now at this point node A either comes back up and/or reconnects. At this point i would expect the state to be recreated, i.e. the queues get created again, but this doesn't happen.
The same thing happens if a binding is added and then another server is brought up after the queue is created.
Ive been struggling with this over the weekend, basically with strict ordering, once a message id is pinned to a remote queue it must always go to the same consumer on that queue. if the node goes down thats fine, i just throw an exception on the send and the client can handle it how they like. If the node comes back up tho i should carry on delivering to that remote queue.
Ive written a test to demonstrate a couple of scenarios.
public class QueueCreationTest extends ClusterTestBase
{
public void testGroupingSimple() throws Exception
{
setupServer(0, isFileStorage(), isNetty());
setupServer(1, isFileStorage(), isNetty());
setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);
startServers(0, 1);
try
{
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
createQueue(0, "queues.testaddress", "queue0", null, true);
createQueue(1, "queues.testaddress", "queue0", null, true);
waitForBindings(0, "queues.testaddress", 1, 0, false);
waitForBindings(1, "queues.testaddress", 1, 0, false);
waitForBindings(0, "queues.testaddress", 1, 0, true);
waitForBindings(1, "queues.testaddress", 1, 0, true);
assertEquals(getServer(0).getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress")).getBindings().size(), 2);
assertEquals(getServer(1).getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress")).getBindings().size(), 2);
stopClusterConnections(0, 1);
assertEquals(getServer(0).getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress")).getBindings().size(), 1);
assertEquals(getServer(1).getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress")).getBindings().size(), 1);
setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);
assertEquals(getServer(0).getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress")).getBindings().size(), 2);
assertEquals(getServer(1).getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress")).getBindings().size(), 2);
System.out.println("*****************************************************************************");
}
finally
{
closeAllConsumers();
closeAllSessionFactories();
stopServers(0, 1);
}
}
public void testGroupingSimplecreateclusterafterqcreated() throws Exception
{
setupServer(0, isFileStorage(), isNetty());
startServers(0);
try
{
setupSessionFactory(0, isNetty());
createQueue(0, "queues.testaddress", "queue0", null, true);
waitForBindings(0, "queues.testaddress", 1, 0, true);
assertEquals(getServer(0).getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress")).getBindings().size(), 1);
setupServer(1, isFileStorage(), isNetty());
startServers(1);
setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);
setupSessionFactory(1, isNetty());
createQueue(1, "queues.testaddress", "queue0", null, true);
waitForBindings(1, "queues.testaddress", 1, 0, true);
waitForBindings(0, "queues.testaddress", 1, 0, false);
waitForBindings(1, "queues.testaddress", 1, 0, false);
assertEquals(getServer(0).getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress")).getBindings().size(), 2);
assertEquals(getServer(1).getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress")).getBindings().size(), 2);
System.out.println("*****************************************************************************");
}
finally
{
closeAllConsumers();
closeAllSessionFactories();
stopServers(0, 1);
}
}
public void testGroupingServerRestarted() throws Exception
{
setupServer(0, isFileStorage(), isNetty());
setupServer(1, isFileStorage(), isNetty());
setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);
startServers(0, 1);
try
{
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
createQueue(0, "queues.testaddress", "queue0", null, true);
createQueue(1, "queues.testaddress", "queue0", null, true);
waitForBindings(0, "queues.testaddress", 1, 0, false);
waitForBindings(1, "queues.testaddress", 1, 0, false);
waitForBindings(0, "queues.testaddress", 1, 0, true);
waitForBindings(1, "queues.testaddress", 1, 0, true);
assertEquals(getServer(0).getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress")).getBindings().size(), 2);
assertEquals(getServer(1).getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress")).getBindings().size(), 2);
stopClusterConnections(0, 1);
stopServers(0);
clearServer(0);
setupServer(0, isFileStorage(), isNetty());
startServers(0);
assertEquals(getServer(0).getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress")).getBindings().size(), 1);
assertEquals(getServer(1).getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress")).getBindings().size(), 1);
setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);
waitForBindings(0, "queues.testaddress", 1, 0, true);
waitForBindings(0, "queues.testaddress", 1, 0, false);
waitForBindings(1, "queues.testaddress", 1, 0, true);
waitForBindings(1, "queues.testaddress", 1, 0, false);
assertEquals(getServer(0).getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress")).getBindings().size(), 2);
assertEquals(getServer(1).getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress")).getBindings().size(), 2);
System.out.println("*****************************************************************************");
}
finally
{
closeAllConsumers();
closeAllSessionFactories();
stopServers(0, 1);
}
}
public boolean isNetty()
{
return true;
}
public boolean isFileStorage()
{
return true;
}
}