4 Replies Latest reply on Jan 12, 2012 3:13 AM by hushen.savani

    Should HornetQ JMS code be written inside a POJO or SessionBean?

    hushen.savani

      Hi Community,

       

           I have been using HornetQ 2.2.5 for my middleware application. The application is deployed on JBoss AS 5.1.0.GA (with HornetQ embedded in this JBoss AS instance). To send messages to a HornetQ queue through the JMS API, I initially used a POJO which starts a connection, creates a session and a producer, and then sends the message to the queue, i.e.:

       

      POJO:

       

      public class HornetQ {
        
                private Context _ctx = null;
                private Session _session = null;
                private Queue _queue = null;
                private Connection _conn = null;
                private MessageProducer _producer = null;
                private String strLogPreffix = "HornetQ";
                private JMSQueueControl _queueControl = null;
                private MessageCounterInfo _messageCounter = null;
                private String resourcePath = null;
                private Hashtable configHT =  null; 
                private String JMX_URL = null; 
                private ObjectName on = null;
                private JMXConnector connector = null;
                private MBeanServerConnection mbsc = null;
                private long lFinalCounter = 0; 
        
                @Resource(mappedName = "java:JmsXA")
                private ConnectionFactory _cfConnFactory;
        
      
      
                public void startConnection(Context initialContecxt, String hornetqQueue) throws BusinessTaskException {//NamingException, JMSException { 
        
                          String strMethodName ="startConnection():: ";
        
                          System.out.println(strLogPreffix + strMethodName + "HornetQ Queue Name:: " + hornetqQueue);
        
                          try {
                                    this._ctx = initialContecxt;
        
                                    String queue = "";
                                    if(hornetqQueue!=null) {
                                              //Set HornetQ Queue
                                              queue = "/queue/" + hornetqQueue;
                                              this._queue = (Queue)_ctx.lookup(queue);
        
                                              //Set Connection
                                              _cfConnFactory = (ConnectionFactory) _ctx.lookup("/ThroughputConnectionFactory");
                                              this._conn = _cfConnFactory.createConnection();
        
                                              //Set session
                                              this._session = _conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); 
                                              _producer = _session.createProducer(_queue); 
                                  _producer.setDisableMessageID(true);
                                  _producer.setDisableMessageTimestamp(true);
                                  
                                  //Start Service
                                  if(_conn!=null)
                                                        _conn.start();
                                              else
                                                        System.err.println("[ HornetQ:: startConnection() ]Error in Connection!");                            
                                       
                                    }
                                    else {
                                              System.err.println("[HornetQ :: startConnection() Queue Name is not provided.]");
                                    } 
                          }
                          catch (Exception e) { 
                                    e.printStackTrace();
                          }
                }
      
      
                public void stopConnection() throws BusinessTaskException {
                          String strMethodName ="stopConnection():: ";
        
                          try {
                                    _ctx.close();
                        _conn.close();            
                        _session.close();
                        _producer.close();
              }catch (NamingException e) {
                        System.err.println("[ " + strLogPreffix + " ] "+ strMethodName+ e.getClass().getName() + e);
                                    e.printStackTrace();
                          }catch (JMSException e) {
                        System.err.println("[ " + strLogPreffix + " ] "+ strMethodName+ e.getClass().getName() + e);
                                    e.printStackTrace();
                          }
              
                }
        
                public void enqueueEntity(Object objEntities) throws BusinessTaskException {
             
                          String strMethodName ="enqueueEntity():: ";
        
                          int iMessagePriority = 0;
                          String strCurrentJob = null;
                          if(resourcePath==null || configHT==null) {
                                    resourcePath = "messageserviceinterface_jndi";
                                    configHT =  JndiConfigUtil.getInstance().getJNDIHT(resourcePath); 
                          }
                          try {
                                    //System.out.println(strLogPreffix + strMethodName + "objEntities:: "+ objEntities);
                                    if(objEntities!=null) {
                                              MDSMessage mdsMessage = (MDSMessage)objEntities;
                                              strCurrentJob = mdsMessage.getCurrentJob();
                                              if(strCurrentJob!=null)
                                                        iMessagePriority = Integer.parseInt((String)configHT.get(strCurrentJob.trim()));
                                              ObjectMessage objMessage = null;
                                  objMessage = _session.createObjectMessage((Serializable) objEntities);
                                  _producer.send(objMessage, DeliveryMode.NON_PERSISTENT, iMessagePriority, 0);                           
                                                              
                                    }
                                    else {
                                              System.out.println(strLogPreffix + strMethodName + "-->>> Cannot Enqueue NULL Message!");
                                    }
        
                          }catch (JMSException e) {
                                    // TODO: handle exception
                                    System.err.println("[ " + strLogPreffix + " ] "+ strMethodName+ e.getClass().getName() + e);
                                    e.printStackTrace();
                          }
                }
        
                public void initMessageCounterSample() {
                          resourcePath = "messageserviceinterface_jndi";
                          configHT =  JndiConfigUtil.getInstance().getJNDIHT(resourcePath); 
                          JMX_URL = (String)configHT.get("jmx_url"); 
      
      
                          try {
                                    on = ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(_queue.getQueueName());
                                    connector = JMXConnectorFactory.connect(new JMXServiceURL(JMX_URL), new HashMap());
                        mbsc = connector.getMBeanServerConnection();
                        _queueControl = (JMSQueueControl)MBeanServerInvocationHandler.newProxyInstance(mbsc, on, JMSQueueControl.class, false);                  
                        String counters = _queueControl.listMessageCounter();
                        _messageCounter = MessageCounterInfo.fromJSON(counters);   
          
                          }catch (Exception e) {
                                    e.printStackTrace();
                          }
              
                }
        
                public long getMessageCount() throws Exception {
        
                          String counters = null;
        
                          counters = _queueControl.listMessageCounter();
                          _messageCounter = MessageCounterInfo.fromJSON(counters); 
        
                          lFinalCounter = lFinalCounter + _messageCounter.getCountDelta();
        
                          return lFinalCounter;
        
                }
        
        
         private void displayMessageCounter(MessageCounterInfo counter) {
            System.out.println("counter.getName():: " + counter.getName());
            System.out.println("counter.getUdpateTimestamp():: " + counter.getUdpateTimestamp());
            System.out.println("counter.getCount():: " + counter.getCount());
            System.out.println("counter.getCountDelta():: " + counter.getCountDelta());
            System.out.println("counter.getDepth():: " + counter.getDepth());
            System.out.println("counter.getDepthDelta():: " + counter.getDepthDelta());
            System.out.println("counter.getLastAddTimestamp():: " + counter.getLastAddTimestamp());
          } 
          
                public void resetMessageCounter() {
                          try { 
                                    _queueControl.resetMessageCounter(); 
                                    String counters = _queueControl.listMessageCounter(); 
                        _messageCounter = MessageCounterInfo.fromJSON(counters);
                        lFinalCounter = 0;
                                    System.out.println("Counter for Queue " + _queue.getQueueName() + " has been reset!"); 
                          } catch (Exception e) {
                                    System.out.println("The Counter Sample cannot be reset until it is initialized. Hint: Call initMessageCounterSample() method!");
                          } 
                }
      }
      

       

      I used to call the startConnection(), enqueueEntity() and stopConnection() methods in sequence for each message sent. However, the application started crashing once heavy load was generated on it. I then learned that opening and closing connections, sessions and producers for every message is considered an anti-pattern when done inside a POJO.

       

      However, according to this wiki article, if the JMS code runs inside a Java EE application server, creating and closing connections, sessions and producers for each message is not an anti-pattern. Hence, I moved the code to a session bean, i.e.:

       

      SessionBean:

       

      @AspectDomain("HornetQSessionBean")
      @Stateless(name="HornetQSessionBean")
      @Local(HornetQLocal.class)
      @Remote(HornetQRemote.class)
      @LocalBinding(jndiBinding="HornetQSessionBean")
      @RemoteBinding(jndiBinding="HornetQSessionBeanRemote")
      @TransactionManagement(value= TransactionManagementType.CONTAINER)
      @TransactionAttribute(value= TransactionAttributeType.NOT_SUPPORTED)
      @ResourceAdapter("hornetq-ra.rar")
      public class HornetQSessionBean implements SessionBean, HornetQRemote, HornetQLocal { 
        
                private Queue _queue = null;
                private String strLogPreffix = "HornetQ";
                private JMSQueueControl _queueControl = null;
                private MessageCounterInfo _messageCounter = null;
                private String resourcePath = null;
                private Hashtable configHT =  null; 
                private String JMX_URL = null; 
                private ObjectName on = null;
                private JMXConnector connector = null;
                private MBeanServerConnection mbsc = null;
                private long lFinalCounter = 0; 
        
                @Resource(mappedName = "java:JmsXA")
                ConnectionFactory _cfConnFactory; 
        
                 @Resource(mappedName = "queue/mocmMasterQueue")
                 Queue mocmMasterQueue; 
        
                 @Resource(mappedName = "queue/mocmProcessingQueue")
                 Queue mocmProcessingQueue; 
        
                 @Resource(mappedName = "queue/mocmManagementQueue")
                 Queue mocmManagementQueue; 
                 
                 @Resource(mappedName = "queue/mocmTestQueue")
                 Queue mocmTestQueue;
                 
                 @Resource(mappedName = "queue/mocmSuccessQueue")
                 Queue mocmSuccessQueue;
                 
                 @Resource(mappedName = "queue/mocmFailureQueue")
                 Queue mocmFailureQueue;
                 
                 @Resource(mappedName = "queue/mocmAuditQueue")
                 Queue mocmAuditQueue; 
      
      
                public void enqueueEntity(String hornetqQueue,Object objEntities) throws BusinessTaskException {
                       
                          String strMethodName ="enqueueEntity():: ";
        
                          int iMessagePriority = 0;
                          String strCurrentJob = null;
        
                          Session _session = null; 
                          Connection _conn = null;
                          MessageProducer _producer = null;
        
                          if(resourcePath==null || configHT==null) {
                                    resourcePath = "messageserviceinterface_jndi";
                                    configHT =  JndiConfigUtil.getInstance().getJNDIHT(resourcePath); 
                          }
        
                          try {
        
                                    if(objEntities!=null) {
        
                                              populateQueue(hornetqQueue);
        
                                              _conn = _cfConnFactory.createConnection();
        
                                              //Set session
                                              _session = _conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                                  _producer = _session.createProducer(_queue); 
                                  _producer.setDisableMessageID(true);
                                  _producer.setDisableMessageTimestamp(true);                                      
                                     
                                  //Set Message Priority
                                              MDSMessage mdsMessage = (MDSMessage)objEntities;
                                              strCurrentJob = mdsMessage.getCurrentJob();
                                              if(strCurrentJob!=null)
                                                        iMessagePriority = Integer.parseInt((String)configHT.get(strCurrentJob.trim()));
                                              ObjectMessage objMessage = null;
                                  objMessage = _session.createObjectMessage((Serializable) objEntities);
                                  
                                  //Send Message
                                  _conn.start();
                                  _producer.send(objMessage, DeliveryMode.NON_PERSISTENT, iMessagePriority, 0);
                                    }
                                    else {
                                              System.err.println(strLogPreffix + strMethodName + "-->>> Cannot Enqueue NULL Message!");
                                    }
        
                          }catch (JMSException e) {
                                    System.err.println("[ " + strLogPreffix + " ] "+ strMethodName+ e.getClass().getName() + e);
                                    e.printStackTrace();
                          } finally
                      {
                         try {
                            _conn.close();
                      _session.close();
                            _producer.close();             
                            _queue = null;                      
                         }
                  catch (JMSException e) {
                            e.printStackTrace();
                  }            
               }               
                } 
      
      
                public long getMessageCount(String hornetqQueue) throws Exception {
        
                          String counters = _queueControl.listMessageCounter();
                          _messageCounter = MessageCounterInfo.fromJSON(counters); 
        
                          lFinalCounter = lFinalCounter + _messageCounter.getCountDelta();
        
                          return lFinalCounter;
        
                } 
        
                public void resetMessageCounter(String hornetqQueue) {
        
                          try {
                                    _queueControl.resetMessageCounter(); 
                                    String counters = _queueControl.listMessageCounter(); 
                        _messageCounter = MessageCounterInfo.fromJSON(counters);
                        lFinalCounter = 0;
                                    System.out.println("Counter for Queue " + _queue.getQueueName() + " has been reset!"); 
                          } catch (Exception e) {
                                    System.out.println("The Counter Sample cannot be reset until it is initialized. Hint: Call initMessageCounterSample(String hornetqQueue) method!");
                          } 
                }
        
                private void populateQueue(String hornetqQueue) {
        
                          if(hornetqQueue!=null) { 
        
                                    if(hornetqQueue.equalsIgnoreCase("mocmMasterQueue"))
                                    {
                                              _queue = mocmMasterQueue;
                                    }else if(hornetqQueue.equalsIgnoreCase("mocmProcessingQueue"))
                                    {
                                              _queue = mocmProcessingQueue;
                                    }
                                    else if(hornetqQueue.equalsIgnoreCase("mocmManagementQueue"))
                                    {
                                              _queue = mocmManagementQueue;
                                    }else if(hornetqQueue.equalsIgnoreCase("mocmTestQueue"))
                                    {
                                              _queue = mocmTestQueue;
                                    }else if(hornetqQueue.equalsIgnoreCase("mocmSuccessQueue"))
                                    {
                                              _queue = mocmSuccessQueue;
                                    }else if(hornetqQueue.equalsIgnoreCase("mocmFailureQueue"))
                                    {
                                              _queue = mocmFailureQueue;
                                    }else if(hornetqQueue.equalsIgnoreCase("mocmAuditQueue"))
                                    {
                                              _queue = mocmAuditQueue;
                                    } 
                          }
                          else
                                    System.err.println("Cannot Continue. Please Provide Correct Queue Name!");
                }
        
                public void initMessageCounterSample(String hornetqQueue) {
        
                          if(resourcePath == null || configHT == null || JMX_URL == null) { 
                                    resourcePath = "messageserviceinterface_jndi";
                                    configHT =  JndiConfigUtil.getInstance().getJNDIHT(resourcePath); 
                                    JMX_URL = (String)configHT.get("jmx_url");
                          }
        
                          populateQueue(hornetqQueue); 
        
                          try {
                                    on = ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(_queue.getQueueName());
                                    connector = JMXConnectorFactory.connect(new JMXServiceURL(JMX_URL), new HashMap());
                        mbsc = connector.getMBeanServerConnection();
                        _queueControl = (JMSQueueControl)MBeanServerInvocationHandler.newProxyInstance(mbsc, on, JMSQueueControl.class, false);
                        String counters = _queueControl.listMessageCounter();
                        _messageCounter = MessageCounterInfo.fromJSON(counters); 
                          }catch(Exception e) {
                                    e.printStackTrace();
                          }       
                }
        
                public void ejbActivate() throws EJBException, RemoteException {
                          // TODO Auto-generated method stub
        
                }
      
      
                public void ejbPassivate() throws EJBException, RemoteException {
                          // TODO Auto-generated method stub
        
                }
      
      
                public void ejbRemove() throws EJBException, RemoteException {
                          // TODO Auto-generated method stub
        
                }
      
      
                public void setSessionContext(SessionContext arg0) throws EJBException,
                                    RemoteException {
                          // TODO Auto-generated method stub
        
                }
      }
      

       

      I would like to discuss whether migrating to a session bean will really help in this case.

       

      Thanks.