No Consumer Buffering caused random Queue behavior
riteshj2ee Jul 4, 2011 6:29 AMHi All,
Earlier when new Consumer starts, it wasn’t able to pick-up the messages from Queue even though messages are there in Queue because
messages were internally buffered in Consumers which are already running and that’s is the default behavior of HornetQ.
This problem was resolved by introducing No Consumer Buffering in Queue, But after that Queue behavior became unstable.
Eg. After picking up the few messages from Queue, sometimes Consumers are not picking-up the next available messages.
Now to resolve this problem, I am just closing the Session on existing Connection after receiving one message from Queue.
So Connection will be established to Queue only once but every time new Session will be created before receiving a message and
Session will be closed after receiving a message from Queue.
Can anybody tell me is this approach is fine or not?
Earlier without introducing No Consumer Buffering, single Session was working as per expectation, so why its not working fine with
No Consumer Buffering?
CODE: I am using JBoss 6AS HornetQ with JMS on Linux OS
// initialization code
queueConnectionFactory = (QueueConnectionFactory) jndiContext.lookup(QUEUE_CONNECTION_FACTORY);
queue = (Queue) jndiContext.lookup(JNDI_QUEUE_NAME);
queueConnection = queueConnectionFactory.createQueueConnection();
queueConnection.start();
// code in loop
queueSession = queueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
queueReceiver = queueSession.createReceiver(queue, receiverMessageSelector);
Message m = queueReceiver.receive();
queueSession.close();
Steps to reproduce this problem:
1. Connection factory with No Consumer Buffering (hornetq-jms.xml):
<connection-factory name="NettyConnectionFactory">
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
<entries>
<entry name="/ConnectionFactory"/>
<entry name="/XAConnectionFactory"/>
</entries>
<consumer-window-size>0</consumer-window-size>
</connection-factory>
2. code to fetch message from Queue & its initialization
private Job getJobFromJmsQueue() throws JMSException, SQLException {
ObjectMessage message = null;
LOG.info("Waiting For Job.");
LOG.info("Session Start!!");
queueSession = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
final String receiverMessageSelector = receiverMessageSelector(ClientUtils.getHostName().toLowerCase());
LOG.info("Receiver session String:" + receiverMessageSelector);
queueReceiver = queueSession.createReceiver(queue, receiverMessageSelector);
Message m = queueReceiver.receive();
if (m != null) {
if (m instanceof ObjectMessage) {
LOG.info("Job Received.");
message = (ObjectMessage) m;
} else if (m instanceof TextMessage) {
TextMessage tm = (TextMessage)m;
LOG.info("Message received:" + tm.getText());
} else {
throw new JMSException("Invalid Message in the queue.");
}
}
Job job = (Job)message.getObject();
queueSession.close();
queueSession=null;
LOG.info("Session Closed!!");
return job;
}
private void init() {
try {
queueConnectionFactory = (QueueConnectionFactory) jndiContext.lookup(QUEUE_CONNECTION_FACTORY);
queue = (Queue) jndiContext.lookup(JNDI_QUEUE_NAME);
queueConnection = queueConnectionFactory.createQueueConnection();
queueConnection.start();
// message selector
receiverMessageSelector = receiverMessageSelector(ClientUtils.getHostName().toLowerCase());
LOG.info("Receiver session String:" + receiverMessageSelector);
} catch (JMSException e) {
System.exit(1);
} catch (NamingException e) {
System.exit(1);
}
}
3. Produce some (10-20) message and send to the Queue.
4. Start consumer1
5. Start consumer2
6. Confirm that both consumer1 & consumer2 are receiving messages
7. Repeat step 3,4,5 for two-three times it will start behaving randomly, sometimes Consumers will not receive messages even though its there in the Queue. By restarting that consumer again it will receive that message and again stop receiving within few minutes.
Thanks & Regards,
Ritesh
-
complete source code.zip 4.3 KB