7 Replies Latest reply on Nov 29, 2009 8:12 AM by Brian Gorrie

    A Message Grouping question

    Brian Gorrie Newbie

      Hi,

      First up what am I trying to do?
      Configure a queue so that even if it has 4 producers and 4 consumers only the first consumer registered recieves messages.

      Why am I trying to do this?
      I'm evaluating HornetQ as a replacement for our current queueing technology and one of the things the developers have made a lot of use of is the ability for OpenMQ to be configured on a queue by queue basis to only have one consumer irrespective of how many consumers are registered.

      With regards to the message groups and group id's, specifically the auto group id piece. If I turn autogroup id on and I have 2 producers producing messages on a queue it looks like they each get different message ids, this would mean that they can go to multiple consumers possibly?

      For reference I've created a test case that covers what I'm trying to achieve, I have included it at the end of this post.

      I borrowed the autogroupid testcase to create this one, it fails because more then 1 consumer receives messages. How we did this with the earlier Beta version of hornetq was swap out the RoundRobinDistributor with an ExclusiveDistributor (one class and a config change) as the config change was at the address settings level we could have a mix of round robin and exclusive queue behaviour. In the latest cut of the hornetq code the RoundRobinDistributor has been removed and is now a method in the QueueImpl class (see getHandlerRoundRobin()). This means it can no longer be swapped out through config. Which is why I'm wondering if there is another way to do what we were doing now that the RoundRobinDistributor and the associated configuration has been refactored out?

      Why am I looking for a configuration solution?
      Currently this is controlled through config only for OpenMQ. There was a configuration approach available for HornetQ that is no longer available so I'm wondering what the alternative is.

      Now for the test case that should explain what I'm trying to do from a better point of view:

      /*
      * Copyright 2009 Red Hat, Inc.
      * Red Hat licenses this file to you under the Apache License, version
      * 2.0 (the "License"); you may not use this file except in compliance
      * with the License. You may obtain a copy of the License at
      * http://www.apache.org/licenses/LICENSE-2.0
      * Unless required by applicable law or agreed to in writing, software
      * distributed under the License is distributed on an "AS IS" BASIS,
      * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
      * implied. See the License for the specific language governing
      * permissions and limitations under the License.
      */
      package org.hornetq.tests.integration.client;

      import java.util.concurrent.CountDownLatch;

      import org.hornetq.core.client.ClientConsumer;
      import org.hornetq.core.client.ClientMessage;
      import org.hornetq.core.client.ClientProducer;
      import org.hornetq.core.client.ClientSession;
      import org.hornetq.core.client.ClientSessionFactory;
      import org.hornetq.core.client.MessageHandler;
      import org.hornetq.core.exception.HornetQException;
      import org.hornetq.core.server.HornetQServer;
      import org.hornetq.tests.util.ServiceTestBase;
      import org.hornetq.utils.SimpleString;

      /**
      *
      * A test case to determine if it is possible to have 1 active consumer on a queue - irrespective of the number
      * of producers.
      *
      * This class was created by duplicating the AutogroupIdTest class and then changing the test to do what is needed.
      *
      * @author Brian Gorrie
      */
      public class ExclusiveConsumerTest extends ServiceTestBase
      {
      public final SimpleString addressA = new SimpleString("addressA");

      public final SimpleString queueA = new SimpleString("queueA");

      public final SimpleString queueB = new SimpleString("queueB");

      public final SimpleString queueC = new SimpleString("queueC");

      private final SimpleString groupTestQ = new SimpleString("testGroupQueue");


      /**
      * Attempts to do the exclusive consumer testing.
      *
      * @throws Exception If an error occurs.
      */
      public void testExclusiveConsumer() throws Exception
      {
      HornetQServer server = createServer(false);
      try
      {
      server.start();

      ClientSessionFactory sf = createInVMFactory();
      sf.setAutoGroup(true);
      ClientSession session = sf.createSession(false, true, true);

      // TODO What config needs to be set up here to get the exclusive consumer behaviour working.

      session.createQueue(groupTestQ, groupTestQ, null, false);

      ClientProducer producer = session.createProducer(groupTestQ);
      ClientProducer producer2 = session.createProducer(groupTestQ);

      final CountDownLatch latch = new CountDownLatch(200);

      MyMessageHandler myMessageHandler = new MyMessageHandler(latch);
      MyMessageHandler myMessageHandler2 = new MyMessageHandler(latch);
      MyMessageHandler myMessageHandler3 = new MyMessageHandler(latch);

      ClientConsumer consumer = session.createConsumer(groupTestQ);
      consumer.setMessageHandler(myMessageHandler);
      ClientConsumer consumer2 = session.createConsumer(groupTestQ);
      consumer2.setMessageHandler(myMessageHandler2);
      ClientConsumer consumer3 = session.createConsumer(groupTestQ);
      consumer3.setMessageHandler(myMessageHandler3);

      session.start();

      final int numMessages = 100;

      for (int i = 0; i < numMessages; i++)
      {
      producer.send(session.createClientMessage(false));
      }
      for (int i = 0; i < numMessages; i++)
      {
      producer2.send(session.createClientMessage(false));
      }
      latch.await();

      session.close();

      assertEquals("The first consumer did not recieve any messages when it should have.",myMessageHandler.messagesReceived, 100);
      assertEquals("The second consumer is not the active consumer so should not recieve any messages.",myMessageHandler2.messagesReceived, 0);
      assertEquals("The third consumer is not the active consumer so should not recieve any messages.",myMessageHandler3.messagesReceived, 0);
      }
      finally
      {
      if (server.isStarted())
      {
      server.stop();
      }
      }

      }

      private static class MyMessageHandler implements MessageHandler
      {
      volatile int messagesReceived = 0;

      private final CountDownLatch latch;

      public MyMessageHandler(CountDownLatch latch)
      {
      this.latch = latch;
      }

      public void onMessage(ClientMessage message)
      {
      messagesReceived++;
      try
      {
      message.acknowledge();
      }
      catch (HornetQException e)
      {
      e.printStackTrace();
      }
      latch.countDown();
      }
      }
      }

      Cheers,

      Brian.

        • 1. Re: A Message Grouping question
          Tim Fox Master

           

          "brgorrie" wrote:
          Hi,

          First up what am I trying to do?
          Configure a queue so that even if it has 4 producers and 4 consumers only the first consumer registered recieves messages.


          You can use the message grouping feature to do this.

          http://hornetq.sourceforge.net/docs/hornetq-2.0.0.BETA5/user-manual/en/html/message-grouping.html#message-grouping.jmsconfigure

          If you're using JMS, set JMSXGroupID to a unique value.

          • 2. Re: A Message Grouping question
            Brian Gorrie Newbie

            Hi,

            Message Grouping requires a code change rather then a config change to get all messages to go to one consumer. Also, the test case shows that autogroupid won't do it when there are multiple producers.

            For some background: I am evaluating dropping HornetQ in in place of OpenMQ in an existing software system (runs nearly 20 different jboss instances, with 1 to 2 ears per jboss instance, and a very heavy use of MQ's). Its a wholesale online trading platform. There is a lot of legacy code to change if the code has to also send through a group id.

            Currently SunOne MQ, Open MQ, Active MQ, and Tibco EMS all allow you to do it via config changes. SunOneMQ has it as the default (and only) behaviour unless you pay for the Enterprise license.

            Up until revision 8282 (8283 refactored out the distributor config) I could drop HornetQ in in place of OpenMQ with minimal code changes as I only needed to play with the HornetQ configuration and roll out a custom distribution policy.

            From reading the Message Group section of the manual each producer will need to be configured to use the same group id for each queue in order to get the exclusive consumer behaviour. From looking at the test cases this is a property set on the messages being sent. To re-jig the existing software to do this would require a refactor of the framework it uses to hide the JMS functionality - this isn't a small task.

            Is there a way to configure a queue in HornetQ so that all producers will use the same group id when sending messages that avoids the need to refactor existing production code?

            It is most probably lack of knowledge on my part regarding the HornetQ configuration.

            Why was the functionality refactored out in commit 8283?
            I'm thinking either performance or stability reasons, but just want to double check.

            Please let me know if you need any more information?

            Thank you for your help and your patience,

            Brian.

            • 3. Re: A Message Grouping question
              Tim Fox Master

               

              "brgorrie" wrote:
              Hi,

              Message Grouping requires a code change rather then a config change to get all messages to go to one consumer.


              Yes, that's the standard JMS way of doing it - setting JMSXGroupID programmatically on your messages


              Also, the test case shows that autogroupid won't do it when there are multiple producers.


              I'm not referrring to autogroupid - that's different. Auto group id sets a random value for JMSXGroupid which is used for all messages produced by the producer - this is useful when you want all messages produced from a particular producer to go the same consumer. In your case it's not what you want though.

              If you want the ability to configure a constant group id on a connection factory via config, that would be a fairly easy change.

              You could add a JIRA to do that.




              • 4. Re: A Message Grouping question
                Brian Gorrie Newbie

                Hi,

                If configured at the connection factory level, would that be the behaviour for all Queues on that instance of HornetQ?

                If it is that isn't quite what I want, I'm after the ability to turn it on or off at the queue level. Which is what the address settings config allowed you to do back in revision 8282.

                The configuration would need to either be where you define the queues or in the address setting configuration.

                Thank you for your help,

                Brian.

                • 5. Re: A Message Grouping question
                  Tim Fox Master

                   

                  "brgorrie" wrote:
                  Hi,

                  If configured at the connection factory level, would that be the behaviour for all Queues on that instance of HornetQ?

                  If it is that isn't quite what I want, I'm after the ability to turn it on or off at the queue level. Which is what the address settings config allowed you to do back in revision 8282.

                  The configuration would need to either be where you define the queues or in the address setting configuration.

                  Thank you for your help,

                  Brian.


                  Adding it at the connection factory would mean that value of group id would be used for any messages created using that connection factory.

                  This is config, just like AddressSettings and wouldn't require any code changes.

                  • 6. Re: A Message Grouping question
                    Brian Gorrie Newbie

                    Cool, so then depending on which behaviour the message producer needed it would be configured to use the appropriate connection factory.

                    Which is client side config, but I already have to make client side config changes anyway to switch to HornetQ.

                    Thank you for your help, I will throw this into a Jira.

                    Cheers,

                    Brian.

                    • 7. Re: A Message Grouping question
                      Brian Gorrie Newbie

                      Hi,

                      Have created a feature request for this,

                      https://jira.jboss.org/jira/browse/HORNETQ-229

                      cheers,

                      Brian.