6 Replies Latest reply on Nov 4, 2009 6:24 PM by Andy Taylor

    Dynamic selectors not being properly distributed in cluster

    Russell Scheerer Newbie

      I have a simple setup of two auto-discovered nodes in a cluster.

      I register two consumers on the same queue, each on a different node and each using a different selector to consume messages:
      mySelector='A'
      mySelector='B'

      I then use a single producer connected to just one of the nodes to send several messages for each selector.

      The problem is that the messages are not being redistributed to the node whose consumer has the matching selector.

      Is message distribution only supposed to work for static selectors defined on the queues themselves? I need to keep the selector 'dynamic', since it will vary at runtime.

        • 1. Re: Dynamic selectors not being properly distributed in clus
          Tim Fox Master

          Can you describe in some more detail what you're trying to achieve here - i.e. what's on each node, exactly what you're doing with config etc.?

          Server-side load balancing should certainly take message selectors into account (message redistribution is something else).

          • 2. Re: Dynamic selectors not being properly distributed in clus
            Russell Scheerer Newbie

            Well, I expect to receive all messages that match a given selector, regardless of which node the producer or consumer is on.

            Here is some sample code that easily recreates the problem. It is a modified runExample() method from the ClusteredQueue example.

             public boolean runExample() throws Exception {
                 Connection connection0 = null;
                 Connection connection1 = null;
                 InitialContext ic0 = null;
                 InitialContext ic1 = null;

                 try {
                     ic0 = getContext(0);
                     Queue queue = (Queue)ic0.lookup("/queue/exampleQueue");
                     ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");

                     ic1 = getContext(1);
                     ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");

                     connection0 = cf0.createConnection();
                     connection1 = cf1.createConnection();

                     Session session0 = connection0.createSession(false, Session.AUTO_ACKNOWLEDGE);
                     Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);

                     connection0.start();
                     connection1.start();

                     // Each consumer uses a different selector
                     MessageConsumer consumer0 = session0.createConsumer(queue, "mySelector='A'");
                     MessageConsumer consumer1 = session1.createConsumer(queue, "mySelector='B'");

                     Thread.sleep(1000);

                     // We create a JMS MessageProducer object on server 0
                     MessageProducer producer = session0.createProducer(queue);
                     // We send some messages to server 0
                     final int numMessages = 10;

                     // Send some 'A' messages
                     for (int i = 0; i < numMessages; i++) {
                         TextMessage message = session0.createTextMessage("This is text message A " + i);
                         message.setStringProperty("mySelector", "A");
                         producer.send(message);
                         System.out.println("Sent message: " + message.getText());
                     }
                     // Send some 'B' messages
                     for (int i = 0; i < numMessages; i++) {
                         TextMessage message = session0.createTextMessage("This is text message B " + (i + 10));
                         message.setStringProperty("mySelector", "B");
                         producer.send(message);
                         System.out.println("Sent message: " + message.getText());
                     }

                     // We now consume those messages on *both* server 0 and server 1.
                     for (int i = 0; i < numMessages * 2; i += 2) {
                         TextMessage message0 = (TextMessage)consumer0.receive(5000);
                         if (message0 != null) {
                             System.out.println("Got message: " + message0.getText() + " from node 0");
                         }
                         TextMessage message1 = (TextMessage)consumer1.receive(5000);
                         if (message1 != null) {
                             System.out.println("Got message: " + message1.getText() + " from node 1");
                         }
                     }
                     return true;
                 }
                 finally {
                     if (connection0 != null) {
                         connection0.close();
                     }
                     if (connection1 != null) {
                         connection1.close();
                     }
                     if (ic0 != null) {
                         ic0.close();
                     }
                     if (ic1 != null) {
                         ic1.close();
                     }
                 }
             }
            


            And here is the output...

             [java] Sent message: This is text message A 0
             [java] Sent message: This is text message A 1
             [java] Sent message: This is text message A 2
             [java] Sent message: This is text message A 3
             [java] Sent message: This is text message A 4
             [java] Sent message: This is text message A 5
             [java] Sent message: This is text message A 6
             [java] Sent message: This is text message A 7
             [java] Sent message: This is text message A 8
             [java] Sent message: This is text message A 9
             [java] Sent message: This is text message B 10
             [java] Sent message: This is text message B 11
             [java] Sent message: This is text message B 12
             [java] Sent message: This is text message B 13
             [java] Sent message: This is text message B 14
             [java] Sent message: This is text message B 15
             [java] Sent message: This is text message B 16
             [java] Sent message: This is text message B 17
             [java] Sent message: This is text message B 18
             [java] Sent message: This is text message B 19
             [java] Got message: This is text message A 0 from node 0
             [java] Got message: This is text message B 11 from node 1
             [java] Got message: This is text message A 2 from node 0
             [java] Got message: This is text message B 13 from node 1
             [java] Got message: This is text message A 4 from node 0
             [java] Got message: This is text message B 15 from node 1
             [java] Got message: This is text message A 6 from node 0
             [java] Got message: This is text message B 17 from node 1
             [java] Got message: This is text message A 8 from node 0
             [java] Got message: This is text message B 19 from node 1
             [java] example complete
            


            You can see that only half of the messages are received. The rest are left on the queues because the selector on the node they ended up on didn't match.

            Thanks for your help in advance.

            • 3. Re: Dynamic selectors not being properly distributed in clus
              Tim Fox Master

              What value have you set forward-when-no-consumers to?

              • 4. Re: Dynamic selectors not being properly distributed in clus
                Russell Scheerer Newbie

                DOH!

                It was set to:

                <forward-when-no-consumers>true</forward-when-no-consumers>
                


                Changing it to:
                <forward-when-no-consumers>false</forward-when-no-consumers>
                


                fixed it.

                Thanks for the quick response Tim!

                • 5. Re: Dynamic selectors not being properly distributed in clus
                  Tim Fox Master

                  If forward-when-no-consumers=true then it will "blindly" round-robin messages between nodes, irrespective of whether there are any consumers or whether any consumer's selector matches.

                  I'm thinking perhaps forward-when-no-consumers is a bad name for the attribute. Perhaps we should rename it to forward-irrespective-of-consumers (or something similar)?
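
To make the difference between the two settings concrete, here is a minimal toy model of the routing rule described above, applied to the same 20 messages as the example earlier in the thread. It is not HornetQ code: the class ForwardingSketch, its Message type, and the deliver() method are all hypothetical names made up for this sketch, and it assumes a two-node cluster with one consumer per node and a round robin that starts at the producer's node.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the routing rule only. This is NOT HornetQ code;
// every name in this class is hypothetical.
final class ForwardingSketch {

    static final class Message {
        final String mySelector; // value of the "mySelector" property
        final String text;

        Message(String mySelector, String text) {
            this.mySelector = mySelector;
            this.text = text;
        }
    }

    // consumerSelectors[n] is the value the single consumer on node n selects on.
    // Returns, per node, the message texts that node's consumer actually receives.
    static List<List<String>> deliver(String[] consumerSelectors,
                                      List<Message> sent,
                                      boolean forwardWhenNoConsumers) {
        int nodes = consumerSelectors.length;
        List<List<String>> received = new ArrayList<>();
        for (int n = 0; n < nodes; n++) {
            received.add(new ArrayList<>());
        }
        int next = 0; // round-robin pointer, starting at the producer's node
        for (Message m : sent) {
            int target = -1;
            if (forwardWhenNoConsumers) {
                // Blind round robin: consumers and selectors are ignored.
                target = next;
                next = (next + 1) % nodes;
            } else {
                // Round robin only over nodes that have a matching consumer.
                for (int k = 0; k < nodes; k++) {
                    int candidate = (next + k) % nodes;
                    if (consumerSelectors[candidate].equals(m.mySelector)) {
                        target = candidate;
                        next = (candidate + 1) % nodes;
                        break;
                    }
                }
            }
            // A message is received only if it sits on a node whose consumer matches.
            if (target >= 0 && consumerSelectors[target].equals(m.mySelector)) {
                received.get(target).add(m.text);
            }
        }
        return received;
    }

    // Same shape as the example: ten 'A' messages (0..9), then ten 'B' messages (10..19).
    static List<Message> exampleMessages() {
        List<Message> sent = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            sent.add(new Message("A", "A " + i));
        }
        for (int i = 0; i < 10; i++) {
            sent.add(new Message("B", "B " + (i + 10)));
        }
        return sent;
    }

    public static void main(String[] args) {
        String[] selectors = {"A", "B"};
        System.out.println("true:  " + deliver(selectors, exampleMessages(), true));
        System.out.println("false: " + deliver(selectors, exampleMessages(), false));
    }
}
```

With the flag set to true, the model hands node 0 only A 0, 2, 4, 6, 8 and node 1 only B 11, 13, 15, 17, 19, which is the same pattern as the output posted above. With the flag set to false, all ten A messages go to node 0 and all ten B messages go to node 1.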