4 Replies Latest reply on Sep 28, 2009 12:24 PM by Andy Taylor

    issue with creating queues in a cluster

    Andy Taylor Master

      This could be my misinterpretation of how this should work, so I am after some clarification.

      The scenario is this:

      A cluster is created with two nodes, say node A and node B. A queue testQ is created on both nodes. At this point the bindings for address testQ on each node contain two queues: one local queue and one remote.

      Then node A either loses its connection to node B or goes down. At this point the bindings for address testQ on node B contain only one queue, its own local queue.

      Now node A comes back up and/or reconnects. At this point I would expect the state to be recreated, i.e. the queues get added again, but this doesn't happen.

      The same thing happens if a binding is added and another server is brought up after the queue has been created.

      I've been struggling with this over the weekend. Basically, with strict ordering, once a message id is pinned to a remote queue it must always go to the same consumer on that queue. If the node goes down that's fine: I just throw an exception on the send and the client can handle it however they like. If the node comes back up, though, I should carry on delivering to that remote queue.
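      A minimal sketch of the strict-ordering rule described above, assuming a simple in-memory model in which each queue is either reachable or not. GroupRouter, queueUp, queueDown and route are hypothetical names for illustration, not HornetQ APIs. A group is pinned to a queue on its first message; while that queue's node is away the send fails, and once the node returns delivery resumes to the same queue instead of being re-routed.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of strict-ordering (grouping) routing; not a HornetQ API.
class GroupRouter {
    // group id -> name of the queue the group was first routed to
    private final Map<String, String> pinnedQueueByGroup = new HashMap<>();
    // queues whose node is currently reachable, in the order they (re)joined
    private final Set<String> availableQueues = new LinkedHashSet<>();

    void queueUp(String queueName) {
        availableQueues.add(queueName);
    }

    void queueDown(String queueName) {
        availableQueues.remove(queueName);
    }

    /** Returns the queue a message of this group must go to, pinning the group on first use. */
    String route(String groupId) {
        String pinned = pinnedQueueByGroup.get(groupId);
        if (pinned == null) {
            if (availableQueues.isEmpty()) {
                throw new IllegalStateException("no queue available for group " + groupId);
            }
            // First message of the group: pick the first available queue and pin to it.
            pinned = availableQueues.iterator().next();
            pinnedQueueByGroup.put(groupId, pinned);
        }
        if (!availableQueues.contains(pinned)) {
            // Re-routing would break ordering, so fail the send and let the client decide.
            throw new IllegalStateException("queue " + pinned + " for group " + groupId + " is unavailable");
        }
        return pinned;
    }
}
```

      The key design choice is that the pin is never moved: an unavailable pinned queue produces an exception rather than a fallback to another queue, which matches the "throw on send, resume when the node is back" behaviour described in the post.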

      I've written a test to demonstrate a couple of scenarios:

      public class QueueCreationTest extends ClusterTestBase
      {
         // Scenario 1: stop the cluster connections, then set them up again;
         // the remote bindings are expected to come back.
         public void testGroupingSimple() throws Exception
         {
            setupServer(0, isFileStorage(), isNetty());
            setupServer(1, isFileStorage(), isNetty());

            setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
            setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);

            startServers(0, 1);

            try
            {
               setupSessionFactory(0, isNetty());
               setupSessionFactory(1, isNetty());

               createQueue(0, "queues.testaddress", "queue0", null, true);
               createQueue(1, "queues.testaddress", "queue0", null, true);

               // Wait until both nodes see the bindings for the address.
               waitForBindings(0, "queues.testaddress", 1, 0, false);
               waitForBindings(1, "queues.testaddress", 1, 0, false);
               waitForBindings(0, "queues.testaddress", 1, 0, true);
               waitForBindings(1, "queues.testaddress", 1, 0, true);

               SimpleString address = new SimpleString("queues.testaddress");

               // assertEquals takes (expected, actual).
               assertEquals(2, getServer(0).getPostOffice().getBindingsForAddress(address).getBindings().size());
               assertEquals(2, getServer(1).getPostOffice().getBindingsForAddress(address).getBindings().size());

               // Drop the cluster connections: each node keeps only its local binding.
               stopClusterConnections(0, 1);

               assertEquals(1, getServer(0).getPostOffice().getBindingsForAddress(address).getBindings().size());
               assertEquals(1, getServer(1).getPostOffice().getBindingsForAddress(address).getBindings().size());

               // Set the cluster connections up again; the remote bindings should be recreated.
               setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
               setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);

               assertEquals(2, getServer(0).getPostOffice().getBindingsForAddress(address).getBindings().size());
               assertEquals(2, getServer(1).getPostOffice().getBindingsForAddress(address).getBindings().size());

               System.out.println("*****************************************************************************");
            }
            finally
            {
               closeAllConsumers();

               closeAllSessionFactories();

               stopServers(0, 1);
            }
         }
      
         // Scenario 2: the queue already exists on node 0 before node 1 is started
         // and the cluster connections are set up.
         public void testGroupingSimplecreateclusterafterqcreated() throws Exception
         {
            setupServer(0, isFileStorage(), isNetty());

            startServers(0);

            try
            {
               setupSessionFactory(0, isNetty());

               createQueue(0, "queues.testaddress", "queue0", null, true);

               waitForBindings(0, "queues.testaddress", 1, 0, true);

               SimpleString address = new SimpleString("queues.testaddress");

               // Only the local binding exists so far.
               assertEquals(1, getServer(0).getPostOffice().getBindingsForAddress(address).getBindings().size());

               // Bring up node 1 and cluster the two nodes.
               setupServer(1, isFileStorage(), isNetty());

               startServers(1);

               setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
               setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);

               setupSessionFactory(1, isNetty());

               createQueue(1, "queues.testaddress", "queue0", null, true);

               waitForBindings(1, "queues.testaddress", 1, 0, true);
               waitForBindings(0, "queues.testaddress", 1, 0, false);
               waitForBindings(1, "queues.testaddress", 1, 0, false);

               // Expected: each node now has its local binding plus the remote one.
               assertEquals(2, getServer(0).getPostOffice().getBindingsForAddress(address).getBindings().size());
               assertEquals(2, getServer(1).getPostOffice().getBindingsForAddress(address).getBindings().size());

               System.out.println("*****************************************************************************");
            }
            finally
            {
               closeAllConsumers();

               closeAllSessionFactories();

               stopServers(0, 1);
            }
         }
      
         // Scenario 3: node 0 is stopped, cleared and restarted, then the
         // cluster connections are set up again.
         public void testGroupingServerRestarted() throws Exception
         {
            setupServer(0, isFileStorage(), isNetty());
            setupServer(1, isFileStorage(), isNetty());

            setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
            setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);

            startServers(0, 1);

            try
            {
               setupSessionFactory(0, isNetty());
               setupSessionFactory(1, isNetty());

               createQueue(0, "queues.testaddress", "queue0", null, true);
               createQueue(1, "queues.testaddress", "queue0", null, true);

               waitForBindings(0, "queues.testaddress", 1, 0, false);
               waitForBindings(1, "queues.testaddress", 1, 0, false);
               waitForBindings(0, "queues.testaddress", 1, 0, true);
               waitForBindings(1, "queues.testaddress", 1, 0, true);

               SimpleString address = new SimpleString("queues.testaddress");

               assertEquals(2, getServer(0).getPostOffice().getBindingsForAddress(address).getBindings().size());
               assertEquals(2, getServer(1).getPostOffice().getBindingsForAddress(address).getBindings().size());

               stopClusterConnections(0, 1);

               // Restart node 0.
               stopServers(0);
               clearServer(0);
               setupServer(0, isFileStorage(), isNetty());
               startServers(0);

               assertEquals(1, getServer(0).getPostOffice().getBindingsForAddress(address).getBindings().size());
               assertEquals(1, getServer(1).getPostOffice().getBindingsForAddress(address).getBindings().size());

               // Rejoin the cluster; the remote bindings should be recreated.
               setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
               setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);

               waitForBindings(0, "queues.testaddress", 1, 0, true);
               waitForBindings(0, "queues.testaddress", 1, 0, false);
               waitForBindings(1, "queues.testaddress", 1, 0, true);
               waitForBindings(1, "queues.testaddress", 1, 0, false);

               assertEquals(2, getServer(0).getPostOffice().getBindingsForAddress(address).getBindings().size());
               assertEquals(2, getServer(1).getPostOffice().getBindingsForAddress(address).getBindings().size());

               System.out.println("*****************************************************************************");
            }
            finally
            {
               closeAllConsumers();

               closeAllSessionFactories();

               stopServers(0, 1);
            }
         }
      
         public boolean isNetty()
         {
            return true;
         }

         public boolean isFileStorage()
         {
            return true;
         }
      }
      


        • 1. Re: issue with creating queues in a cluster
          Tim Fox Master

           

          "ataylor" wrote:
          This could be my misinterpretation of how this should work, so I am after some clarification.

          The scenario is this:

          A cluster is created with two nodes, say node A and node B. A queue testQ is created on both nodes. At this point the bindings for address testQ on each node contain two queues: one local queue and one remote.

          Then node A either loses its connection to node B or goes down. At this point the bindings for address testQ on node B contain only one queue, its own local queue.


          No, the queue should not get deleted if node A goes down - that would mean losing messages.


          Now node A comes back up and/or reconnects. At this point I would expect the state to be recreated, i.e. the queues get added again, but this doesn't happen.


          There should be no need to recreate the queue since it should already exist - perhaps you're confusing queues and bindings?


          • 2. Re: issue with creating queues in a cluster
            Andy Taylor Master

             

            No, the queue should not get deleted if node A goes down - that would mean losing messages.


            Actually, I think the queue isn't deleted; only the binding is removed. What happens to the queue? Are there other references to it once the binding itself has been removed?

            There should be no need to recreate the queue since it should already exist - perhaps you're confusing queues and bindings?


            Yes, that's what I mean: the queue is removed from the bindings but never re-added, so nothing can be routed to it.

            • 3. Re: issue with creating queues in a cluster
              Tim Fox Master

              I'm finding this a bit hard to follow, as you seem to be using "queue" and "binding" interchangeably, but they're different things. I'll try, though.

              If you have nodes A and B, there should be a queue on each of node A and node B that acts as the "store and forward" queue for the other node. Any messages destined for the other node are stored in it before being forwarded to that node.
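              A toy model of such a store-and-forward queue, assuming plain in-memory collections stand in for durable storage and for the connection to the other node. StoreAndForwardQueue and its methods are hypothetical names, not HornetQ classes. Every message is stored first and drained in order whenever the other node is reachable, so a disconnect leaves messages waiting rather than lost.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the "store and forward" queue a node keeps for one other node.
class StoreAndForwardQueue {
    // Stand-in for durable storage: messages waiting to be forwarded, oldest first.
    private final Deque<String> stored = new ArrayDeque<>();
    // Stand-in for the other node; null while the connection to it is down.
    private List<String> remoteNode;

    /** Every message destined for the other node is stored before it is forwarded. */
    void send(String message) {
        stored.addLast(message);
        forward();
    }

    void connect(List<String> remote) {
        remoteNode = remote;
        forward(); // drain anything that accumulated while disconnected
    }

    void disconnect() {
        remoteNode = null; // the queue and its contents are kept
    }

    int pending() {
        return stored.size();
    }

    private void forward() {
        while (remoteNode != null && !stored.isEmpty()) {
            remoteNode.add(stored.removeFirst());
        }
    }
}
```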

              If, say, node A is taken down, then the store and forward queue on node B for node A should remain (since it might contain messages); however, the remote *binding* for node A on node B should be removed from memory.

              When node A is brought back up, the remote queue binding should be recreated in memory and should point at the same store and forward queue.
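              A minimal model of that lifecycle, assuming queues and bindings can be represented as two plain maps. PostOfficeModel, StoredQueue, unbind and rebind are hypothetical names, not the HornetQ PostOffice API, and the remote binding is simplified to a binding that points straight at the store and forward queue. Dropping a binding never touches the queue, so the rebind on reconnect finds the very same queue object, and the binding count for the address goes from 2 to 1 and back to 2, which is what the posted test asserts.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of one node's post office; not the HornetQ PostOffice API.
class PostOfficeModel {
    static final class StoredQueue {
        final String name;
        StoredQueue(String name) { this.name = name; }
    }

    // Queues: kept even when nothing is bound to them (they may hold messages).
    private final Map<String, StoredQueue> queues = new HashMap<>();
    // In-memory routing table: address -> queues currently bound to it.
    private final Map<String, List<StoredQueue>> bindings = new HashMap<>();

    StoredQueue createQueue(String address, String queueName) {
        StoredQueue q = queues.computeIfAbsent(queueName, StoredQueue::new);
        bind(address, q);
        return q;
    }

    /** Called when the remote node goes away: drop the route, keep the queue. */
    void unbind(String address, String queueName) {
        List<StoredQueue> bound = bindings.get(address);
        if (bound != null) {
            bound.removeIf(q -> q.name.equals(queueName));
        }
    }

    /** Called when the remote node comes back: point a binding at the existing queue. */
    void rebind(String address, String queueName) {
        StoredQueue q = queues.get(queueName);
        if (q == null) {
            throw new IllegalArgumentException("unknown queue " + queueName);
        }
        bind(address, q);
    }

    int bindingCount(String address) {
        return bindings.getOrDefault(address, Collections.emptyList()).size();
    }

    StoredQueue queue(String queueName) {
        return queues.get(queueName);
    }

    private void bind(String address, StoredQueue q) {
        List<StoredQueue> bound = bindings.computeIfAbsent(address, a -> new ArrayList<>());
        if (!bound.contains(q)) {
            bound.add(q);
        }
    }
}
```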

              • 4. Re: issue with creating queues in a cluster
                Andy Taylor Master

                OK, it's the last bit that I don't see happening: the binding isn't recreated and pointed at the queue.

                Can you take a look at the test I posted and see if it makes sense?