Messages not delivered to reply queue
vrushank.desai Apr 30, 2017 12:56 PM
In our current application, which runs on JBoss EAP 6.2, we have many batch jobs triggered by remote EJB invocations. To centralize all notification logic for these jobs, we have decided to route all calls through an MDB by passing a serialized message. The intended flow is as follows:
- Batch job client sends message to a remote queue
- An MDB listens on this remote queue, processes the message, and invokes the EJB
- DLQ is configured to process notifications when all retries are exhausted
- Notification should also be sent on each retry. To avoid too many notifications, retry interval is sufficiently high
To handle the last point, I created a reply queue and set it in the JMSReplyTo header of the message. To simulate the above flow, I have created the client and MDB implementations below.
Relevant entries in standalone-full.xml
<security-settings>
    <security-setting match="#">
        <permission type="send" roles="queue guest"/>
        <permission type="consume" roles="queue guest"/>
        <permission type="createNonDurableQueue" roles="queue guest"/>
        <permission type="deleteNonDurableQueue" roles="queue guest"/>
    </security-setting>
</security-settings>
<address-settings>
    <address-setting match="jms.queue.testQueue">
        <dead-letter-address>jms.queue.DLQ</dead-letter-address>
        <expiry-address>jms.queue.ExpiryQueue</expiry-address>
        <redelivery-delay>1000</redelivery-delay>
        <max-delivery-attempts>3</max-delivery-attempts>
        <max-size-bytes>10485760</max-size-bytes>
        <address-full-policy>BLOCK</address-full-policy>
        <message-counter-history-day-limit>10</message-counter-history-day-limit>
    </address-setting>
    <address-setting match="jms.queue.DLQ">
        <redelivery-delay>1000</redelivery-delay>
        <max-delivery-attempts>3</max-delivery-attempts>
        <max-size-bytes>10485760</max-size-bytes>
        <address-full-policy>BLOCK</address-full-policy>
        <message-counter-history-day-limit>10</message-counter-history-day-limit>
    </address-setting>
</address-settings>
<jms-destinations>
    <jms-queue name="testQueue">
        <entry name="queue/test"/>
        <entry name="java:jboss/exported/jms/queue/test"/>
    </jms-queue>
    <jms-queue name="replyQueue">
        <entry name="queue/reply"/>
        <entry name="java:jboss/exported/jms/queue/reply"/>
    </jms-queue>
    <jms-queue name="DLQ">
        <entry name="queue/dead"/>
        <entry name="java:jboss/exported/jms/queue/dead"/>
    </jms-queue>
    <jms-topic name="testTopic">
        <entry name="topic/test"/>
        <entry name="java:jboss/exported/jms/topic/test"/>
    </jms-topic>
</jms-destinations>
Main MDB:
@MessageDriven(name = "MiddleManMDB", activationConfig = {
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/test"),
    @ActivationConfigProperty(propertyName = "connectorClassName", propertyValue = "org.hornetq.core.remoting.impl.netty.NettyConnectorFactory"),
    @ActivationConfigProperty(propertyName = "connectionParameters", propertyValue = "host=localhost;port=5445"),
    @ActivationConfigProperty(propertyName = "user", propertyValue = "queueuser"),
    @ActivationConfigProperty(propertyName = "password", propertyValue = "queuepassword")
})
public class MiddleManMDB implements MessageListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(MiddleManMDB.class);

    @Resource(name = "java:/JmsXA")
    private ConnectionFactory connectionFactory;

    /*
     * (non-Javadoc)
     * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
     */
    @Override
    public void onMessage(Message message) {
        try {
            LOGGER.info("Current delivery count - {}", getCurrentDeliveryCount(message));
            if (message instanceof TextMessage) {
                LOGGER.info("Received text message --> {}", ((TextMessage) message).getText());
            }
            throw new JMSException("thrown exception");
        } catch (Exception e) {
            sendToReplyQueue(e.getMessage(), message);
            LOGGER.info("Throwing exception to simulate retry...");
            throw new RuntimeException(e);
        }
    }

    private void sendToReplyQueue(String errorMessage, Message message) {
        Context context = null;
        Connection conn = null;
        LOGGER.info("Sending exception details to reply queue...");
        try {
            context = new InitialContext();
            conn = connectionFactory.createConnection();
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination jmsReplyTo = message.getJMSReplyTo();
            MessageProducer replyProducer = session.createProducer(jmsReplyTo);
            replyProducer.send(jmsReplyTo, session.createTextMessage(errorMessage));
        } catch (NamingException | JMSException e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (context != null) {
                try {
                    context.close();
                } catch (NamingException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private int getCurrentDeliveryCount(Message message) throws JMSException {
        return message.getIntProperty("JMSXDeliveryCount");
    }
}
Reply MDB
@MessageDriven(name = "ReplyMDB", activationConfig = {
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/reply")
})
public class ReplyMDB implements MessageListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(ReplyMDB.class);

    @Override
    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                LOGGER.info("Received reply message --> " + ((TextMessage) message).getText());
            }
        } catch (JMSException e) {
            LOGGER.error("Error in reply queue...", e);
        }
    }
}
Dead Letter MDB
@MessageDriven(name = "DeadLetterMDB", activationConfig = {
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/dead")
})
public class DeadLetterMDB implements MessageListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(DeadLetterMDB.class);

    @Override
    public void onMessage(Message message) {
        try {
            LOGGER.info("Message has arrived in dead letter queue");
            LOGGER.info("Current delivery count - {}", message.getIntProperty("JMSXDeliveryCount"));
            if (message instanceof TextMessage) {
                LOGGER.info("Received text message --> {}", ((TextMessage) message).getText());
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
Client
public static void main(String[] args) {
    Connection connection = null;
    Context context = null;
    try {
        final Properties env = new Properties();
        env.put(Context.INITIAL_CONTEXT_FACTORY, "org.jboss.naming.remote.client.InitialContextFactory");
        env.put(Context.PROVIDER_URL, System.getProperty(Context.PROVIDER_URL, "remote://localhost:4447"));
        env.put(Context.SECURITY_PRINCIPAL, System.getProperty("username", "queueuser"));
        env.put(Context.SECURITY_CREDENTIALS, System.getProperty("password", "queuepassword"));
        context = new InitialContext(env);

        ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("jms/RemoteConnectionFactory");
        connection = connectionFactory.createConnection("queueuser", "queuepassword");
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = (Destination) context.lookup("jms/queue/test");
        Destination replyDest = (Destination) context.lookup("jms/queue/reply");
        MessageProducer producer = session.createProducer(destination);
        connection.start();

        TextMessage message = session.createTextMessage("Hello World");
        message.setJMSReplyTo(replyDest);
        producer.send(message);
    } catch (NamingException | JMSException e) {
        e.printStackTrace();
    } finally {
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        if (context != null) {
            try {
                context.close();
            } catch (NamingException e) {
                e.printStackTrace();
            }
        }
    }
}
Now with the above flow in the MDBs, the message is never received in the reply queue. All three queues are deployed on the same server.
I am guessing the reason lies in the lines below:
sendToReplyQueue(e.getMessage(), message);
LOGGER.info("Throwing exception to simulate retry...");
throw new RuntimeException(e);
Since the send is asynchronous and I am throwing a RuntimeException (to trigger the retry), the message is somehow never sent. Is there a way to resolve this problem?
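One other plausible explanation, stated here as an assumption rather than a confirmed diagnosis: the MDB obtains its connection from java:/JmsXA, so the reply send may be enlisted in the same container-managed transaction as the message delivery. When onMessage throws a RuntimeException, that transaction rolls back and the reply would be discarded with it. The sketch below is a toy model in plain Java, not the JMS or HornetQ API; ToyQueue, ToyTransaction and invokeListener are hypothetical names used only to illustrate the effect.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: these classes are hypothetical and are NOT the JMS or HornetQ API.
public class TransactedSendModel {

    /** A queue that only holds messages whose sends were committed. */
    static class ToyQueue {
        final List<String> delivered = new ArrayList<>();
    }

    /** Buffers sends until commit(); rollback() discards them. */
    static class ToyTransaction {
        private final List<Runnable> pendingSends = new ArrayList<>();

        void send(ToyQueue queue, String body) {
            pendingSends.add(() -> queue.delivered.add(body));
        }

        void commit() {
            pendingSends.forEach(Runnable::run);
            pendingSends.clear();
        }

        void rollback() {
            pendingSends.clear();
        }
    }

    /** Mimics a container invoking a listener inside a transaction. */
    static void invokeListener(ToyTransaction tx, Runnable listener) {
        try {
            listener.run();
            tx.commit();
        } catch (RuntimeException e) {
            // A runtime exception rolls back the whole unit of work.
            tx.rollback();
        }
    }

    /** Case 1: the reply is sent inside the transaction that later rolls back. */
    static int repliesWhenSentInsideFailingTransaction() {
        ToyQueue replyQueue = new ToyQueue();
        ToyTransaction tx = new ToyTransaction();
        invokeListener(tx, () -> {
            tx.send(replyQueue, "thrown exception");
            throw new RuntimeException("simulate retry");
        });
        return replyQueue.delivered.size();
    }

    /** Case 2: the reply is sent in its own transaction, committed before the failure. */
    static int repliesWhenSentInSeparateTransaction() {
        ToyQueue replyQueue = new ToyQueue();
        ToyTransaction outer = new ToyTransaction();
        invokeListener(outer, () -> {
            ToyTransaction inner = new ToyTransaction();
            inner.send(replyQueue, "thrown exception");
            inner.commit();
            throw new RuntimeException("simulate retry");
        });
        return replyQueue.delivered.size();
    }

    public static void main(String[] args) {
        System.out.println("inside failing tx: " + repliesWhenSentInsideFailingTransaction());
        System.out.println("separate tx: " + repliesWhenSentInSeparateTransaction());
    }
}
```

If this model matches what happens on the server, the real-world analogue of the second case might be to send the reply from a separate bean method annotated with @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW), or to use a non-XA connection factory for the reply. Neither option has been verified against this EAP 6.2 setup.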
Message was edited by: Vrushank Desai