issue with creating queues in a cluster
ataylor Sep 28, 2009 6:17 AM
This could be my misinterpretation of how this should work, so I am after some clarification.
The scenario is this:
A cluster is created with 2 nodes, say node A and node B. A queue testQ is created on both nodes. At this point the binding for address testQ on both nodes will have 2 queues: one local queue and one remote queue.
At this point server A either loses its connection to node B or goes down. At this point the binding for address testQ on node B will have 1 queue, its own local queue.
Now node A either comes back up and/or reconnects. At this point I would expect the state to be recreated, i.e. the remote queue bindings get created again, but this doesn't happen.
The same thing happens if a binding is added and then another server is brought up after the queue is created.
I've been struggling with this over the weekend. Basically, with strict ordering, once a message id is pinned to a remote queue it must always go to the same consumer on that queue. If the node goes down that's fine: I just throw an exception on the send and the client can handle it however they like. If the node comes back up, though, I should carry on delivering to that remote queue.
I've written a test to demonstrate a couple of scenarios.
/**
 * Demonstrates how the bindings for a clustered address behave when cluster
 * connections are stopped and re-established, when a second node joins after a
 * queue already exists, and when a node is restarted.
 *
 * <p>Each scenario creates a queue named {@code queue0} on address
 * {@code queues.testaddress} on two nodes and checks the number of bindings
 * each node's post office reports for that address. Per the accompanying
 * report, a fully connected node is expected to see two bindings (its local
 * queue plus the remote one) and a disconnected node one.
 *
 * <p>All server/cluster helpers ({@code setupServer}, {@code setupClusterConnection},
 * {@code waitForBindings}, ...) are inherited from {@code ClusterTestBase}; their
 * exact semantics are not visible here.
 */
public class QueueCreationTest extends ClusterTestBase {

    /** Address on which the test queues are bound. */
    private static final String ADDRESS = "queues.testaddress";

    /** Name of the queue created on every node. */
    private static final String QUEUE_NAME = "queue0";

    /**
     * Two clustered nodes: stop the cluster connections, set them up again and
     * check that the remote bindings reappear.
     *
     * @throws Exception if any of the inherited helpers fails
     */
    public void testGroupingSimple() throws Exception {
        setupServer(0, isFileStorage(), isNetty());
        setupServer(1, isFileStorage(), isNetty());

        setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
        setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);

        startServers(0, 1);

        try {
            setupSessionFactory(0, isNetty());
            setupSessionFactory(1, isNetty());

            createQueue(0, ADDRESS, QUEUE_NAME, null, true);
            createQueue(1, ADDRESS, QUEUE_NAME, null, true);

            // presumably the last argument selects local (true) vs remote (false)
            // bindings -- confirm against ClusterTestBase.waitForBindings
            waitForBindings(0, ADDRESS, 1, 0, false);
            waitForBindings(1, ADDRESS, 1, 0, false);
            waitForBindings(0, ADDRESS, 1, 0, true);
            waitForBindings(1, ADDRESS, 1, 0, true);

            assertBindingCount(0, 2);
            assertBindingCount(1, 2);

            stopClusterConnections(0, 1);

            assertBindingCount(0, 1);
            assertBindingCount(1, 1);

            setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
            setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);

            // NOTE(review): unlike the other scenarios there is no waitForBindings
            // between re-creating the cluster connections and these assertions;
            // left as in the original because this is the behaviour being reported.
            assertBindingCount(0, 2);
            assertBindingCount(1, 2);

            System.out.println("*****************************************************************************");
        } finally {
            closeAllConsumers();
            closeAllSessionFactories();
            stopServers(0, 1);
        }
    }

    /**
     * A queue is created on node 0 before node 1 and the cluster connections
     * exist; afterwards both nodes are expected to see two bindings.
     *
     * @throws Exception if any of the inherited helpers fails
     */
    public void testGroupingSimplecreateclusterafterqcreated() throws Exception {
        setupServer(0, isFileStorage(), isNetty());
        startServers(0);

        try {
            setupSessionFactory(0, isNetty());
            createQueue(0, ADDRESS, QUEUE_NAME, null, true);
            waitForBindings(0, ADDRESS, 1, 0, true);

            assertBindingCount(0, 1);

            // Bring up the second node and the cluster only after the queue exists.
            setupServer(1, isFileStorage(), isNetty());
            startServers(1);

            setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
            setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);

            setupSessionFactory(1, isNetty());
            createQueue(1, ADDRESS, QUEUE_NAME, null, true);

            waitForBindings(1, ADDRESS, 1, 0, true);
            waitForBindings(0, ADDRESS, 1, 0, false);
            waitForBindings(1, ADDRESS, 1, 0, false);

            assertBindingCount(0, 2);
            assertBindingCount(1, 2);

            System.out.println("*****************************************************************************");
        } finally {
            closeAllConsumers();
            closeAllSessionFactories();
            stopServers(0, 1);
        }
    }

    /**
     * Two clustered nodes: node 0 is stopped, cleared and started again, the
     * cluster connections are set up again, and both nodes are expected to end
     * up with two bindings.
     *
     * @throws Exception if any of the inherited helpers fails
     */
    public void testGroupingServerRestarted() throws Exception {
        setupServer(0, isFileStorage(), isNetty());
        setupServer(1, isFileStorage(), isNetty());

        setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
        setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);

        startServers(0, 1);

        try {
            setupSessionFactory(0, isNetty());
            setupSessionFactory(1, isNetty());

            createQueue(0, ADDRESS, QUEUE_NAME, null, true);
            createQueue(1, ADDRESS, QUEUE_NAME, null, true);

            waitForBindings(0, ADDRESS, 1, 0, false);
            waitForBindings(1, ADDRESS, 1, 0, false);
            waitForBindings(0, ADDRESS, 1, 0, true);
            waitForBindings(1, ADDRESS, 1, 0, true);

            assertBindingCount(0, 2);
            assertBindingCount(1, 2);

            // Take node 0 out of the cluster and restart it.
            stopClusterConnections(0, 1);
            stopServers(0);
            clearServer(0);
            setupServer(0, isFileStorage(), isNetty());
            startServers(0);

            assertBindingCount(0, 1);
            assertBindingCount(1, 1);

            setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
            setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);

            waitForBindings(0, ADDRESS, 1, 0, true);
            waitForBindings(0, ADDRESS, 1, 0, false);
            waitForBindings(1, ADDRESS, 1, 0, true);
            waitForBindings(1, ADDRESS, 1, 0, false);

            assertBindingCount(0, 2);
            assertBindingCount(1, 2);

            System.out.println("*****************************************************************************");
        } finally {
            closeAllConsumers();
            closeAllSessionFactories();
            stopServers(0, 1);
        }
    }

    /**
     * @return {@code true}; passed as the netty flag to the inherited setup helpers
     */
    public boolean isNetty() {
        return true;
    }

    /**
     * @return {@code true}; passed as the file-storage flag to {@code setupServer}
     */
    public boolean isFileStorage() {
        return true;
    }

    /**
     * Asserts the number of bindings the given node's post office holds for
     * {@link #ADDRESS}.
     *
     * <p>The expected value is passed first to {@code assertEquals} so that a
     * failure message reports expected and actual the right way round.
     *
     * @param node index of the server to inspect
     * @param expected number of bindings expected for the address
     * @throws Exception if looking up the bindings fails
     */
    private void assertBindingCount(int node, int expected) throws Exception {
        int actual = getServer(node)
            .getPostOffice()
            .getBindingsForAddress(new SimpleString(ADDRESS))
            .getBindings()
            .size();
        assertEquals(expected, actual);
    }
}