Send message to specific node in clustered environment
hushen.savani Feb 7, 2012 2:30 AM
Hi,
I am running my middleware application on JBossAS-5.1.0 with Hornetq-2.2.5. I have clustered the HornetQ server and tested my application with clustering (2 nodes). It works fine: it load-balances the messages between the two nodes in round-robin (RR) fashion. Now I have a requirement to send certain messages to a specific node when an event occurs. For that I have written the following code:
/**
 * Marks the message as "INIT_POLLING_INTERFACE" and enqueues it on the
 * "mocmManagementQueue" queue once per entry of {@code nodeAddress}, using a JNDI
 * context created for that entry.
 *
 * <p>Each node is handled independently: a failure for one address is logged and the
 * remaining addresses are still attempted. The connection opened for a node is always
 * stopped, even when connecting or sending fails.
 *
 * @param mdsMessage  message to send; its message type is overwritten
 * @param nodeAddress list of node addresses as strings (passed to
 *                    {@code getInitialContext}); a {@code null} list sends nothing
 */
private void enqueueRemoteQueue(MDSMessage mdsMessage, ArrayList nodeAddress) {
    mdsMessage.setMessageType("INIT_POLLING_INTERFACE");
    if (nodeAddress == null) {
        System.err.println("enqueueRemoteQueue():: no node addresses provided.");
        return;
    }
    for (int i = 0; i < nodeAddress.size(); i++) {
        Object address = nodeAddress.get(i);
        HornetQ hq = new HornetQ();
        // Set just before startConnection() so cleanup runs even if it fails half-way.
        boolean connectionAttempted = false;
        try {
            Context nodeContext = getInitialContext((String) address);
            connectionAttempted = true;
            hq.startConnection(nodeContext, "mocmManagementQueue");
            hq.enqueueEntity(mdsMessage);
        } catch (Exception e) {
            System.err.println("enqueueRemoteQueue():: failed to send to node " + address);
            e.printStackTrace();
        } finally {
            if (connectionAttempted) {
                try {
                    hq.stopConnection();
                } catch (Exception e) {
                    // Cleanup failure must not prevent the remaining nodes from being tried.
                    e.printStackTrace();
                }
            }
        }
    }
}
/**
 * Builds a JNDI initial context that uses the JNP naming context factory and points
 * at the given node.
 *
 * @param strNodeAdderss node address appended to the {@code jnp://} scheme to form
 *                       the provider URL
 * @return a new initial context configured for that provider URL
 * @throws javax.naming.NamingException if the initial context cannot be created
 */
private Context getInitialContext(String strNodeAdderss) throws javax.naming.NamingException {
    final String providerUrl = "jnp://" + strNodeAdderss;

    Properties jndiEnvironment = new Properties();
    jndiEnvironment.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
    jndiEnvironment.setProperty(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
    jndiEnvironment.setProperty(Context.PROVIDER_URL, providerUrl);

    return new javax.naming.InitialContext(jndiEnvironment);
}
I am passing a list of node addresses (Host:Port) to the enqueueRemoteQueue() method so that the given message is sent to each specified node address. My HornetQ class is as follows:
/**
 * Small JMS producer wrapper: looks up a queue and a connection factory in a
 * caller-supplied JNDI context, opens a connection/session/producer, sends
 * {@code MDSMessage} objects as non-persistent object messages, and closes everything
 * again.
 *
 * <p>Instances hold mutable connection state and perform no synchronization.
 *
 * <p>NOTE(review): callers in the accompanying code instantiate this class as
 * {@code HornetQ}, while it is declared as {@code Hornet} - confirm the intended name.
 *
 * <p>NOTE(review): the node that actually receives a connection is decided by the
 * connection factory bound at "/ThroughputConnectionFactory", not by the JNDI provider
 * URL. If that factory is cluster-aware (discovery group / client-side load balancing),
 * connections may be spread across nodes regardless of which node's JNDI was used -
 * verify the factory and cluster-connection configuration.
 */
public class Hornet {
    /** Priority used when the current job has no usable configured priority. */
    private static final int DEFAULT_PRIORITY = 0;

    private Context _ctx = null;
    private Session _session = null;
    private Queue _queue = null;
    private Connection _conn = null;
    private MessageProducer _producer = null;
    private String strLogPreffix = "HornetQ";
    /** Lazily loaded "message-priority" configuration: job name -> priority string. */
    private HashMap hmConfig = null;
    private ConnectionFactory _cfConnFactory;

    /**
     * Looks up {@code /queue/<hornetqQueue>} and "/ThroughputConnectionFactory" in the
     * given context, then creates and starts a connection with a non-transacted session
     * and a producer for the queue.
     *
     * <p>Failures are logged, not rethrown; any JMS resources created before the failure
     * are closed so they do not leak. The context is retained and is closed by
     * {@link #stopConnection()}.
     *
     * @param initialContecxt JNDI context used for the lookups
     * @param hornetqQueue    queue name without the "/queue/" prefix; if {@code null},
     *                        nothing is opened
     * @throws BusinessTaskException declared for callers; not thrown by this implementation
     */
    public void startConnection(Context initialContecxt, String hornetqQueue) throws BusinessTaskException {
        String strMethodName = "startConnection():: ";
        System.out.println(strLogPreffix + strMethodName + "HornetQ Queue Name:: " + hornetqQueue);
        this._ctx = initialContecxt;
        if (hornetqQueue == null) {
            System.err.println("[HornetQ :: startConnection() Queue Name is not provided.]");
            return;
        }
        try {
            // Set HornetQ Queue
            this._queue = (Queue) _ctx.lookup("/queue/" + hornetqQueue);
            // Set Connection
            _cfConnFactory = (ConnectionFactory) _ctx.lookup("/ThroughputConnectionFactory");
            this._conn = _cfConnFactory.createConnection();
            // Check before first use; previously this check came after _conn was dereferenced.
            if (_conn == null) {
                System.err.println("[ HornetQ:: startConnection() ]Error in Connection!");
                return;
            }
            // Set session
            this._session = _conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            _producer = _session.createProducer(_queue);
            _producer.setDisableMessageID(true);
            _producer.setDisableMessageTimestamp(true);
            // Start Service
            _conn.start();
        } catch (Exception e) {
            e.printStackTrace();
            // Do not leave a half-open connection behind.
            closeJmsResources(strMethodName);
        }
    }

    /**
     * Closes the producer, session and connection (in that order) and then the JNDI
     * context. Each close is attempted independently, so one failure does not prevent
     * the others. Safe to call when {@code startConnection} failed or was never called.
     *
     * @throws BusinessTaskException declared for callers; not thrown by this implementation
     */
    public void stopConnection() throws BusinessTaskException {
        String strMethodName = "stopConnection():: ";
        closeJmsResources(strMethodName);
        if (_ctx != null) {
            try {
                _ctx.close();
            } catch (NamingException e) {
                System.err.println("[ " + strLogPreffix + " ] " + strMethodName + e.getClass().getName() + e);
                e.printStackTrace();
            } finally {
                _ctx = null;
            }
        }
    }

    /**
     * Sends the given entity as a non-persistent {@code ObjectMessage} with the priority
     * configured for its current job (or {@link #DEFAULT_PRIORITY}) and no expiry.
     * JMS failures are logged, not rethrown.
     *
     * @param objEntities an {@code MDSMessage}; {@code null} is logged and ignored
     * @throws BusinessTaskException declared for callers; not thrown by this implementation
     * @throws IllegalStateException if no session/producer is open
     */
    public void enqueueEntity(Object objEntities) throws BusinessTaskException {
        String strMethodName = "enqueueEntity():: ";
        if (objEntities == null) {
            System.out.println(strLogPreffix + strMethodName + "-->>> Cannot Enqueue NULL Message!");
            return;
        }
        if (_session == null || _producer == null) {
            throw new IllegalStateException(
                    strLogPreffix + strMethodName + "not connected; call startConnection() first");
        }
        MDSMessage mdsMessage = (MDSMessage) objEntities;
        int iMessagePriority = resolvePriority(mdsMessage.getCurrentJob(), strMethodName);
        try {
            ObjectMessage objMessage = _session.createObjectMessage((Serializable) objEntities);
            _producer.send(objMessage, DeliveryMode.NON_PERSISTENT, iMessagePriority, 0);
        } catch (JMSException e) {
            System.err.println("[ " + strLogPreffix + " ] " + strMethodName + e.getClass().getName() + e);
            e.printStackTrace();
        }
    }

    /** Returns the configured priority for the job, or DEFAULT_PRIORITY if absent or not a number. */
    private int resolvePriority(String strCurrentJob, String strMethodName) {
        if (strCurrentJob == null) {
            return DEFAULT_PRIORITY;
        }
        if (hmConfig == null) {
            ArrayList lstConfig = (ArrayList) XmlParserUtil.getInstance().getConfig("message-priority");
            if (lstConfig != null && !lstConfig.isEmpty()) {
                hmConfig = (HashMap) lstConfig.get(0);
            }
        }
        if (hmConfig == null) {
            System.err.println("[ " + strLogPreffix + " ] " + strMethodName
                    + "no message-priority configuration; using priority " + DEFAULT_PRIORITY);
            return DEFAULT_PRIORITY;
        }
        Object configured = hmConfig.get(strCurrentJob.trim());
        if (!(configured instanceof String)) {
            System.err.println("[ " + strLogPreffix + " ] " + strMethodName
                    + "no priority configured for job '" + strCurrentJob.trim()
                    + "'; using priority " + DEFAULT_PRIORITY);
            return DEFAULT_PRIORITY;
        }
        try {
            return Integer.parseInt(((String) configured).trim());
        } catch (NumberFormatException e) {
            System.err.println("[ " + strLogPreffix + " ] " + strMethodName
                    + "invalid priority '" + configured + "' for job '" + strCurrentJob.trim()
                    + "'; using priority " + DEFAULT_PRIORITY);
            return DEFAULT_PRIORITY;
        }
    }

    /** Closes producer, session and connection independently, logging failures, and clears the fields. */
    private void closeJmsResources(String strMethodName) {
        if (_producer != null) {
            try {
                _producer.close();
            } catch (JMSException e) {
                System.err.println("[ " + strLogPreffix + " ] " + strMethodName + e.getClass().getName() + e);
                e.printStackTrace();
            } finally {
                _producer = null;
            }
        }
        if (_session != null) {
            try {
                _session.close();
            } catch (JMSException e) {
                System.err.println("[ " + strLogPreffix + " ] " + strMethodName + e.getClass().getName() + e);
                e.printStackTrace();
            } finally {
                _session = null;
            }
        }
        if (_conn != null) {
            try {
                _conn.close();
            } catch (JMSException e) {
                System.err.println("[ " + strLogPreffix + " ] " + strMethodName + e.getClass().getName() + e);
                e.printStackTrace();
            } finally {
                _conn = null;
            }
        }
    }
}
But with this piece of code, these specific messages are not sent to the specified nodes; instead, they are distributed round-robin.
Please suggest how I can send a specific message to a specific node address in a clustered environment instead of having it round-robined.
Thank you.
Best Regards,
Hushen Savani