2 Replies Latest reply on Jan 17, 2012 3:13 AM by hushen.savani

    Unable to get collective message count for clustered queue

    hushen.savani

      Hi,

         

           I have my application running on JBossAS-6.1.1 with HornetQ-2.2.5. I am getting queue counts in my application using the management interface. In the application flow, when a message for initiating the queue counter is received, the queue counter is started. The code for the queue counters follows. Please note the startConnection(), initMessageCounterSample() and getMessageCount() methods: I call these three methods to get the queue count, and then call stopConnection() afterwards. Also, JMX_URL is fetched from a .properties file as service:jmx:rmi://10.105.1.20/jndi/rmi://10.105.1.20:1490/jmxconnector:

       

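      import java.io.Serializable;
      import java.util.HashMap;
      import java.util.Hashtable;

      import javax.jms.Connection;
      import javax.jms.ConnectionFactory;
      import javax.jms.DeliveryMode;
      import javax.jms.JMSException;
      import javax.jms.MessageProducer;
      import javax.jms.ObjectMessage;
      import javax.jms.Queue;
      import javax.jms.Session;
      import javax.management.MBeanServerConnection;
      import javax.management.MBeanServerInvocationHandler;
      import javax.management.ObjectName;
      import javax.management.remote.JMXConnector;
      import javax.management.remote.JMXConnectorFactory;
      import javax.management.remote.JMXServiceURL;
      import javax.naming.Context;
      import javax.naming.NamingException;

      import org.hornetq.api.core.management.MessageCounterInfo;
      import org.hornetq.api.core.management.ObjectNameBuilder;
      import org.hornetq.api.jms.management.JMSQueueControl;

      // BusinessTaskException, JndiConfigUtil and MDSMessage are application classes; their imports are omitted here.
      public class HornetQ {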
        
                private Context _ctx = null;
                private Session _session = null;
                private Queue _queue = null;
                private Connection _conn = null;
                private MessageProducer _producer = null;
                private String strLogPreffix = "HornetQ";
                private JMSQueueControl _queueControl = null;
                private MessageCounterInfo _messageCounter = null;
                private String resourcePath = null;
                private Hashtable configHT =  null; 
                private String JMX_URL = null; 
                private ObjectName on = null;
                private JMXConnector connector = null;
                private MBeanServerConnection mbsc = null;
                private long lFinalCounter = 0; 
                private ConnectionFactory _cfConnFactory;
      
                public void startConnection(Context initialContecxt, String hornetqQueue) throws BusinessTaskException {//NamingException, JMSException { 
        
                          String strMethodName ="startConnection():: ";
        
                          System.out.println(strLogPreffix + strMethodName + "HornetQ Queue Name:: " + hornetqQueue);
        
                          try {
                                    this._ctx = initialContecxt;
        
                                    String queue = "";
                                    if(hornetqQueue!=null) {
                                              //Set HornetQ Queue
                                              queue = "/queue/" + hornetqQueue;
                                              this._queue = (Queue)_ctx.lookup(queue);
        
                                              //Set Connection
                                              _cfConnFactory = (ConnectionFactory) _ctx.lookup("/ThroughputConnectionFactory");
                                              this._conn = _cfConnFactory.createConnection();
        
                                              //Set session
                                              this._session = _conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); 
                                              _producer = _session.createProducer(_queue); 
                                              _producer.setDisableMessageID(true);
                                              _producer.setDisableMessageTimestamp(true);

                                              //Start Service
                                              if(_conn!=null)
                                                        _conn.start();
                                              else
                                                        System.err.println("[ HornetQ:: startConnection() ]Error in Connection!");
                                       
                                    }
                                    else {
                                              System.err.println("[HornetQ :: startConnection() Queue Name is not provided.]");
                                    } 
                          }
                          catch (Exception e) { 
                                    e.printStackTrace();
                          }
                }
      
      
                public void stopConnection() throws BusinessTaskException {
                          String strMethodName ="stopConnection():: ";
        
                          try {
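                                    // Close JMS resources innermost-first, then the naming context;
                                    // the null checks avoid a NullPointerException if startConnection() failed part-way
                                    if (_producer != null) _producer.close();
                                    if (_session != null) _session.close();
                                    if (_conn != null) _conn.close();
                                    if (_ctx != null) _ctx.close();
                          }catch (NamingException e) {
                                    System.err.println("[ " + strLogPreffix + " ] "+ strMethodName+ e.getClass().getName() + e);
                                    e.printStackTrace();
                          }catch (JMSException e) {
                                    System.err.println("[ " + strLogPreffix + " ] "+ strMethodName+ e.getClass().getName() + e);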
                                    e.printStackTrace();
                          }
              
                }
        
                public void enqueueEntity(Object objEntities) throws BusinessTaskException {
             
                          String strMethodName ="enqueueEntity():: ";
        
                          int iMessagePriority = 0;
                          String strCurrentJob = null;
                          if(resourcePath==null || configHT==null) {
                                    resourcePath = "messageserviceinterface_jndi";
                                    configHT =  JndiConfigUtil.getInstance().getJNDIHT(resourcePath); 
                          }
                          try {
                                    //System.out.println(strLogPreffix + strMethodName + "objEntities:: "+ objEntities);
                                    if(objEntities!=null) {
                                              MDSMessage mdsMessage = (MDSMessage)objEntities;
                                              strCurrentJob = mdsMessage.getCurrentJob();
                                              if(strCurrentJob!=null)
                                                        iMessagePriority = Integer.parseInt((String)configHT.get(strCurrentJob.trim()));
                                              ObjectMessage objMessage = null;
                                              objMessage = _session.createObjectMessage((Serializable) objEntities);
                                              _producer.send(objMessage, DeliveryMode.NON_PERSISTENT, iMessagePriority, 0);
                                                              
                                    }
                                    else {
                                              System.out.println(strLogPreffix + strMethodName + "-->>> Cannot Enqueue NULL Message!");
                                    }
        
                          }catch (JMSException e) {
                                    // TODO: handle exception
                                    System.err.println("[ " + strLogPreffix + " ] "+ strMethodName+ e.getClass().getName() + e);
                                    e.printStackTrace();
                          }
                }
        
                public void initMessageCounterSample() {
                          resourcePath = "messageserviceinterface_jndi";
                          configHT =  JndiConfigUtil.getInstance().getJNDIHT(resourcePath); 
                          JMX_URL = (String)configHT.get("jmx_url"); 
      
      
                          try {
                                    on = ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(_queue.getQueueName());
                                    connector = JMXConnectorFactory.connect(new JMXServiceURL(JMX_URL), new HashMap());
                                    mbsc = connector.getMBeanServerConnection();
                                    _queueControl = (JMSQueueControl)MBeanServerInvocationHandler.newProxyInstance(mbsc, on, JMSQueueControl.class, false);
                                    String counters = _queueControl.listMessageCounter();
                                    _messageCounter = MessageCounterInfo.fromJSON(counters);
          
                          }catch (Exception e) {
                                    e.printStackTrace();
                          }
              
                }
        
                public long getMessageCount() throws Exception {
        
                          String counters = null;
        
                          counters = _queueControl.listMessageCounter();
                          _messageCounter = MessageCounterInfo.fromJSON(counters); 
        
                          lFinalCounter = lFinalCounter + _messageCounter.getCountDelta();
        
                          return lFinalCounter;
        
                }
        
        
                private void displayMessageCounter(MessageCounterInfo counter) {
                          System.out.println("counter.getName():: " + counter.getName());
                          System.out.println("counter.getUdpateTimestamp():: " + counter.getUdpateTimestamp());
                          System.out.println("counter.getCount():: " + counter.getCount());
                          System.out.println("counter.getCountDelta():: " + counter.getCountDelta());
                          System.out.println("counter.getDepth():: " + counter.getDepth());
                          System.out.println("counter.getDepthDelta():: " + counter.getDepthDelta());
                          System.out.println("counter.getLastAddTimestamp():: " + counter.getLastAddTimestamp());
                }
          
        
                public void resetMessageCounter() {
                          try { 
                                    _queueControl.resetMessageCounter(); 
                                    String counters = _queueControl.listMessageCounter(); 
                                    _messageCounter = MessageCounterInfo.fromJSON(counters);
                                    lFinalCounter = 0;
                                    System.out.println("Counter for Queue " + _queue.getQueueName() + " has been reset!"); 
                          } catch (Exception e) {
                                    System.out.println("The Counter Sample cannot be reset until it is initialized. Hint: Call initMessageCounterSample() method!");
                          } 
                }
      }
      

       

      Everything works fine with the code and configuration above. I have now set up HornetQ clustering for my application and tested it with two nodes. It works: messages are load-balanced between the two nodes in round-robin fashion. The problem is with the counter. If the counter initiation message is received on node0, the counter only counts the messages in queue 'x' deployed on node0; if it is received on node1, it only counts the messages in queue 'x' deployed on node1. For example, if 50 messages are distributed round-robin so that queue 'x' on node0 receives 25 and queue 'x' on node1 receives 25, the counter reports the count for only one of the two queues, i.e. Count=25, whereas I expect Count=50 for queue 'x'. Could you please suggest how I can achieve this collective count for one clustered queue deployed on multiple nodes?
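
To make the expected result concrete, here is a minimal sketch of one possible approach, not a confirmed HornetQ feature: query every node's JMX endpoint separately and add up the per-node values. The class name ClusteredQueueCount and its methods are hypothetical. The sketch assumes that each node exposes its own JMX URL, that the queue MBean is registered as org.hornetq:module=JMS,type=Queue,name="<queue>", and that it has a MessageCount attribute. It reads the current number of messages in the queue rather than the message counter's getCountDelta() used in the class above, so the two figures are not interchangeable.

```java
import java.util.List;
import java.util.function.LongSupplier;

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Hypothetical helper: sums a per-node queue count over all cluster nodes.
final class ClusteredQueueCount {

    private ClusteredQueueCount() {
    }

    /** Adds up the values reported by each per-node source. */
    static long sum(List<LongSupplier> perNodeCounts) {
        long total = 0;
        for (LongSupplier source : perNodeCounts) {
            total += source.getAsLong();
        }
        return total;
    }

    /**
     * Builds a source that reads the MessageCount attribute of one node's
     * queue MBean over JMX. The object name pattern is an assumption about
     * how HornetQ registers JMS queues; adjust it to your deployment.
     */
    static LongSupplier jmxSource(String jmxUrl, String queueName) {
        return () -> {
            // JMXConnector is Closeable, so the connection is closed after each read
            try (JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(jmxUrl))) {
                MBeanServerConnection mbsc = connector.getMBeanServerConnection();
                ObjectName on = new ObjectName(
                        "org.hornetq:module=JMS,type=Queue,name=" + ObjectName.quote(queueName));
                return ((Number) mbsc.getAttribute(on, "MessageCount")).longValue();
            } catch (Exception e) {
                throw new IllegalStateException("Could not read count from " + jmxUrl, e);
            }
        };
    }
}
```

With this, the collective count would be ClusteredQueueCount.sum(Arrays.asList(jmxSource(node0Url, "x"), jmxSource(node1Url, "x"))), where node0Url is the JMX_URL from the .properties file and node1Url is a second, hypothetical entry for the other node. Each node is read at a slightly different moment, so the total is only approximate while messages are still flowing.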

       

      Thanks.

       

      Best Regards,

      Hushen Savani