Send message to specific node in clustered environment
hushen.savani Feb 7, 2012 2:30 AMHi,
I have am running my middleware application on JBossAS-5.1.0 with Hornetq-2.2.5. I have clustered hornetq server. I have tested my application with clustering (2 nodes). It works fine. It load balances the messages between two nodes in RR Fashion properly. Now I have a requirement to send certain messages to certain node on an event. For that I have written following code:
private void enqueueRemoteQueue(MDSMessage mdsMessage, ArrayList nodeAddress) {
                    HornetQ hq = null;
  
                    mdsMessage.setMessageType("INIT_POLLING_INTERFACE");
  
                    try { 
                              for(int i=0;i<nodeAddress.size();i++) {
                                        hq = new HornetQ();
                                        hq.startConnection(getInitialContext((String)nodeAddress.get(i)), "mocmManagementQueue");
                                        hq.enqueueEntity(mdsMessage);
                                        hq.stopConnection();
                                        hq = null;
                              }
                    }
                    catch (Exception e) {
                              e.printStackTrace();
                    } 
          }
  
          private Context getInitialContext(String strNodeAdderss) throws javax.naming.NamingException {                                 
                    Properties p = new Properties( );
                    p.put(Context.INITIAL_CONTEXT_FACTORY,"org.jnp.interfaces.NamingContextFactory");
                    p.put(Context.URL_PKG_PREFIXES,"org.jboss.naming:org.jnp.interfaces");
                    p.put(Context.PROVIDER_URL,"jnp://"+strNodeAdderss);
                    return new javax.naming.InitialContext(p);
          }
I am passing List of node addresses(Host:Port) in enqueueRemoteQueue() method for sending given message to specified node address. My HornetQ class is as following:
public class Hornet {
  
          private Context _ctx = null;
          private Session _session = null;
          private Queue _queue = null;
          private Connection _conn = null;
          private MessageProducer _producer = null;
          private String strLogPreffix = "HornetQ";
          private HashMap hmConfig = null;
          private ConnectionFactory _cfConnFactory;
  
          public void startConnection(Context initialContecxt, String hornetqQueue) throws BusinessTaskException {
  
                    String strMethodName ="startConnection():: ";
  
                    System.out.println(strLogPreffix + strMethodName + "HornetQ Queue Name:: " + hornetqQueue);
  
                    try {
                              this._ctx = initialContecxt;
  
                              String queue = "";
                              if(hornetqQueue!=null) {
                                        //Set HornetQ Queue
                                        queue = "/queue/" + hornetqQueue;
                                        this._queue = (Queue)_ctx.lookup(queue);
  
                                        //Set Connection
                                        _cfConnFactory = (ConnectionFactory) _ctx.lookup("/ThroughputConnectionFactory");
                                        this._conn = _cfConnFactory.createConnection();
  
                                        //Set session
                                        this._session = _conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); 
                                        _producer = _session.createProducer(_queue); 
                            _producer.setDisableMessageID(true);
                            _producer.setDisableMessageTimestamp(true);
                            
                            //Start Service
                            if(_conn!=null)
                                                  _conn.start();
                                        else
                                                  System.err.println("[ HornetQ:: startConnection() ]Error in Connection!");                            
                                 
                              }
                              else {
                                        System.err.println("[HornetQ :: startConnection() Queue Name is not provided.]");
                              } 
                    }
                    catch (Exception e) { 
                              e.printStackTrace();
                    }
          }
          public void stopConnection() throws BusinessTaskException {
                    String strMethodName ="stopConnection():: ";
  
                    try {
                              _ctx.close();
                  _conn.close();            
                  _session.close();
                  _producer.close();
        }catch (NamingException e) {
                  System.err.println("[ " + strLogPreffix + " ] "+ strMethodName+ e.getClass().getName() + e);
                              e.printStackTrace();
                    }catch (JMSException e) {
                  System.err.println("[ " + strLogPreffix + " ] "+ strMethodName+ e.getClass().getName() + e);
                              e.printStackTrace();
                    }
        
          }
  
          public void enqueueEntity(Object objEntities) throws BusinessTaskException {
       
                    String strMethodName ="enqueueEntity():: ";
  
                    int iMessagePriority = 0;
                    String strCurrentJob = null;
                    if(hmConfig==null) {
                              hmConfig = new HashMap();
                              ArrayList lstConfig = (ArrayList)XmlParserUtil.getInstance().getConfig("message-priority");
                              hmConfig = (HashMap)lstConfig.get(0); 
                    }
                    try {
                              //System.out.println(strLogPreffix + strMethodName + "objEntities:: "+ objEntities);
                              if(objEntities!=null) {
                                        MDSMessage mdsMessage = (MDSMessage)objEntities;
                                        strCurrentJob = mdsMessage.getCurrentJob();
                                        if(strCurrentJob!=null)
                                                  iMessagePriority = Integer.parseInt(((String)hmConfig.get(strCurrentJob.trim())).trim());
                                        ObjectMessage objMessage = null;
                            objMessage = _session.createObjectMessage((Serializable) objEntities);
                            _producer.send(objMessage, DeliveryMode.NON_PERSISTENT, iMessagePriority, 0);                           
                                                        
                              }
                              else {
                                        System.out.println(strLogPreffix + strMethodName + "-->>> Cannot Enqueue NULL Message!");
                              }
  
                    }catch (JMSException e) {
                              // TODO: handle exception
                              System.err.println("[ " + strLogPreffix + " ] "+ strMethodName+ e.getClass().getName() + e);
                              e.printStackTrace();
                    }
          } 
}
But what happens with this piece of code is that these specific messages are not sent to specific nodes, but they round robins.
Pl. suggest how can I send specific message to specific node address in clustered environment instead of round-robined.
Thank you.
Best Regards,
Hushen Savani
 
    