A Message Grouping question
brgorrie Nov 28, 2009 10:48 PM
Hi,
First up, what am I trying to do?
Configure a queue so that, even if it has 4 producers and 4 consumers, only the first consumer registered receives messages.
Why am I trying to do this?
I'm evaluating HornetQ as a replacement for our current queueing technology. One thing our developers have made a lot of use of is OpenMQ's ability to be configured, on a queue-by-queue basis, to have only one active consumer irrespective of how many consumers are registered.
With regard to message groups and group IDs, specifically the auto group ID piece: if I turn auto group ID on and have 2 producers producing messages on a queue, it looks like they each get a different group ID. Would this mean their messages can possibly go to multiple consumers?
For reference, I've created a test case that covers what I'm trying to achieve; it is included at the end of this post.
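To make the concern concrete, here is a minimal sketch of group-based dispatch as I understand it. It is a stand-alone model and not HornetQ code: the class `GroupPinningSketch`, its `dispatch` method and the group ID strings are hypothetical, and it assumes a group is pinned to whichever consumer is next in round-robin order the first time that group ID is seen.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of group-based dispatch; not taken from the HornetQ source.
final class GroupPinningSketch
{
   private final int consumerCount;

   // Remembers which consumer index each group ID was pinned to.
   private final Map<String, Integer> groupToConsumer = new HashMap<String, Integer>();

   // Index of the consumer that the next unseen group will be pinned to.
   private int next = 0;

   GroupPinningSketch(int consumerCount)
   {
      if (consumerCount <= 0)
      {
         throw new IllegalArgumentException("consumerCount must be positive");
      }
      this.consumerCount = consumerCount;
   }

   // Returns the index of the consumer that receives a message carrying groupId.
   int dispatch(String groupId)
   {
      Integer pinned = groupToConsumer.get(groupId);
      if (pinned == null)
      {
         // First message of this group: pin it to the next consumer in turn.
         pinned = next;
         next = (next + 1) % consumerCount;
         groupToConsumer.put(groupId, pinned);
      }
      return pinned;
   }

   public static void main(String[] args)
   {
      GroupPinningSketch queue = new GroupPinningSketch(3);
      // Two producers, each with its own (assumed) auto-generated group ID.
      System.out.println("producer 1 -> consumer " + queue.dispatch("auto-group-1"));
      System.out.println("producer 2 -> consumer " + queue.dispatch("auto-group-2"));
      System.out.println("producer 1 -> consumer " + queue.dispatch("auto-group-1"));
   }
}
```

In this model, two producers with different auto-generated group IDs end up pinned to different consumers, whereas messages sharing a single group ID would all stay on one consumer.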
I borrowed the AutogroupIdTest test case to create this one. It fails because more than 1 consumer receives messages.
With the earlier beta version of HornetQ, we did this by swapping out the RoundRobinDistributor for an ExclusiveDistributor (one class and a config change). Because the config change was at the address settings level, we could have a mix of round-robin and exclusive queue behaviour.
In the latest cut of the HornetQ code, the RoundRobinDistributor has been removed and its logic is now a method in the QueueImpl class (see getHandlerRoundRobin()). This means it can no longer be swapped out through config, which is why I'm wondering whether there is another way to do what we were doing now that the RoundRobinDistributor and the associated configuration have been refactored out.
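The difference between the two behaviours can be sketched roughly as follows. This is an illustration only, under the assumption that a distributor simply chooses a consumer index for each message; `DistributorSketch`, `RoundRobinSketch`, `ExclusiveSketch` and `DistributorDemo` are made-up names and are not the classes from the HornetQ beta.

```java
import java.util.Arrays;

// Hypothetical strategy interface; not the HornetQ Distributor API.
interface DistributorSketch
{
   // Returns the index of the consumer that should receive the next message.
   int next(int consumerCount);
}

// Hands messages to each registered consumer in turn.
final class RoundRobinSketch implements DistributorSketch
{
   private int position = -1;

   public int next(int consumerCount)
   {
      position = (position + 1) % consumerCount;
      return position;
   }
}

// Always hands messages to the first registered consumer.
final class ExclusiveSketch implements DistributorSketch
{
   public int next(int consumerCount)
   {
      return 0;
   }
}

final class DistributorDemo
{
   // Counts how many of the given messages each consumer would receive.
   static int[] deliver(DistributorSketch distributor, int consumerCount, int messages)
   {
      int[] received = new int[consumerCount];
      for (int i = 0; i < messages; i++)
      {
         received[distributor.next(consumerCount)]++;
      }
      return received;
   }

   public static void main(String[] args)
   {
      System.out.println("round robin: " + Arrays.toString(deliver(new RoundRobinSketch(), 3, 200)));
      System.out.println("exclusive:   " + Arrays.toString(deliver(new ExclusiveSketch(), 3, 200)));
   }
}
```

With 3 consumers and 200 messages, the round-robin strategy spreads the messages across all three consumers, while the exclusive strategy gives all 200 to the first one, which is the behaviour the test case at the end of this post asserts.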
Why am I looking for a configuration solution?
With OpenMQ this is currently controlled through config alone. A configuration approach was available for HornetQ but no longer is, so I'm wondering what the alternative is.
Now for the test case, which should explain more clearly what I'm trying to do:
/*
 * Copyright 2009 Red Hat, Inc.
 * Red Hat licenses this file to you under the Apache License, version
 * 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.hornetq.tests.integration.client;
import java.util.concurrent.CountDownLatch;
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientProducer;
import org.hornetq.core.client.ClientSession;
import org.hornetq.core.client.ClientSessionFactory;
import org.hornetq.core.client.MessageHandler;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.SimpleString;
/**
 * A test case to determine if it is possible to have 1 active consumer on a queue - irrespective of the number
 * of producers.
 *
 * This class was created by duplicating the AutogroupIdTest class and then changing the test to do what is needed.
 *
 * @author Brian Gorrie
 */
public class ExclusiveConsumerTest extends ServiceTestBase
{
   public final SimpleString addressA = new SimpleString("addressA");
   public final SimpleString queueA = new SimpleString("queueA");
   public final SimpleString queueB = new SimpleString("queueB");
   public final SimpleString queueC = new SimpleString("queueC");
   private final SimpleString groupTestQ = new SimpleString("testGroupQueue");

   /**
    * Attempts to do the exclusive consumer testing.
    *
    * @throws Exception If an error occurs.
    */
   public void testExclusiveConsumer() throws Exception
   {
      HornetQServer server = createServer(false);
      try
      {
         server.start();
         ClientSessionFactory sf = createInVMFactory();
         sf.setAutoGroup(true);
         ClientSession session = sf.createSession(false, true, true);

         // TODO What config needs to be set up here to get the exclusive consumer behaviour working.
         session.createQueue(groupTestQ, groupTestQ, null, false);

         // Two producers on the same queue.
         ClientProducer producer = session.createProducer(groupTestQ);
         ClientProducer producer2 = session.createProducer(groupTestQ);

         // One latch count per message sent (2 producers x 100 messages).
         final CountDownLatch latch = new CountDownLatch(200);
         MyMessageHandler myMessageHandler = new MyMessageHandler(latch);
         MyMessageHandler myMessageHandler2 = new MyMessageHandler(latch);
         MyMessageHandler myMessageHandler3 = new MyMessageHandler(latch);

         // Three consumers; only the first registered should be active.
         ClientConsumer consumer = session.createConsumer(groupTestQ);
         consumer.setMessageHandler(myMessageHandler);
         ClientConsumer consumer2 = session.createConsumer(groupTestQ);
         consumer2.setMessageHandler(myMessageHandler2);
         ClientConsumer consumer3 = session.createConsumer(groupTestQ);
         consumer3.setMessageHandler(myMessageHandler3);
         session.start();

         final int numMessages = 100;
         for (int i = 0; i < numMessages; i++)
         {
            producer.send(session.createClientMessage(false));
         }
         for (int i = 0; i < numMessages; i++)
         {
            producer2.send(session.createClientMessage(false));
         }

         // Wait until all 200 messages have been delivered to some consumer.
         latch.await();
         session.close();

         // assertEquals takes (message, expected, actual). Both producers send numMessages each,
         // so an exclusive first consumer should see all 200 messages.
         assertEquals("The first consumer is the active consumer so should receive all messages.", 2 * numMessages, myMessageHandler.messagesReceived);
         assertEquals("The second consumer is not the active consumer so should not receive any messages.", 0, myMessageHandler2.messagesReceived);
         assertEquals("The third consumer is not the active consumer so should not receive any messages.", 0, myMessageHandler3.messagesReceived);
      }
      finally
      {
         if (server.isStarted())
         {
            server.stop();
         }
      }
   }

   /** Counts the messages it receives and counts the shared latch down once per message. */
   private static class MyMessageHandler implements MessageHandler
   {
      // volatile so the test thread sees the count updated by the delivery thread.
      volatile int messagesReceived = 0;

      private final CountDownLatch latch;

      public MyMessageHandler(CountDownLatch latch)
      {
         this.latch = latch;
      }

      public void onMessage(ClientMessage message)
      {
         messagesReceived++;
         try
         {
            message.acknowledge();
         }
         catch (HornetQException e)
         {
            e.printStackTrace();
         }
         latch.countDown();
      }
   }
}
Cheers,
Brian.