Bug in consumer windowing?
bill.burke May 10, 2010 6:08 PM
I've pre-created a bunch of consumers via a ClientSessionFactory with the consumer window size set to 0:
sessionFactory.setConsumerWindowSize(0);
The goal is to pick a consumer at random and call receive() on it to get the next message in the queue. I was hoping that setConsumerWindowSize(0) would allow me to do this. I tested it with the code below.
What's interesting is that if I put a consumer.receiveImmediate() call inside the consumer creation loop, the assertion at the bottom fails. Does receiveImmediate() put the consumer in a state to be able to receive messages? I was hoping to keep a pool of consumers and use them as needed to receive messages. A pool doesn't/can't work if the server pre-delivers to a consumer before it is picked from the pool.
@Test
public void testConsumerWindow() throws Exception
{
   ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
   sf.setConsumerWindowSize(0);
   ClientSession session = sf.createSession(false, false, false);
   session.createQueue("testWindow", "testWindow", true);
   session.close();

   int numConsumers = 5;
   ArrayList<ClientConsumer> consumers = new ArrayList<ClientConsumer>();
   ArrayList<ClientSession> sessions = new ArrayList<ClientSession>();
   for (int i = 0; i < numConsumers; i++)
   {
      System.out.println("created: " + i);
      ClientSession session1 = sf.createSession();
      ClientConsumer consumer = session1.createConsumer("testWindow");
      consumers.add(consumer);
      session1.start();
      sessions.add(session1);
      // --- this is added later to cause the error:
      // consumer.receiveImmediate();
   }

   ClientSession senderSession = sf.createSession();
   ClientProducer producer = senderSession.createProducer("testWindow");
   ClientMessage sent = senderSession.createMessage(false);
   sent.putStringProperty("hello", "world");
   producer.send(sent);
   senderSession.start();

   ClientConsumer consumer = consumers.get(2);
   ClientMessage received = consumer.receiveImmediate();
   Assert.assertNotNull(received);
}
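
To make the question about receiveImmediate() concrete, here is a toy model of one possible explanation. It is a guess, not a description of HornetQ internals: it assumes each receiveImmediate() call grants the server one delivery credit, and that the credit may be left outstanding when no message is available. The names CreditModel, ToyConsumer, ToyQueue, and the leakCredit flag are all made up for illustration and are not part of the HornetQ API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model only: hypothetical classes, NOT HornetQ's API or implementation.
public class CreditModel {

    /** A consumer with a local buffer and a count of credits it has granted. */
    static class ToyConsumer {
        final Deque<String> buffer = new ArrayDeque<>();
        final ToyQueue queue;
        final boolean leakCredit; // assumption under test: unused credit stays outstanding
        int credits = 0;

        ToyConsumer(ToyQueue queue, boolean leakCredit) {
            this.queue = queue;
            this.leakCredit = leakCredit;
        }

        /** Grants one credit, lets the queue deliver, then returns a buffered message or null. */
        String receiveImmediate() {
            credits++;
            queue.deliver();
            String msg = buffer.poll();
            if (msg == null && !leakCredit) {
                credits--; // nothing arrived, so take the unused credit back
            }
            return msg;
        }
    }

    /** A queue that pushes messages to the first consumer holding a credit. */
    static class ToyQueue {
        final Deque<String> messages = new ArrayDeque<>();
        final List<ToyConsumer> consumers = new ArrayList<>();

        void send(String msg) {
            messages.add(msg);
            deliver();
        }

        void deliver() {
            while (!messages.isEmpty()) {
                ToyConsumer target = null;
                for (ToyConsumer c : consumers) {
                    if (c.credits > 0) { target = c; break; }
                }
                if (target == null) return; // nobody has asked for a message yet
                target.credits--;
                target.buffer.add(messages.poll());
            }
        }
    }

    /** Mirrors the test above: 5 consumers, optional warm-up call, one send, read from consumer 2. */
    public static String run(boolean warmUp, boolean leakCredit) {
        ToyQueue queue = new ToyQueue();
        for (int i = 0; i < 5; i++) {
            ToyConsumer c = new ToyConsumer(queue, leakCredit);
            queue.consumers.add(c);
            if (warmUp) c.receiveImmediate();
        }
        queue.send("hello");
        return queue.consumers.get(2).receiveImmediate();
    }

    public static void main(String[] args) {
        System.out.println(run(false, true)); // prints "hello"
        System.out.println(run(true, true));  // prints "null"
        System.out.println(run(true, false)); // prints "hello"
    }
}
```

In this model the warm-up call only breaks the pool when the unused credit is left behind: the message is then pushed into consumer 0's buffer, and consumer 2 sees nothing. Whether the real server behaves this way is exactly what I'm asking.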