How to make the JMS receiver client wait till the next message arrives in the queue?
chithu21 Jul 12, 2012 6:05 AMHi,
I have a receiver client program and a sender program.
Receiver Program:
import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.SQLException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.NamingException;
import com.dcat2.common.Config;
import com.dcat2.common.Constants;
import com.dcat2.common.ReadConfig;
import com.dcat2.logger.LoggerFactory;
import com.dcat2.logger.LoggerImp;
import com.dcat2.datatransfer.DbConnection;
import EDU.oswego.cs.dl.util.concurrent.CountDown;
/**.
* This class contains methods to read messages from the Queue
* and inserts them to receiving DB
*
* This class needs to be updated after getting spec from XIB,
* and is just retained as place holder, it will not work on Jboss 6
*
* @author Chithra V S
*
*/
public class MessageReceiver implements MessageListener {
/**.
* Initialize the logger
*/
private static com.dcat2.logger.LogInterface log =
(LoggerImp) LoggerFactory.getLogger(MessageReceiver.class);
/**.
* Initialize the counter
*/
static final int N = 1;
/**.
* Initialize the CountDown which is used to
* notify a driver when all threads are complete.
*/
static CountDown done = new CountDown(N);
/**.
* Initialize the DCAT Centre Variable
*/
private static String strDcatCentre = "";
/**.
* Initialize the Rejection Table Name Variable.
*/
private String strRejectionTable = "";
/**.
* Initialize the Counter for DB.
*/
private static int count = 0 ;
/**.
* Initialize the ReadConfig
*/
private static ReadConfig readConfig = new ReadConfig();
/**.
* Read the properties file
*/
private static Config config = readConfig.readPropertiesFile();
static JMSConnector jmsConnector = new JMSConnector();
private boolean quit = false;
/**.
* Constructor
*/
public MessageReceiver() {
if (log == null) {
System.err.println(
"Failure Creating Logtrace Object!!!");
}
}
/**.
* Inner class which implements MessageListener
* and receives messages from the Queue
*
*/
public static class ExListener implements MessageListener {
DbConnection dbConnection = new DbConnection();
/**.
* @see javax.jms.MessageListener#onMessage(javax.jms.Message)
* @param msg - The JMS Message
*/
public void onMessage(Message msg) {
log.debug("Inside OnMessage");
//done.release();
try {
String strReceivedMsg = "";
if (msg instanceof TextMessage) {
strReceivedMsg = ((TextMessage) msg).getText();
} else {
strReceivedMsg = msg.toString();
}
log.debug("onMessage, recv text=" + strReceivedMsg);
if (strReceivedMsg.equalsIgnoreCase("quit")) {
synchronized (this) {
quit = true;
this.notifyAll(); // Notify main thread to quit
}
}
dbConnection.insertToReceivingDb(strReceivedMsg,
getStrDcatCentre(), config.getRecvMsgTable());
count++;
} catch (JMSException e) {
log.error(
"An exception occured while receiving from Queue : "
+ e);
}
finally {
try {
//MessageReceiver.done.acquire();
jmsConnector.closePTP();
}/* catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}*/ catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
/**.
* This methods throws an exception when an exception occurs in
* the onMessage method
* @param exception - Exception
*/
public void onException(final JMSException exception) {
log.error("An error occurred: " + exception);
}
/**.
* This method is used to receive messages from the in bound queue
* and insert them to the receiving database
* @param None
* @throws JMSException - JMS Exception
* @throws NamingException - Naming Exception
* @throws InterruptedException - Interrupted Exception
* @throws ClassNotFoundException - Class Not Found Exception
* @throws SQLException - SQL Exception
*/
public final void receiveMessage() throws JMSException, NamingException,
InterruptedException, ClassNotFoundException, SQLException {
log.debug("Begin receiveMessage method");
QueueObject queueObj = jmsConnector.setupPTP(config.getProviderUrl(),
config.getXibToCatQueue());
QueueSession session = queueObj.getQueueSession();
Queue xibToCatQueue = queueObj.getQueue();
log.debug("Queue Name : " + xibToCatQueue.getQueueName());
// Set the async listener for xibToCatQueue
QueueReceiver recv = session.createReceiver(xibToCatQueue);
log.info("************Receiver Is Ready To Receive Messages************");
recv.setMessageListener(new ExListener());
synchronized (recv) {
while (!quit) {
try {
recv.wait();
} catch (InterruptedException ie) {
}
}
}
recv.close();
//log.info(count + " row/rows inserted to " + config.getRecvMsgTable() + " table");
//log.info("************Receiving Of Messages Completed************");
//log.debug("End receiveMessage method");
}
/**.
* This method returns the value of Dcat Centre
* @param None
* @return strDcatCentre
* @exception None
*/
public static String getStrDcatCentre() {
return strDcatCentre;
}
/**.
* This method sets the value of Dcat Centre
* @param strDcatCentre = DCAT Centre Code
* @exception None
*/
public final void setStrDcatCentre(final String strDcatCentre) {
MessageReceiver.strDcatCentre = strDcatCentre;
}
public static void main(final String[] args) throws JMSException,
NamingException, InterruptedException, ClassNotFoundException,
SQLException {
MessageReceiver msgReceiver = new MessageReceiver();
msgReceiver.receiveMessage();
}
}
Sender:
package com.dcat2.messaging.sendreceive;
import java.util.ArrayList;
import java.util.Iterator;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.NamingException;
import com.dcat2.common.Config;
import com.dcat2.common.Constants;
import com.dcat2.common.ReadConfig;
import com.dcat2.logger.LoggerFactory;
import com.dcat2.logger.LoggerImp;
import com.dcat2.datatransfer.DbConnection;
/**
* This class has methods to send XML messages to the XIB Queue. Note:Once the
* XIB details are available, this class may need modification
*
* @author Chithra V S
*
*/
public class MessageSender {
/**.
* Initialize the logger
*/
private static com.dcat2.logger.LogInterface log =
(LoggerImp) LoggerFactory.getLogger(MessageSender.class);
/**.
* Constructor
*/
public MessageSender() {
if (log == null) {
System.err.println("Failure Creating Logtrace Object!!!");
}
}
/**.
* This method sends xml message to the out bound queue
* @param strKey - Key received at run time
* @param strDcatCentre - DCAT Center Code received at run time
* @throws JMSException - JMS Exception
* @throws NamingException - Naming Exception
*/
public final void sendMessage(final String strKey, final String strDcatCentre)
throws JMSException, NamingException {
log.debug("Begin sendMessage Method");
ReadConfig readValues = new ReadConfig();
Config config = readValues.readPropertiesFile();
JMSConnector jmsConnector = new JMSConnector();
String strStatus = "";
String strXMLMessage = "";
/*
* Read segments from SendMsg Db Table
* and sends to the Queue
*/
DbConnection dbConnection = new DbConnection();
/*QueueObject queueObj = jmsConnector.setupPTP(
config.getProviderUrlXib(),
config.getCatToXibQueue());*/
QueueObject queueObj = jmsConnector.setupPTP(
config.getProviderUrlXib(),
config.getXibToCatQueue());
QueueSession session = queueObj.getQueueSession();
Queue catToXibQueue = queueObj.getQueue();
log.debug("Queue Name: " + catToXibQueue.getQueueName());
QueueSender sender = session.createSender(catToXibQueue);
sender.setDeliveryMode(DeliveryMode.PERSISTENT);
try {
ArrayList<String> msgList = dbConnection.readFromSendMsgDb(
config.getSendMsgDb(), strKey, strDcatCentre);
Iterator<String> iterator = msgList.iterator();
log.info("************Sending Of Messages Started************");
while (iterator.hasNext()) {
strXMLMessage = iterator.next();
log.debug("XML Message to be sent : " + strXMLMessage);
TextMessage tm = session.createTextMessage(strXMLMessage);
sender.send(tm);
log.info("Sent Message =" + tm.getText());
strStatus = Constants.RETURN_CODE_OK;
}
} catch (JMSException e) {
// write to log file if the message was not sent
log.error("The message: " + strXMLMessage + " was not sent : " + e);
strStatus = Constants.RETURN_CODE_NOK;
} finally {
if (sender != null) {
sender.close();
}
jmsConnector.closePTP();
}
log.info("************Sending Of Messages Completed************");
log.debug("End sendMessage Method");
}
/**.
* Main method
* @param args - String array of command line arguments
* @throws JMSException - JMS Exception
* @throws NamingException - Naming Exception
*/
public static void main(final String[] args) throws JMSException,
NamingException {
String strKey = args[0];
String strDcatCentre = args[1];
MessageSender messageSender = new MessageSender();
messageSender.sendMessage(strKey, strDcatCentre);
}
}
JMSConnector:
package com.dcat2.messaging.sendreceive;
import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import com.dcat2.common.Config;
import com.dcat2.common.ReadConfig;
/**.
* This class contains methods to setup a point to point connection
* to a JMS Queue
*
* @author Chithra V S
*
*/
public class JMSConnector {
/**.
* Queue Connection Object
*/
private QueueConnection connection;
/**.
* Queue Session Object
*/
private QueueSession session;
/**.
* This method is used to setup a Point To Point Connection
* @param contextUrl - URL of the Queue Provider
* @param queueName - Name of the Queue
* @return QueueObject
* @throws JMSException - JMS Exception
* @throws NamingException - Naming Exception
*/
public final QueueObject setupPTP(
final String contextUrl, final String queueName)
throws JMSException, NamingException {
ReadConfig readValues = new ReadConfig();
Config config = readValues.readPropertiesFile();
Properties env = new Properties();
env.put(Context.INITIAL_CONTEXT_FACTORY, config.getContextFactory());
env.put(Context.SECURITY_PRINCIPAL, config.getJmsUserName());
env.put(Context.SECURITY_CREDENTIALS, config.getJmsUserPwd());
env.put(Context.PROVIDER_URL, contextUrl);
InitialContext iniCtx = new InitialContext(env);
QueueConnectionFactory qcf = (QueueConnectionFactory)iniCtx.lookup(config.getJmsFactory());
connection = qcf.createQueueConnection();
String lookUpObj = "java:/" + queueName;
Queue xibToCatQueue = (Queue) iniCtx.lookup(lookUpObj);
session = connection.createQueueSession(
false, QueueSession.AUTO_ACKNOWLEDGE);
connection.start();
return new QueueObject(session, xibToCatQueue);
}
/**.
* This method is used to close the Point To Point Connection
* @param None
* @throws JMSException - JMS Exception
*/
public final void closePTP() throws JMSException {
if (session != null) {
session.close();
}
if (connection != null) {
connection.stop();
}
}
}
I ran the MessageReceiver & then ran the MessageSender. The receiver program didnt terminate. But it also didnt trigger the onMessage method to receive the messages.
Any idea what is wrong?