0 Replies Latest reply on Jul 12, 2012 6:05 AM by chithu21

    How to make the JMS receiver client wait till the next message arrives in the queue?

    chithu21

      Hi,

       

      I have a receiver client program and a sender program.

       

      Receiver Program:

       

       

      import java.io.BufferedReader;

      import java.io.DataInput;

      import java.io.DataInputStream;

      import java.io.File;

      import java.io.FileInputStream;

      import java.io.IOException;

      import java.io.InputStreamReader;

      import java.sql.SQLException;

       

      import javax.jms.JMSException;

      import javax.jms.Message;

      import javax.jms.MessageListener;

      import javax.jms.Queue;

      import javax.jms.QueueReceiver;

      import javax.jms.QueueSession;

      import javax.jms.TextMessage;

      import javax.naming.NamingException;

       

      import com.dcat2.common.Config;

      import com.dcat2.common.Constants;

      import com.dcat2.common.ReadConfig;

      import com.dcat2.logger.LoggerFactory;

      import com.dcat2.logger.LoggerImp;

      import com.dcat2.datatransfer.DbConnection;

       

       

      import EDU.oswego.cs.dl.util.concurrent.CountDown;

       

      /**.

      * This class contains methods to read messages from the Queue

      * and inserts them to receiving DB

      *

      * This class needs to be updated after getting spec from XIB,

      * and is just retained as place holder, it will not work on Jboss 6

      *

      * @author Chithra V S

      *

      */

      public class MessageReceiver implements MessageListener {

       

          /**.

           * Initialize the logger

           */

          private static com.dcat2.logger.LogInterface log =

                  (LoggerImp) LoggerFactory.getLogger(MessageReceiver.class);

       

          /**.

           * Initialize the counter

           */

          static final int N = 1;

       

          /**.

           * Initialize the CountDown which is used to

           * notify a driver when all threads are complete.

           */

          static CountDown done = new CountDown(N);

       

          /**.

           * Initialize the DCAT Centre Variable

           */

          private static String strDcatCentre = "";

       

       

          /**.

           * Initialize the Rejection Table Name Variable.

           */

          private String strRejectionTable = "";

          /**.

           * Initialize the Counter for DB.

           */

          private static int count = 0 ;

       

          /**.

           * Initialize the ReadConfig

           */

          private static ReadConfig readConfig = new ReadConfig();

       

          /**.

           * Read the properties file

           */

          private static Config config = readConfig.readPropertiesFile();

          static JMSConnector jmsConnector = new JMSConnector();

          private boolean quit = false;

       

          /**.

           * Constructor

           */

          public MessageReceiver() {

              if (log == null) {

                  System.err.println(

                          "Failure Creating Logtrace Object!!!");

              }

          }

       

          /**.

           * Inner class which implements MessageListener

           * and receives messages from the Queue

           *

           */

         public static class ExListener implements MessageListener {

       

              DbConnection dbConnection = new DbConnection();

              /**.

               *  @see javax.jms.MessageListener#onMessage(javax.jms.Message)

               *  @param msg - The JMS Message

               */

              public void onMessage(Message msg) {

                  log.debug("Inside OnMessage");

                  //done.release();

                  try {

                      String strReceivedMsg = "";

                      if (msg instanceof TextMessage) {

                          strReceivedMsg = ((TextMessage) msg).getText();

                      } else {

                          strReceivedMsg = msg.toString();

                      }

                      log.debug("onMessage, recv text=" + strReceivedMsg);

                     

                      if (strReceivedMsg.equalsIgnoreCase("quit")) {

                          synchronized (this) {

                              quit = true;

                              this.notifyAll(); // Notify main thread to quit

                          }

                      }

                      dbConnection.insertToReceivingDb(strReceivedMsg,

                              getStrDcatCentre(), config.getRecvMsgTable());

                      count++;

                  } catch (JMSException e) {

                      log.error(

                              "An exception occured while receiving from Queue : "

                                                              + e);

                  }

                  finally {

                      try {

                          //MessageReceiver.done.acquire();

                          jmsConnector.closePTP();

                      }/* catch (InterruptedException e) {

                          // TODO Auto-generated catch block

                          e.printStackTrace();

                      }*/ catch (JMSException e) {

                          // TODO Auto-generated catch block

                          e.printStackTrace();

                      }

                     

                  }

              }

      }

          /**.

           * This methods throws an exception when an exception occurs in

           * the onMessage method

           * @param exception - Exception

           */

          public void onException(final JMSException exception) {

              log.error("An error occurred: " + exception);

          }

       

       

          /**.

           * This method is used to receive messages from the in bound queue

           * and insert them to the receiving database

           * @param None

           * @throws JMSException - JMS Exception

           * @throws NamingException - Naming Exception

           * @throws InterruptedException - Interrupted Exception

           * @throws ClassNotFoundException - Class Not Found Exception

           * @throws SQLException - SQL Exception

           */

          public final void receiveMessage() throws JMSException, NamingException,

                  InterruptedException, ClassNotFoundException, SQLException {

       

              log.debug("Begin receiveMessage method");

       

             

       

              QueueObject queueObj = jmsConnector.setupPTP(config.getProviderUrl(),

                      config.getXibToCatQueue());

              QueueSession session = queueObj.getQueueSession();

              Queue xibToCatQueue = queueObj.getQueue();

       

              log.debug("Queue Name : " + xibToCatQueue.getQueueName());

       

              // Set the async listener for xibToCatQueue

              QueueReceiver recv = session.createReceiver(xibToCatQueue);

              log.info("************Receiver Is Ready To Receive Messages************");

            recv.setMessageListener(new ExListener());

          

              synchronized (recv) {

                  while (!quit) {

                      try {

                          recv.wait();

                      } catch (InterruptedException ie) {

                      }

                  }

              }

          

              recv.close();

             

              //log.info(count + " row/rows inserted to " + config.getRecvMsgTable() + " table");

              //log.info("************Receiving Of Messages Completed************");

              //log.debug("End receiveMessage method");

          }

       

          /**.

           * This method returns the value of Dcat Centre

           * @param None

           * @return strDcatCentre

           * @exception None

           */

          public static String getStrDcatCentre() {

              return strDcatCentre;

          }

       

       

          /**.

           * This method sets the value of Dcat Centre

           * @param strDcatCentre = DCAT Centre Code

           * @exception None

           */

          public final void setStrDcatCentre(final String strDcatCentre) {

              MessageReceiver.strDcatCentre = strDcatCentre;

          }

           

         public static void main(final String[] args) throws JMSException,

                  NamingException, InterruptedException, ClassNotFoundException,

                  SQLException {

       

          

              MessageReceiver msgReceiver = new MessageReceiver();

           

              msgReceiver.receiveMessage();

          }

      }

       

      Sender:

       

      package com.dcat2.messaging.sendreceive;

       

      import java.util.ArrayList;

      import java.util.Iterator;

       

      import javax.jms.DeliveryMode;

      import javax.jms.JMSException;

      import javax.jms.Queue;

      import javax.jms.QueueSender;

      import javax.jms.QueueSession;

      import javax.jms.TextMessage;

      import javax.naming.NamingException;

       

      import com.dcat2.common.Config;

      import com.dcat2.common.Constants;

      import com.dcat2.common.ReadConfig;

      import com.dcat2.logger.LoggerFactory;

      import com.dcat2.logger.LoggerImp;

      import com.dcat2.datatransfer.DbConnection;

       

      /**

      * This class has methods to send XML messages to the XIB Queue. Note:Once the

      * XIB details are available, this class may need modification

      *

      * @author Chithra V S

      *

      */

      public class MessageSender {

       

          /**.

           * Initialize the logger

           */

          private static com.dcat2.logger.LogInterface log =

                  (LoggerImp) LoggerFactory.getLogger(MessageSender.class);

       

          /**.

           * Constructor

           */

          public MessageSender() {

              if (log == null) {

                  System.err.println("Failure Creating Logtrace Object!!!");

              }

          }

       

          /**.

           * This method sends xml message to the out bound queue

           * @param strKey - Key received at run time

           * @param strDcatCentre - DCAT Center Code received at run time

           * @throws JMSException - JMS Exception

           * @throws NamingException - Naming Exception

           */

          public final void sendMessage(final String strKey, final String strDcatCentre)

                  throws JMSException, NamingException {

              log.debug("Begin sendMessage Method");

       

              ReadConfig readValues = new ReadConfig();

              Config config = readValues.readPropertiesFile();

              JMSConnector jmsConnector = new JMSConnector();

              String strStatus = "";

              String strXMLMessage = "";

       

              /*

               * Read segments from SendMsg Db Table

               * and sends to the Queue

               */

              DbConnection dbConnection = new DbConnection();

       

              /*QueueObject queueObj = jmsConnector.setupPTP(

                      config.getProviderUrlXib(),

                      config.getCatToXibQueue());*/

              QueueObject queueObj = jmsConnector.setupPTP(

                      config.getProviderUrlXib(),

                      config.getXibToCatQueue());

              QueueSession session = queueObj.getQueueSession();

              Queue catToXibQueue = queueObj.getQueue();

              log.debug("Queue Name: " + catToXibQueue.getQueueName());

              QueueSender sender = session.createSender(catToXibQueue);

              sender.setDeliveryMode(DeliveryMode.PERSISTENT);

       

              try {

                  ArrayList<String> msgList = dbConnection.readFromSendMsgDb(

                          config.getSendMsgDb(), strKey, strDcatCentre);

                  Iterator<String> iterator = msgList.iterator();

                  log.info("************Sending Of Messages Started************");

                  while (iterator.hasNext()) {

                          strXMLMessage = iterator.next();

                          log.debug("XML Message to be sent : " + strXMLMessage);

                          TextMessage tm = session.createTextMessage(strXMLMessage);

                          sender.send(tm);

                          log.info("Sent Message =" + tm.getText());

                          strStatus = Constants.RETURN_CODE_OK;

                      }

              } catch (JMSException e) {

                  // write to log file if the message was not sent

                  log.error("The message: " + strXMLMessage + " was not sent : " + e);

                  strStatus = Constants.RETURN_CODE_NOK;

              } finally {

                  if (sender != null) {

                      sender.close();

                  }

                  jmsConnector.closePTP();

              }

              log.info("************Sending Of Messages Completed************");

              log.debug("End sendMessage Method");

          }

       

           /**.

           * Main method

           * @param args - String array of command line arguments

           * @throws JMSException - JMS Exception

           * @throws NamingException - Naming Exception

           */

          public static void main(final String[] args) throws JMSException,

                  NamingException {

              String strKey = args[0];

              String strDcatCentre = args[1];

              MessageSender messageSender = new MessageSender();

              messageSender.sendMessage(strKey, strDcatCentre);

          }

      }

       

      JMSConnector:

       

      package com.dcat2.messaging.sendreceive;

       

      import java.util.Properties;

       

      import javax.jms.JMSException;

      import javax.jms.Queue;

      import javax.jms.QueueConnection;

      import javax.jms.QueueConnectionFactory;

      import javax.jms.QueueSession;

      import javax.naming.Context;

      import javax.naming.InitialContext;

      import javax.naming.NamingException;

       

      import com.dcat2.common.Config;

      import com.dcat2.common.ReadConfig;

       

      /**.

      * This class contains methods to setup a point to point connection

      * to a JMS Queue

      *

      * @author Chithra V S

      *

      */

      public class JMSConnector {

       

          /**.

           * Queue Connection Object

           */

          private QueueConnection connection;

       

          /**.

           * Queue Session Object

           */

          private QueueSession session;

       

          /**.

           * This method is used to setup a Point To Point Connection

           * @param contextUrl - URL of the Queue Provider

           * @param queueName - Name of the Queue

           * @return QueueObject

           * @throws JMSException - JMS Exception

           * @throws NamingException - Naming Exception

           */

          public final QueueObject setupPTP(

                  final String contextUrl, final String queueName)

                  throws JMSException, NamingException {

       

              ReadConfig readValues = new ReadConfig();

              Config config = readValues.readPropertiesFile();

              Properties env = new Properties();

              env.put(Context.INITIAL_CONTEXT_FACTORY, config.getContextFactory());

              env.put(Context.SECURITY_PRINCIPAL, config.getJmsUserName());

              env.put(Context.SECURITY_CREDENTIALS, config.getJmsUserPwd());

              env.put(Context.PROVIDER_URL, contextUrl);

              InitialContext iniCtx = new InitialContext(env);

       

              QueueConnectionFactory qcf = (QueueConnectionFactory)iniCtx.lookup(config.getJmsFactory());

       

              connection = qcf.createQueueConnection();

              String lookUpObj = "java:/" + queueName;

              Queue xibToCatQueue =  (Queue) iniCtx.lookup(lookUpObj);

       

              session = connection.createQueueSession(

                      false, QueueSession.AUTO_ACKNOWLEDGE);

              connection.start();

       

              return new QueueObject(session, xibToCatQueue);

          }

       

          /**.

           * This method is used to close the Point To Point Connection

           * @param None

           * @throws JMSException - JMS Exception

           */

          public final void closePTP() throws JMSException {

              if (session != null) {

                  session.close();

              }

              if (connection != null) {

                  connection.stop();

              }

          }

      }

       

       

      I ran the MessageReceiver & then ran the MessageSender. The receiver program didnt terminate. But it also didnt trigger the onMessage method to receive the messages.

       

      Any idea what is wrong?