Send message to specific node in clustered environment
hushen.savani Feb 7, 2012 2:30 AM
Hi,
I am running my middleware application on JBoss AS 5.1.0 with HornetQ 2.2.5. I have clustered the HornetQ server and tested my application with a two-node cluster. It works fine: messages are load-balanced between the two nodes in round-robin fashion. Now I have a requirement to send certain messages to a specific node when an event occurs. For that I have written the following code:
/**
 * Sends {@code mdsMessage} to the "mocmManagementQueue" queue once for every
 * entry in {@code nodeAddress}, opening and closing a fresh connection per entry.
 *
 * <p>The message type is set to "INIT_POLLING_INTERFACE" before sending. Any
 * exception aborts the remaining entries and is printed, not rethrown.
 *
 * <p>NOTE(review): the address only selects which node's JNDI registry is
 * queried. Whether the JMS connection created from the looked-up connection
 * factory actually lands on that same node depends on how that factory is
 * configured on the server (e.g. discovery / load-balancing settings), which is
 * not visible here - confirm against the server configuration.
 *
 * @param mdsMessage  message to send; its message type is overwritten
 * @param nodeAddress list of "host:port" strings, one per target node
 */
private void enqueueRemoteQueue(MDSMessage mdsMessage, ArrayList nodeAddress) {
    mdsMessage.setMessageType("INIT_POLLING_INTERFACE");
    try {
        for (int i = 0; i < nodeAddress.size(); i++) {
            // Create the context first: if this fails there is nothing to clean up yet.
            Context nodeContext = getInitialContext((String) nodeAddress.get(i));
            HornetQ hq = new HornetQ();
            try {
                hq.startConnection(nodeContext, "mocmManagementQueue");
                hq.enqueueEntity(mdsMessage);
            } finally {
                // Always release the connection, even when start/enqueue failed.
                // A cleanup failure is reported here so it cannot hide the original error.
                try {
                    hq.stopConnection();
                } catch (Exception closeError) {
                    closeError.printStackTrace();
                }
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

/**
 * Builds a JNDI initial context that points at the given node.
 *
 * @param strNodeAdderss "host:port" of the node; appended to the "jnp://" scheme
 * @return a new initial context using the JNP naming context factory
 * @throws javax.naming.NamingException if the initial context cannot be created
 */
private Context getInitialContext(String strNodeAdderss) throws javax.naming.NamingException {
    Properties p = new Properties();
    p.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
    p.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
    p.put(Context.PROVIDER_URL, "jnp://" + strNodeAdderss);
    return new javax.naming.InitialContext(p);
}
I pass a list of node addresses (host:port) to the enqueueRemoteQueue() method so that the given message is sent to each specified node address. My HornetQ class is as follows:
public class Hornet { private Context _ctx = null; private Session _session = null; private Queue _queue = null; private Connection _conn = null; private MessageProducer _producer = null; private String strLogPreffix = "HornetQ"; private HashMap hmConfig = null; private ConnectionFactory _cfConnFactory; public void startConnection(Context initialContecxt, String hornetqQueue) throws BusinessTaskException { String strMethodName ="startConnection():: "; System.out.println(strLogPreffix + strMethodName + "HornetQ Queue Name:: " + hornetqQueue); try { this._ctx = initialContecxt; String queue = ""; if(hornetqQueue!=null) { //Set HornetQ Queue queue = "/queue/" + hornetqQueue; this._queue = (Queue)_ctx.lookup(queue); //Set Connection _cfConnFactory = (ConnectionFactory) _ctx.lookup("/ThroughputConnectionFactory"); this._conn = _cfConnFactory.createConnection(); //Set session this._session = _conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); _producer = _session.createProducer(_queue); _producer.setDisableMessageID(true); _producer.setDisableMessageTimestamp(true); //Start Service if(_conn!=null) _conn.start(); else System.err.println("[ HornetQ:: startConnection() ]Error in Connection!"); } else { System.err.println("[HornetQ :: startConnection() Queue Name is not provided.]"); } } catch (Exception e) { e.printStackTrace(); } } public void stopConnection() throws BusinessTaskException { String strMethodName ="stopConnection():: "; try { _ctx.close(); _conn.close(); _session.close(); _producer.close(); }catch (NamingException e) { System.err.println("[ " + strLogPreffix + " ] "+ strMethodName+ e.getClass().getName() + e); e.printStackTrace(); }catch (JMSException e) { System.err.println("[ " + strLogPreffix + " ] "+ strMethodName+ e.getClass().getName() + e); e.printStackTrace(); } } public void enqueueEntity(Object objEntities) throws BusinessTaskException { String strMethodName ="enqueueEntity():: "; int iMessagePriority = 0; String strCurrentJob = null; 
if(hmConfig==null) { hmConfig = new HashMap(); ArrayList lstConfig = (ArrayList)XmlParserUtil.getInstance().getConfig("message-priority"); hmConfig = (HashMap)lstConfig.get(0); } try { //System.out.println(strLogPreffix + strMethodName + "objEntities:: "+ objEntities); if(objEntities!=null) { MDSMessage mdsMessage = (MDSMessage)objEntities; strCurrentJob = mdsMessage.getCurrentJob(); if(strCurrentJob!=null) iMessagePriority = Integer.parseInt(((String)hmConfig.get(strCurrentJob.trim())).trim()); ObjectMessage objMessage = null; objMessage = _session.createObjectMessage((Serializable) objEntities); _producer.send(objMessage, DeliveryMode.NON_PERSISTENT, iMessagePriority, 0); } else { System.out.println(strLogPreffix + strMethodName + "-->>> Cannot Enqueue NULL Message!"); } }catch (JMSException e) { // TODO: handle exception System.err.println("[ " + strLogPreffix + " ] "+ strMethodName+ e.getClass().getName() + e); e.printStackTrace(); } } }
However, with this code the messages are not delivered to the specified nodes; they are still distributed round-robin across the cluster.
Please suggest how I can send a specific message to a specific node address in a clustered environment instead of having it load-balanced round-robin.
Thank you.
Best Regards,
Hushen Savani