4 Replies Latest reply on May 5, 2017 2:01 AM by vrushank.desai

    Messages not delivered to reply queue

    vrushank.desai

      In our current application, which runs on JBoss EAP 6.2, we have many batch jobs triggered by remote EJB invocations. To centralize all notification logic for these jobs, we have decided to route all calls through an MDB by passing a serialized message. The intended flow is as follows:

       

      • Batch job client sends a message to a remote queue
      • MDB listens on this remote queue, processes the message and invokes the EJB
      • DLQ is configured to process notifications when all retries are exhausted
      • A notification should also be sent on each retry. To avoid too many notifications, the retry interval is set sufficiently high
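
      A minimal sketch of how the last two points could fit together, assuming JMSXDeliveryCount is 1 on the first delivery and the broker moves the message to the DLQ once the count reaches max-delivery-attempts. RetryNotificationSketch, Channel and channelFor are hypothetical names used only for illustration, not part of the application:

```java
/** Sketch: decide which queue should produce the failure notification. */
class RetryNotificationSketch {

    /** Assumed to mirror max-delivery-attempts for jms.queue.testQueue. */
    static final int MAX_DELIVERY_ATTEMPTS = 3;

    /** Where the notification for a failed attempt is expected to come from. */
    enum Channel { REPLY_QUEUE, DLQ }

    /**
     * @param deliveryCount       value of the JMSXDeliveryCount property (1 on first delivery)
     * @param maxDeliveryAttempts configured maximum number of delivery attempts
     */
    static Channel channelFor(int deliveryCount, int maxDeliveryAttempts) {
        if (deliveryCount < 1 || maxDeliveryAttempts < 1) {
            throw new IllegalArgumentException("counts must be >= 1");
        }
        // Every failed attempt except the last notifies through the reply queue;
        // the last failed attempt is left to the MDB listening on the DLQ.
        return deliveryCount < maxDeliveryAttempts ? Channel.REPLY_QUEUE : Channel.DLQ;
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= MAX_DELIVERY_ATTEMPTS; attempt++) {
            System.out.println("attempt " + attempt + " -> "
                    + channelFor(attempt, MAX_DELIVERY_ATTEMPTS));
        }
    }
}
```

      With this split, the main MDB would only send to the reply queue while the delivery count is below the configured maximum, so the final failure is reported once, from the DLQ.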

       

      To handle the last point, I tried using a reply queue by setting it in the JMSReplyTo header. To simulate the above flow, I have created the client and MDB implementations below.

       

      Relevant entries in standalone-full.xml

       

      <security-settings>
        <security-setting match="#">
            <permission type="send" roles="queue guest"/>
            <permission type="consume" roles="queue guest"/>
            <permission type="createNonDurableQueue" roles="queue guest"/>
            <permission type="deleteNonDurableQueue" roles="queue guest"/>
        </security-setting>
      </security-settings>
      
      
      <address-settings>
        <address-setting match="jms.queue.testQueue">
            <dead-letter-address>jms.queue.DLQ</dead-letter-address>
            <expiry-address>jms.queue.ExpiryQueue</expiry-address>
            <redelivery-delay>1000</redelivery-delay>
            <max-delivery-attempts>3</max-delivery-attempts>
            <max-size-bytes>10485760</max-size-bytes>
            <address-full-policy>BLOCK</address-full-policy>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
        </address-setting>
        <address-setting match="jms.queue.DLQ">
            <redelivery-delay>1000</redelivery-delay>
            <max-delivery-attempts>3</max-delivery-attempts>
            <max-size-bytes>10485760</max-size-bytes>
            <address-full-policy>BLOCK</address-full-policy>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
        </address-setting>
      </address-settings>
      
      <jms-destinations>
        <jms-queue name="testQueue">
            <entry name="queue/test"/>
            <entry name="java:jboss/exported/jms/queue/test"/>
        </jms-queue>
        <jms-queue name="replyQueue">
            <entry name="queue/reply"/>
            <entry name="java:jboss/exported/jms/queue/reply"/>
        </jms-queue>
        <jms-queue name="DLQ">
            <entry name="queue/dead"/>
            <entry name="java:jboss/exported/jms/queue/dead"/>
        </jms-queue>
        <jms-topic name="testTopic">
            <entry name="topic/test"/>
            <entry name="java:jboss/exported/jms/topic/test"/>
        </jms-topic>
      </jms-destinations>
      

       

       

      Main MDB:

      @MessageDriven(name = "MiddleManMDB", activationConfig = {
            @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
            @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/test"),
            @ActivationConfigProperty(propertyName = "connectorClassName", propertyValue = "org.hornetq.core.remoting.impl.netty.NettyConnectorFactory"),
            @ActivationConfigProperty(propertyName = "connectionParameters", propertyValue = "host=localhost;port=5445"),
            @ActivationConfigProperty(propertyName = "user", propertyValue = "queueuser"),
            @ActivationConfigProperty(propertyName = "password", propertyValue = "queuepassword")
      })
      public class MiddleManMDB implements MessageListener {
         
         private static final Logger LOGGER = LoggerFactory.getLogger(MiddleManMDB.class);
      
      
         @Resource(name = "java:/JmsXA")
         private ConnectionFactory connectionFactory;
         
         /*
          * (non-Javadoc)
          * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
          */
         @Override
         public void onMessage(Message message) {
            
            try {
               
               LOGGER.info("Current delivery count - {}", getCurrentDeliveryCount(message));
               
               if (message instanceof TextMessage) {
                  LOGGER.info("Received text message --> {}", ((TextMessage)message).getText());
               }
               
               throw new JMSException("thrown exception");
            }
            catch (Exception e) {
               sendToReplyQueue(e.getMessage(), message);
               
               LOGGER.info("Throwing exception to simulate retry...");
               throw new RuntimeException(e);
            }
            
         }
      
      
         private void sendToReplyQueue(String errorMessage, Message message) {
            
            Context context = null;
            Connection conn = null;
            
            LOGGER.info("Sending exception details to reply queue...");
            
            try {
               context = new InitialContext();
               
               conn = connectionFactory.createConnection();
               Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
               Destination jmsReplyTo = message.getJMSReplyTo();
               MessageProducer replyProducer = session.createProducer(jmsReplyTo);
               replyProducer.send(jmsReplyTo, session.createTextMessage(errorMessage));
               
            }
            catch (NamingException | JMSException e) {
               e.printStackTrace();
            }
            finally {
               if (conn != null) {
                  try {
                     conn.close();
                  }
                  catch (JMSException e) {
                     e.printStackTrace();
                  }
               }
               
               if (context != null) {
                  try {
                     context.close();
                  }
                  catch (NamingException e) {
                     e.printStackTrace();
                  }
               }
            }
         }
      
      
         private int getCurrentDeliveryCount(Message message) throws JMSException {
            return message.getIntProperty("JMSXDeliveryCount");
         }
      }
      

       

      Reply MDB

      @MessageDriven(name = "ReplyMDB", activationConfig = {
            @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
            @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/reply")
      })
      public class ReplyMDB implements MessageListener {
      
      
         private static final Logger LOGGER = LoggerFactory.getLogger(ReplyMDB.class);
         
         @Override
         public void onMessage(Message message) {
            try {
               if (message instanceof TextMessage) {
                  LOGGER.info("Received reply message --> " + ((TextMessage)message).getText());
               }
            }
            catch (JMSException e) {
               LOGGER.error("Error in reply queue...", e);
            }
         }
      
      
      }
      

       

      Dead Letter MDB

      @MessageDriven(name = "DeadLetterMDB", activationConfig = {
            @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
            @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/dead")
      })
      public class DeadLetterMDB implements MessageListener {
      
      
         private static final Logger LOGGER = LoggerFactory.getLogger(DeadLetterMDB.class);
         
         @Override
         public void onMessage(Message message) {
            try {
               LOGGER.info("Message has arrived in dead letter queue");
               LOGGER.info("Current delivery count - {}", message.getIntProperty("JMSXDeliveryCount"));
               
               if (message instanceof TextMessage) {
                  LOGGER.info("Received text message --> {}", ((TextMessage)message).getText());
               }
            }
            catch (JMSException e) {
               e.printStackTrace();
            }
         }
      }
      

       

      Client

      public static void main(String[] args) {
         Connection connection = null;
         Context context = null;
         
         try {
            final Properties env = new Properties();
            env.put(Context.INITIAL_CONTEXT_FACTORY, "org.jboss.naming.remote.client.InitialContextFactory");
            env.put(Context.PROVIDER_URL, System.getProperty(Context.PROVIDER_URL, "remote://localhost:4447"));
            env.put(Context.SECURITY_PRINCIPAL, System.getProperty("username", "queueuser"));
            env.put(Context.SECURITY_CREDENTIALS, System.getProperty("password", "queuepassword"));
            context = new InitialContext(env);
            
            ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("jms/RemoteConnectionFactory");
            
            connection = connectionFactory.createConnection("queueuser", "queuepassword");
            
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            Destination destination = (Destination) context.lookup("jms/queue/test");
            
            Destination replyDest = (Destination) context.lookup("jms/queue/reply");
            
            MessageProducer producer = session.createProducer(destination);
            
            connection.start();
            
            TextMessage message = session.createTextMessage("Hello World");
            message.setJMSReplyTo(replyDest);
            
            producer.send(message);
         }
         catch (NamingException | JMSException e) {
            e.printStackTrace();
         }
         finally {
            if (connection != null) {
               try {
                  connection.close();
               }
               catch (JMSException e) {
                  e.printStackTrace();
               }
            }
            if (context != null) {
               try {
                  context.close();
               }
               catch (NamingException e) {
                  e.printStackTrace();
               }
            }
         }
      }
      

       

      Now with the above flow in the MDBs, the message is never received in the reply queue. All three queues are deployed on the same server.

       

      I am guessing the reason is the lines below:

       

      sendToReplyQueue(e.getMessage(), message);

      LOGGER.info("Throwing exception to simulate retry...");

      throw new RuntimeException(e);

       

      Since the send is asynchronous and I am throwing an RTE (to trigger a retry), the message is somehow never sent. Is there a way to resolve this problem?

       

      Message was edited by: Vrushank Desai

        • 1. Re: Messages not delivered to reply queue
          jbertram

          Couple of things:

          • It's generally bad practice to throw a RuntimeException from onMessage. See more details on why here.
          • I believe that when a RuntimeException is thrown from onMessage the container will automatically roll back the MDB's transaction, which will cancel the message sent to the reply queue since you're using an XA connection factory (i.e. JmsXA). If you want to throw the RuntimeException and still send the message then you should use a connection factory whose connections/sessions will not automatically be enlisted into the transaction. However, I would recommend that you not throw the RuntimeException (see the previous bullet point).
          • If the HornetQ instance is in the same JVM as your MDB then you shouldn't specify the "connectorClassName" and "connectionParameters" activation configuration parameters on the MDB, as the values you've set will force the MDB to go through the Netty-based remoting transport rather than the in-vm transport. The Netty transport will always be slower than the in-vm transport.
          • 2. Re: Messages not delivered to reply queue
            vrushank.desai

            Hello Justin. Thanks for your reply and your helpful suggestions. I have a few questions regarding your points, as mentioned below:

             

            It's generally bad practice to throw a RuntimeException from onMessage

            Throwing the RTE was just to simulate a retry in my example. However, as I mentioned in the intended design, the batch jobs will now send parameters to the main MDB, which calls the EJB, since an MDB has the provision for retries. If an exception occurs while calling the EJB, or is thrown from it, the call needs to be retried until all attempts are exhausted, and an email needs to be sent about each failure (from the reply queue for all but the last attempt, and from the DLQ for the last one). Granted, this will not be a frequent scenario in production, but it still needs to be handled. In such cases, if it is not a good practice to throw any kind of RTE, what should be done to trigger a retry? Is an MDB not the preferred choice then?

             

            The link also mentions the following approach :

            // Force a transaction rollback and redelivery
            messageDrivenBeanContext.setRollbackOnly();
            

             

            I am not familiar with this approach. Could you please help on how to get hold of the MDB context through the existing code I am already using?

             

            I believe that when a RuntimeException is thrown from onMessage the container will automatically roll back the MDB's transaction, which will cancel the message sent to the reply queue since you're using an XA connection factory (i.e. JmsXA). If you want to throw the RuntimeException and still send the message then you should use a connection factory whose connections/sessions will not automatically be enlisted into the transaction.

            In this case, should I change the connection factory to the non-XA one (java:/ConnectionFactory)? Or should I drop the resource injection and look up the factory manually through JNDI?

             

            If the HornetQ instance is in the same JVM as your MDB then you shouldn't specify the "connectorClassName" and "connectionParameters" activation configuration parameters on the MDB, as the values you've set will force the MDB to go through the Netty-based remoting transport rather than the in-vm transport. The Netty transport will always be slower than the in-vm transport.

            Thanks. Wasn't aware of that.

            • 3. Re: Messages not delivered to reply queue
              jbertram

              Throwing the RTE was just to simulate a retry in my example.

              I understand, but even in that case it's not a good idea. Your MDBs should be written so that they actually catch RuntimeException (or perhaps just Throwable) and do something appropriate (e.g. roll back the transaction, log an error, etc.).

               

              ...if it is not a good practice to throw any kind of RTE, what should be done to trigger a retry?

              You should invoke setRollbackOnly on your MessageDrivenContext.

               

              I am not familiar with this approach. Could you please help on how to get hold of the MDB context through the existing code I am already using?

              I recommend you Google for "inject messagedrivencontext".  Using the MessageDrivenContext for transaction related operations has been the recommendation for most of the last two decades (starting when EJB 2.0 was released).
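
              As a rough sketch of the control flow this leads to: catch the failure, send the notification, mark the transaction for rollback, and let onMessage return normally. To keep it compilable without the Java EE API jar, RollbackContext below is a hypothetical stand-in for the single setRollbackOnly() method used from javax.ejb.MessageDrivenContext, and Work and Notifier are hypothetical stand-ins for the EJB call and the reply-queue sender. In the real MDB the context would typically be injected with @Resource on a MessageDrivenContext field.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of the onMessage control flow: notify, mark for rollback, do not rethrow. */
class RollbackOnlySketch {

    /** Hypothetical stand-in for the one method used from javax.ejb.MessageDrivenContext. */
    public interface RollbackContext {
        void setRollbackOnly();
    }

    /** Hypothetical stand-in for the business call (the EJB invocation). */
    public interface Work {
        void run() throws Exception;
    }

    /** Hypothetical stand-in for the code that sends to the reply queue. */
    public interface Notifier {
        void notifyFailure(String reason);
    }

    /**
     * What the body of onMessage would do. Returns true if the work succeeded.
     * In a real MDB, ctx would be: @Resource private MessageDrivenContext ctx;
     */
    public static boolean handle(Work work, Notifier notifier, RollbackContext ctx) {
        try {
            work.run();
            return true;
        } catch (Exception e) {
            // Report the failure, then ask the container to roll back and redeliver.
            notifier.notifyFailure(String.valueOf(e.getMessage()));
            ctx.setRollbackOnly();
            return false; // no exception escapes onMessage
        }
    }

    public static void main(String[] args) {
        List<String> notifications = new ArrayList<>();
        boolean[] rollbackRequested = {false};

        boolean ok = handle(
                () -> { throw new IllegalStateException("EJB call failed"); },
                notifications::add,
                () -> rollbackRequested[0] = true);

        System.out.println("succeeded=" + ok
                + ", rollbackRequested=" + rollbackRequested[0]
                + ", notifications=" + notifications);
    }
}
```

              Note that the notification only survives the rollback if it is sent through a connection that is not enlisted in the MDB's transaction; otherwise it is cancelled along with everything else.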

               

              In this case, should I change the connection factory to the non-XA one (java:/ConnectionFactory)? Or should I drop the resource injection and look up the factory manually through JNDI?

              I recommend you create a new <pooled-connection-factory> just like JmsXA but use <transaction mode="none"/> on it. That way you can still have most of the benefits which the pooled-connection-factory provides.
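
              A rough sketch of what such an entry might look like in the messaging subsystem of standalone-full.xml, modeled on the usual JmsXA definition. The factory name hornetq-ra-notx and the JNDI entry java:/JmsNonTx are made-up placeholders, and the in-vm connector is assumed to be the same one your existing pooled-connection-factory references:

```xml
<pooled-connection-factory name="hornetq-ra-notx">
    <!-- mode="none": connections from this factory are not enlisted in the MDB's transaction -->
    <transaction mode="none"/>
    <connectors>
        <!-- assumed connector name; copy whatever your JmsXA factory uses -->
        <connector-ref connector-name="in-vm"/>
    </connectors>
    <entries>
        <!-- placeholder JNDI name -->
        <entry name="java:/JmsNonTx"/>
    </entries>
</pooled-connection-factory>
```

              The MDB would then inject this factory instead of java:/JmsXA for the send to the reply queue, so that the reply is not cancelled when the MDB's transaction rolls back.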

              • 4. Re: Messages not delivered to reply queue
                vrushank.desai

                Thanks, Justin. I have incorporated the changes you have suggested.