6 Replies Latest reply on Feb 9, 2012 5:08 AM by hushen.savani

    Send message to specific node in clustered environment

    hushen.savani

      Hi,

         

           I am running my middleware application on JBossAS-5.1.0 with HornetQ-2.2.5, using a clustered HornetQ server. I have tested my application with clustering (2 nodes) and it works fine: it load-balances the messages between the two nodes in round-robin (RR) fashion. Now I have a requirement to send certain messages to a certain node on an event. For that I have written the following code:

       

      private void enqueueRemoteQueue(MDSMessage mdsMessage, ArrayList nodeAddress) {
          HornetQ hq = null;

          mdsMessage.setMessageType("INIT_POLLING_INTERFACE");

          try {
              // Open a separate connection to every node and enqueue the same message there
              for (int i = 0; i < nodeAddress.size(); i++) {
                  hq = new HornetQ();
                  hq.startConnection(getInitialContext((String) nodeAddress.get(i)), "mocmManagementQueue");
                  hq.enqueueEntity(mdsMessage);
                  hq.stopConnection();
                  hq = null;
              }
          }
          catch (Exception e) {
              e.printStackTrace();
          }
      }

      private Context getInitialContext(String strNodeAdderss) throws javax.naming.NamingException {
          // JNDI environment pointing at one specific node (host:port)
          Properties p = new Properties();
          p.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
          p.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
          p.put(Context.PROVIDER_URL, "jnp://" + strNodeAdderss);
          return new javax.naming.InitialContext(p);
      }
      

       

      I pass a list of node addresses (host:port) to enqueueRemoteQueue() so that the given message is sent to each specified node. My HornetQ class is as follows:

       

      import java.io.Serializable;
      import java.util.ArrayList;
      import java.util.HashMap;

      import javax.jms.Connection;
      import javax.jms.ConnectionFactory;
      import javax.jms.DeliveryMode;
      import javax.jms.JMSException;
      import javax.jms.MessageProducer;
      import javax.jms.ObjectMessage;
      import javax.jms.Queue;
      import javax.jms.Session;
      import javax.naming.Context;
      import javax.naming.NamingException;

      // MDSMessage, XmlParserUtil and BusinessTaskException are application classes not shown here.
      public class HornetQ {

          private Context _ctx = null;
          private Session _session = null;
          private Queue _queue = null;
          private Connection _conn = null;
          private MessageProducer _producer = null;
          private String strLogPreffix = "HornetQ";
          private HashMap hmConfig = null;
          private ConnectionFactory _cfConnFactory;

          public void startConnection(Context initialContecxt, String hornetqQueue) throws BusinessTaskException {

              String strMethodName = "startConnection():: ";

              System.out.println(strLogPreffix + strMethodName + "HornetQ Queue Name:: " + hornetqQueue);

              try {
                  this._ctx = initialContecxt;

                  if (hornetqQueue != null) {
                      // Look up the queue in the JNDI context of the target node
                      String queue = "/queue/" + hornetqQueue;
                      this._queue = (Queue) _ctx.lookup(queue);

                      // Create the connection
                      _cfConnFactory = (ConnectionFactory) _ctx.lookup("/ThroughputConnectionFactory");
                      this._conn = _cfConnFactory.createConnection();

                      // Create the session and the producer
                      this._session = _conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                      _producer = _session.createProducer(_queue);
                      _producer.setDisableMessageID(true);
                      _producer.setDisableMessageTimestamp(true);

                      // Start the connection
                      if (_conn != null)
                          _conn.start();
                      else
                          System.err.println("[ HornetQ:: startConnection() ]Error in Connection!");
                  }
                  else {
                      System.err.println("[HornetQ :: startConnection() Queue Name is not provided.]");
                  }
              }
              catch (Exception e) {
                  e.printStackTrace();
              }
          }

          public void stopConnection() throws BusinessTaskException {
              String strMethodName = "stopConnection():: ";

              try {
                  // Close in reverse order of creation: producer, session, connection, then the JNDI context
                  _producer.close();
                  _session.close();
                  _conn.close();
                  _ctx.close();
              } catch (NamingException e) {
                  System.err.println("[ " + strLogPreffix + " ] " + strMethodName + e.getClass().getName() + e);
                  e.printStackTrace();
              } catch (JMSException e) {
                  System.err.println("[ " + strLogPreffix + " ] " + strMethodName + e.getClass().getName() + e);
                  e.printStackTrace();
              }
          }

          public void enqueueEntity(Object objEntities) throws BusinessTaskException {

              String strMethodName = "enqueueEntity():: ";

              int iMessagePriority = 0;
              String strCurrentJob = null;
              if (hmConfig == null) {
                  // Load the job-name to priority mapping once
                  ArrayList lstConfig = (ArrayList) XmlParserUtil.getInstance().getConfig("message-priority");
                  hmConfig = (HashMap) lstConfig.get(0);
              }
              try {
                  if (objEntities != null) {
                      MDSMessage mdsMessage = (MDSMessage) objEntities;
                      strCurrentJob = mdsMessage.getCurrentJob();
                      if (strCurrentJob != null)
                          iMessagePriority = Integer.parseInt(((String) hmConfig.get(strCurrentJob.trim())).trim());
                      ObjectMessage objMessage = _session.createObjectMessage((Serializable) objEntities);
                      _producer.send(objMessage, DeliveryMode.NON_PERSISTENT, iMessagePriority, 0);
                  }
                  else {
                      System.out.println(strLogPreffix + strMethodName + "-->>> Cannot Enqueue NULL Message!");
                  }
              } catch (JMSException e) {
                  // TODO: handle exception
                  System.err.println("[ " + strLogPreffix + " ] " + strMethodName + e.getClass().getName() + e);
                  e.printStackTrace();
              }
          }
      }
      

       

      With this code, however, the messages are not sent to the specified nodes; they are still round-robined across the cluster.

       

      Please suggest how I can send a specific message to a specific node address in a clustered environment instead of having it round-robined.

       

      Thank you.

       

      Best Regards,

      Hushen Savani

        • 1. Re: Send message to specific node in clustered environment
          ataylor

          You can't switch from clustering to non-clustering mode; you probably need to rethink how your application works. You could use a message selector for each node and have your consumer use that.
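
The selector idea above can be sketched as follows. This is a minimal illustration, not code from the thread: the property name `targetNode`, the class `NodeSelectorSketch`, and the node ids are all hypothetical. The producer would tag each message with the id of the node that should handle it, and each node's consumer would subscribe with a selector that matches only its own id. The JMS calls it plugs into (`Message.setStringProperty` and `Session.createConsumer(destination, selector)`) appear only in comments so that the block compiles without a JMS provider.

```java
/** Sketch: per-node JMS message selector (all names here are hypothetical). */
final class NodeSelectorSketch {

    /** Hypothetical message property carrying the id of the target node. */
    static final String TARGET_NODE_PROPERTY = "targetNode";

    private NodeSelectorSketch() {
    }

    /**
     * Builds a selector such as: targetNode = 'node1'.
     * Single quotes inside the id are doubled, as selector string literals require.
     */
    static String selectorFor(String nodeId) {
        if (nodeId == null || nodeId.trim().isEmpty()) {
            throw new IllegalArgumentException("nodeId must not be empty");
        }
        return TARGET_NODE_PROPERTY + " = '" + nodeId.trim().replace("'", "''") + "'";
    }

    // Producer side (needs a JMS provider on the classpath):
    //     message.setStringProperty(NodeSelectorSketch.TARGET_NODE_PROPERTY, "node1");
    //     producer.send(message);
    //
    // Consumer side, run on each node with that node's own id:
    //     MessageConsumer consumer =
    //             session.createConsumer(queue, NodeSelectorSketch.selectorFor("node1"));
}
```

For this to work with one identical deployment on every node, the node id would have to come from something node-specific at runtime, for example a system property set per server instance. That is an assumption, not something the thread confirms.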

          • 2. Re: Send message to specific node in clustered environment
            hushen.savani

            Hi Andy,

             

                 Thanks. But a message selector will not help in my case because I deploy the same application EAR on both nodes, so there will be the same consumers on both nodes. However, I have read a workaround posted on this thread, and I have tried it as follows:

             

            Queues defined in hornetq-jms.xml:

            <!-- Clustered Queues -->
            <queue name="clustered.mocmMasterQueue">
                <entry name="/queue/clustered.mocmMasterQueue"/>
            </queue>

            <queue name="clustered.mocmProcessingQueue">
                <entry name="/queue/clustered.mocmProcessingQueue"/>
            </queue>

            <queue name="clustered.mocmSuccessQueue">
                <entry name="/queue/clustered.mocmSuccessQueue"/>
            </queue>

            <queue name="clustered.mocmFailureQueue">
                <entry name="/queue/clustered.mocmFailureQueue"/>
            </queue>

            <!-- Non-Clustered Queue -->
            <queue name="nonclustered.mocmManagementQueue">
                <entry name="/queue/nonclustered.mocmManagementQueue"/>
            </queue>

             

            Address configured in the cluster connection in hornetq-configuration.xml:

            <cluster-connections>
                <cluster-connection name="mocm-cluster">
                    <address>jms.queue.clustered.#</address>
                    <connector-ref>netty-connector</connector-ref>
                    <retry-interval>500</retry-interval>
                    <use-duplicate-detection>true</use-duplicate-detection>
                    <forward-when-no-consumers>true</forward-when-no-consumers>
                    <max-hops>1</max-hops>
                    <discovery-group-ref discovery-group-name="mocm-discovery-group"/>
                </cluster-connection>
            </cluster-connections>

             

                 Now, with this configuration, all the queues have become non-clustered in the clustered environment: messages are no longer round-robined. Can you please suggest what could have gone wrong here?

             

                 Thanks.

             

            Best Regards,

            Hushen Savani

            • 3. Re: Send message to specific node in clustered environment
              ataylor

              Now, with this configuration, all the queues have become non-clustered in the clustered environment: messages are no longer round-robined. Can you please suggest what could have gone wrong here?

              I'm not sure I understand. Are you saying that the address on the cluster-connection isn't adhered to?

              • 4. Re: Send message to specific node in clustered environment
                hushen.savani

                Hi Andy,

                 

                     I'm not sure I understand. Are you saying that the address on the cluster-connection isn't adhered to?

                 

                          Yes, that is what I meant. The good news is that it now works with the following configuration in hornetq-configuration.xml:

                <cluster-connections>
                    <cluster-connection name="mocm-cluster">
                        <address>jms.queue.clustered</address>
                        <connector-ref>netty-connector</connector-ref>
                        <retry-interval>500</retry-interval>
                        <use-duplicate-detection>true</use-duplicate-detection>
                        <forward-when-no-consumers>true</forward-when-no-consumers>
                        <max-hops>1</max-hops>
                        <discovery-group-ref discovery-group-name="mocm-discovery-group"/>
                    </cluster-connection>
                </cluster-connections>

                 

                          I removed the '#' wildcard from 'jms.queue.clustered.#' in the <address> tag, and it worked like a charm. Maybe HornetQ does not interpret wildcard syntax in this tag of <cluster-connection>. I got the idea from this section in the HornetQ-2.2.5 documentation. The doc says that the "cluster connection will load balance messages sent to address that start with jms. This cluster connection, will, in effect apply to all JMS queue and topic subscriptions since they map to core queues that start with the substring "jms"." Hence, I changed the substring to "jms.queue.clustered" instead of the default value "jms" in the <address> tag of <cluster-connection>.

                 

                          Please suggest whether this is the right approach. Will there be any side effects of this configuration?

                 

                Best Regards,

                Hushen Savani
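
The behaviour described in this reply is consistent with a plain prefix match. The sketch below is an illustration only, assuming (per the documentation quoted above) that a cluster connection applies to every core address that starts with the configured `<address>` value, and using HornetQ's `jms.queue.` prefix for the core addresses of JMS queues. The class and method names are hypothetical, and this is not HornetQ's actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch: which queues a cluster-connection address would cover under prefix matching. */
final class ClusterAddressSketch {

    private ClusterAddressSketch() {
    }

    /** Core address of a JMS queue: the "jms.queue." prefix plus the queue name. */
    static String coreAddress(String jmsQueueName) {
        return "jms.queue." + jmsQueueName;
    }

    /** Assumed rule: the connection applies when the core address starts with the configured value. */
    static boolean isClustered(String clusterAddress, String jmsQueueName) {
        return coreAddress(jmsQueueName).startsWith(clusterAddress);
    }

    /** Returns the queue names from the thread that the given address value would cover. */
    static List<String> clusteredQueues(String clusterAddress) {
        List<String> queues = Arrays.asList(
                "clustered.mocmMasterQueue",
                "clustered.mocmProcessingQueue",
                "clustered.mocmSuccessQueue",
                "clustered.mocmFailureQueue",
                "nonclustered.mocmManagementQueue");
        List<String> covered = new ArrayList<String>();
        for (String queue : queues) {
            if (isClustered(clusterAddress, queue)) {
                covered.add(queue);
            }
        }
        return covered;
    }
}
```

Under that assumption, `jms.queue.clustered.#` covers none of the five queues (the '#' is compared literally), `jms.queue.clustered` covers the four clustered queues but not `nonclustered.mocmManagementQueue`, and the default `jms` covers all five.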

                • 5. Re: Send message to specific node in clustered environment
                  ataylor

                  That looks fine to me.

                  • 6. Re: Send message to specific node in clustered environment
                    hushen.savani

                    Thanks for confirming man!