-
1. Re: Redelivery problem when using MessageSelectors
theamazingtoby Dec 6, 2010 9:49 AM (in response to theamazingtoby) Here's a simple unit test that performs two tests. Both tests send two messages and then attempt to receive two messages using a message selector. The first test sends messages with different values for the custom property "senderID", and the second sends messages with the same "senderID" value. The first succeeds and the second fails. The JMS queue has a redelivery value greater than 0.
package jmstest;
import static org.junit.Assert.assertTrue;
import java.io.Serializable;
import java.util.Hashtable;
import java.util.Map;
import java.util.Properties;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
 * Integration test that reproduces a redelivery problem seen when receiving
 * from a JMS queue through message selectors.
 *
 * <p>Each test sends two {@link ObjectMessage}s tagged with a custom
 * {@code senderID} property and then tries to receive both of them, one at a
 * time, each through a freshly opened connection and a selector on that
 * property. According to the report accompanying this test,
 * {@link #testJMSQueueSucceeds()} passes and {@link #testJMSQueueFails()} fails
 * when the queue is configured with a redelivery value greater than 0.
 *
 * <p>The JNDI provider URL and the queue's JNDI name are read from the system
 * properties {@code JMSSERVER} and {@code QUEUE}.
 */
public class JMSQueueTest
{
    private static final Logger log = Logger.getLogger(JMSQueueTest.class);

    /** Name of the custom message property the selectors filter on. */
    private static final String SENDER_ID_PROPERTY = "senderID";

    /** Visual separator written to the log around the send phase. */
    private static final String SEPARATOR = "--------------------------------------------------------------------------------";

    /**
     * Number of consecutive receive failures {@link #clearQueue()} tolerates
     * before giving up, so a broken setup fails fast instead of spinning forever.
     */
    private static final int MAX_CONSECUTIVE_CLEAR_FAILURES = 5;

    private final String jmsServer = System.getProperty("JMSSERVER");
    private final String jmsQueueName = System.getProperty("QUEUE");

    @BeforeClass
    public static void setUpBeforeClass() throws Exception
    {
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception
    {
    }

    /** Empties the queue so every test starts from a known state. */
    @Before
    public void setUp() throws Exception
    {
        log.info("setUp - cleaning up");
        clearQueue();
    }

    /** Empties the queue so leftovers of a failed test do not leak into the next one. */
    @After
    public void tearDown() throws Exception
    {
        log.info("tearDown - cleaning up");
        clearQueue();
    }

    /**
     * Drains the queue by receiving without a selector until a receive times out.
     *
     * <p>A receive that throws is logged and retried, but only
     * {@link #MAX_CONSECUTIVE_CLEAR_FAILURES} times in a row; after that the
     * last exception is rethrown. Without this bound a permanent failure
     * (unset system property, unreachable server) would loop forever.
     *
     * @throws Exception the last receive failure once the retry bound is reached
     */
    public void clearQueue() throws Exception
    {
        int consecutiveFailures = 0;
        boolean areThereMore = true;
        while (areThereMore)
        {
            try
            {
                ObjectMessage message = getNextMessage(jmsServer, jmsQueueName, false, 1, null);
                consecutiveFailures = 0;
                // Only the presence of a message matters here; a message with a
                // null payload must not stop the drain.
                if (message != null)
                {
                    log.debug("There's more");
                }
                else
                {
                    log.debug("No more.");
                    areThereMore = false;
                }
            }
            catch (Exception e)
            {
                log.error(e.getMessage(), e);
                consecutiveFailures++;
                if (consecutiveFailures >= MAX_CONSECUTIVE_CLEAR_FAILURES)
                {
                    throw e;
                }
            }
        }
    }

    /**
     * Sends two messages with <em>different</em> {@code senderID} values and
     * expects to receive each one through its own selector.
     */
    @Test
    public void testJMSQueueSucceeds() throws Exception
    {
        sendTwoThenReceiveTwo("test1", "test2");
    }

    /**
     * Sends two messages with the <em>same</em> {@code senderID} value and
     * expects to receive both through two successive selector receives. This is
     * the scenario reported to fail.
     */
    @Test
    public void testJMSQueueFails() throws Exception
    {
        sendTwoThenReceiveTwo("test1", "test1");
    }

    /**
     * Shared scenario of both tests: send two tagged messages, then receive one
     * message per sender ID, each with a one second timeout.
     *
     * <p>Exceptions are deliberately not caught so that infrastructure failures
     * fail the test instead of being logged and ignored.
     */
    private void sendTwoThenReceiveTwo(String firstSenderID, String secondSenderID) throws Exception
    {
        // /////////////////////////////////////////////////////////////////////////
        // Send a couple of messages
        // /////////////////////////////////////////////////////////////////////////
        log.info(SEPARATOR);
        log.info("sending two messages");
        sendJMSMessage(firstSenderID);
        sendJMSMessage(secondSenderID);
        log.info(SEPARATOR);

        log.debug("Manually retrieving messages");
        ObjectMessage om = getNextMessage(jmsServer, jmsQueueName, false, 1, selectorFor(firstSenderID));
        assertTrue("first message was not received (" + selectorFor(firstSenderID) + ")", om != null);
        om = getNextMessage(jmsServer, jmsQueueName, false, 1, selectorFor(secondSenderID));
        assertTrue("second message was not received (" + selectorFor(secondSenderID) + ")", om != null);
        log.debug("Done.");
    }

    /** Builds the selector expression matching messages of the given sender. */
    private static String selectorFor(String senderID)
    {
        return SENDER_ID_PROPERTY + " = '" + senderID + "'";
    }

    /**
     * Sends one non-persistent test message with priority 4 and a 20 second
     * time-to-live, tagged with the given sender ID.
     *
     * @param senderID value for the {@code senderID} property; when {@code null}
     *                 the property is not set
     * @throws Exception if the message could not be sent
     */
    private void sendJMSMessage(String senderID) throws Exception
    {
        log.debug("Sending...");
        String message = "This is a test";
        Map<String, Object> properties = new Hashtable<String, Object>();
        if (senderID != null)
        {
            properties.put(SENDER_ID_PROPERTY, senderID);
            log.debug("senderID = " + senderID);
        }
        sendMessageX(jmsServer, jmsQueueName, true, message, 20, null, 0, properties, DeliveryMode.NON_PERSISTENT, 4);
        log.debug("Message sent");
    }

    /**
     * Opens a connection, sends one {@link ObjectMessage} and closes everything again.
     *
     * @param jmsServer         JNDI provider URL of the JMS server
     * @param jmsQueueName      JNDI name of the target queue
     * @param clientAcknowledge {@code true} for a CLIENT_ACKNOWLEDGE session,
     *                          {@code false} for AUTO_ACKNOWLEDGE
     * @param message           payload of the message
     * @param ttl               time-to-live in seconds; 0 means the message never expires
     * @param replyTo           optional reply destination; when non-null it is set as
     *                          JMSReplyTo and mirrored into the {@code replyToServer}
     *                          and {@code replyToQueue} string properties
     * @param replyWaitSeconds  not used by this method
     * @param properties        optional custom properties copied onto the message
     * @param persistence       JMS delivery mode
     * @param priority          JMS priority
     * @throws NamingException if a JNDI lookup fails
     * @throws JMSException    if the provider fails to create or send the message
     */
    private void sendMessageX(String jmsServer, String jmsQueueName, boolean clientAcknowledge, Serializable message, int ttl, JMSQueue replyTo, int replyWaitSeconds, Map<String, Object> properties, int persistence, int priority) throws NamingException, JMSException
    {
        InitialContext ctx = null;
        QueueConnection conn = null;
        QueueSession session = null;
        try
        {
            ctx = createContext(jmsServer);
            QueueConnectionFactory tcf = (QueueConnectionFactory) ctx.lookup("ConnectionFactory");
            conn = tcf.createQueueConnection();
            Queue queue = (Queue) ctx.lookup(jmsQueueName);
            conn.start();
            session = openSession(conn, clientAcknowledge);

            log.debug("Start sendMessage()");
            MessageProducer producer = session.createProducer(queue);
            ObjectMessage objMessage = session.createObjectMessage(message);
            if (properties != null)
            {
                for (Map.Entry<String, Object> property : properties.entrySet())
                {
                    objMessage.setObjectProperty(property.getKey(), property.getValue());
                }
            }
            if (replyTo != null)
            {
                objMessage.setJMSReplyTo(replyTo.getJMSQueue());
                objMessage.setStringProperty("replyToServer", replyTo.getJmsServer());
                objMessage.setStringProperty("replyToQueue", replyTo.getJmsQueueName());
            }

            log.debug("-------------Sending Message");
            // The four-argument send() ignores the producer's default
            // time-to-live, so the TTL has to be passed here. The long
            // multiplication avoids int overflow for large values.
            producer.send(objMessage, persistence, priority, ttl * 1000L);
            // The producer is closed together with its session below.
        }
        finally
        {
            closeQuietly(session, conn, ctx);
        }
    }

    /**
     * Opens a connection, receives at most one message and closes everything again.
     *
     * <p>NOTE(review): with {@code clientAcknowledge == true} the message is
     * returned without ever being acknowledged and the session is closed right
     * after; presumably the provider will redeliver it - confirm whether that is
     * intended by callers.
     *
     * @param jmsServer         JNDI provider URL of the JMS server
     * @param jmsQueueName      JNDI name of the queue to read from
     * @param clientAcknowledge {@code true} for a CLIENT_ACKNOWLEDGE session,
     *                          {@code false} for AUTO_ACKNOWLEDGE
     * @param queueWaitSeconds  receive timeout in seconds; per the JMS API a
     *                          timeout of 0 blocks until a message arrives
     * @param selector          JMS message selector, or {@code null} to receive
     *                          any message
     * @return the received message, or {@code null} if none arrived in time
     * @throws NamingException if a JNDI lookup fails
     * @throws JMSException    if the provider fails to receive
     */
    private ObjectMessage getNextMessage(String jmsServer, String jmsQueueName, boolean clientAcknowledge, int queueWaitSeconds, String selector) throws NamingException, JMSException
    {
        InitialContext ctx = null;
        QueueConnection conn = null;
        QueueSession session = null;
        try
        {
            ctx = createContext(jmsServer);
            QueueConnectionFactory tcf = (QueueConnectionFactory) ctx.lookup("ConnectionFactory");
            conn = tcf.createQueueConnection();
            Queue queue = (Queue) ctx.lookup(jmsQueueName);
            conn.start();
            log.debug(clientAcknowledge ? "CLIENT_ACKNOWLEDGE" : "AUTO_ACKNOWLEDGE");
            session = openSession(conn, clientAcknowledge);

            log.debug("Start getNextMessage()");
            QueueReceiver consumer;
            if (selector == null)
            {
                log.debug("Not using a selector");
                consumer = session.createReceiver(queue);
            }
            else
            {
                log.debug("Using a selector");
                consumer = session.createReceiver(queue, selector);
            }

            // Long multiplication avoids int overflow for large timeouts.
            ObjectMessage message = (ObjectMessage) consumer.receive(queueWaitSeconds * 1000L);
            if (message == null)
            {
                log.debug("---No message found in " + queueWaitSeconds + " seconds");
                return null;
            }
            log.debug("----------Got a message");
            return message;
            // The receiver is closed together with its session below.
        }
        finally
        {
            closeQuietly(session, conn, ctx);
        }
    }

    /**
     * Creates the JNDI context used to look up the connection factory and queue.
     *
     * @throws IllegalArgumentException if {@code jmsServer} is {@code null},
     *         e.g. because the {@code JMSSERVER} system property is not set
     */
    private static InitialContext createContext(String jmsServer) throws NamingException
    {
        if (jmsServer == null)
        {
            throw new IllegalArgumentException("JMS server URL is null; is the JMSSERVER system property set?");
        }
        Properties ctxProperties = new Properties();
        ctxProperties.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
        ctxProperties.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
        ctxProperties.put(Context.PROVIDER_URL, jmsServer);
        return new InitialContext(ctxProperties);
    }

    /** Creates a non-transacted session with the requested acknowledge mode. */
    private static QueueSession openSession(QueueConnection conn, boolean clientAcknowledge) throws JMSException
    {
        int ackMode = clientAcknowledge ? QueueSession.CLIENT_ACKNOWLEDGE : QueueSession.AUTO_ACKNOWLEDGE;
        return conn.createQueueSession(false, ackMode);
    }

    /**
     * Closes session, connection and naming context, in that order, skipping
     * whatever was never opened. Failures are logged rather than thrown so that
     * cleanup never masks the exception that caused it.
     */
    private static void closeQuietly(QueueSession session, QueueConnection conn, InitialContext ctx)
    {
        if (session != null)
        {
            try
            {
                session.close();
            }
            catch (Exception e)
            {
                log.error(e.getMessage(), e);
            }
        }
        if (conn != null)
        {
            try
            {
                conn.close();
            }
            catch (Exception e)
            {
                log.error(e.getMessage(), e);
            }
        }
        if (ctx != null)
        {
            try
            {
                ctx.close();
            }
            catch (Exception e)
            {
                log.error(e.getMessage(), e);
            }
        }
    }
}