Should HornetQ JMS code be written inside a POJO or SessionBean?
hushen.savani Jan 11, 2012 9:28 AMHi Community,
I have been using Hornetq-2.2.5 for my middleware application. Application is deployed on JBossAS-5.1.0-GA AppServer (Hornetq embedded in this JBossAS). To send messages in a queue using Hornetq (using JMS Implementation), initially I have used a POJO which starts connection, creates sessions and producers, then it sends the message to the hornetq queue. I.e.
POJO:
public class HornetQ {
private Context _ctx = null;
private Session _session = null;
private Queue _queue = null;
private Connection _conn = null;
private MessageProducer _producer = null;
private String strLogPreffix = "HornetQ";
private JMSQueueControl _queueControl = null;
private MessageCounterInfo _messageCounter = null;
private String resourcePath = null;
private Hashtable configHT = null;
private String JMX_URL = null;
private ObjectName on = null;
private JMXConnector connector = null;
private MBeanServerConnection mbsc = null;
private long lFinalCounter = 0;
@Resource(mappedName = "java:JmsXA")
private ConnectionFactory _cfConnFactory;
public void startConnection(Context initialContecxt, String hornetqQueue) throws BusinessTaskException {//NamingException, JMSException {
String strMethodName ="startConnection():: ";
System.out.println(strLogPreffix + strMethodName + "HornetQ Queue Name:: " + hornetqQueue);
try {
this._ctx = initialContecxt;
String queue = "";
if(hornetqQueue!=null) {
//Set HornetQ Queue
queue = "/queue/" + hornetqQueue;
this._queue = (Queue)_ctx.lookup(queue);
//Set Connection
_cfConnFactory = (ConnectionFactory) _ctx.lookup("/ThroughputConnectionFactory");
this._conn = _cfConnFactory.createConnection();
//Set session
this._session = _conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
_producer = _session.createProducer(_queue);
_producer.setDisableMessageID(true);
_producer.setDisableMessageTimestamp(true);
//Start Service
if(_conn!=null)
_conn.start();
else
System.err.println("[ HornetQ:: startConnection() ]Error in Connection!");
}
else {
System.err.println("[HornetQ :: startConnection() Queue Name is not provided.]");
}
}
catch (Exception e) {
e.printStackTrace();
}
}
public void stopConnection() throws BusinessTaskException {
String strMethodName ="stopConnection():: ";
try {
_ctx.close();
_conn.close();
_session.close();
_producer.close();
}catch (NamingException e) {
System.err.println("[ " + strLogPreffix + " ] "+ strMethodName+ e.getClass().getName() + e);
e.printStackTrace();
}catch (JMSException e) {
System.err.println("[ " + strLogPreffix + " ] "+ strMethodName+ e.getClass().getName() + e);
e.printStackTrace();
}
}
public void enqueueEntity(Object objEntities) throws BusinessTaskException {
String strMethodName ="enqueueEntity():: ";
int iMessagePriority = 0;
String strCurrentJob = null;
if(resourcePath==null || configHT==null) {
resourcePath = "messageserviceinterface_jndi";
configHT = JndiConfigUtil.getInstance().getJNDIHT(resourcePath);
}
try {
//System.out.println(strLogPreffix + strMethodName + "objEntities:: "+ objEntities);
if(objEntities!=null) {
MDSMessage mdsMessage = (MDSMessage)objEntities;
strCurrentJob = mdsMessage.getCurrentJob();
if(strCurrentJob!=null)
iMessagePriority = Integer.parseInt((String)configHT.get(strCurrentJob.trim()));
ObjectMessage objMessage = null;
objMessage = _session.createObjectMessage((Serializable) objEntities);
_producer.send(objMessage, DeliveryMode.NON_PERSISTENT, iMessagePriority, 0);
}
else {
System.out.println(strLogPreffix + strMethodName + "-->>> Cannot Enqueue NULL Message!");
}
}catch (JMSException e) {
// TODO: handle exception
System.err.println("[ " + strLogPreffix + " ] "+ strMethodName+ e.getClass().getName() + e);
e.printStackTrace();
}
}
public void initMessageCounterSample() {
resourcePath = "messageserviceinterface_jndi";
configHT = JndiConfigUtil.getInstance().getJNDIHT(resourcePath);
JMX_URL = (String)configHT.get("jmx_url");
try {
on = ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(_queue.getQueueName());
connector = JMXConnectorFactory.connect(new JMXServiceURL(JMX_URL), new HashMap());
mbsc = connector.getMBeanServerConnection();
_queueControl = (JMSQueueControl)MBeanServerInvocationHandler.newProxyInstance(mbsc, on, JMSQueueControl.class, false);
String counters = _queueControl.listMessageCounter();
_messageCounter = MessageCounterInfo.fromJSON(counters);
}catch (Exception e) {
e.printStackTrace();
}
}
public long getMessageCount() throws Exception {
String counters = null;
counters = _queueControl.listMessageCounter();
_messageCounter = MessageCounterInfo.fromJSON(counters);
lFinalCounter = lFinalCounter + _messageCounter.getCountDelta();
return lFinalCounter;
}
private void displayMessageCounter(MessageCounterInfo counter) {
System.out.println("counter.getName():: " + counter.getName());
System.out.println("counter.getUdpateTimestamp():: " + counter.getUdpateTimestamp());
System.out.println("counter.getCount():: " + counter.getCount());
System.out.println("counter.getCountDelta():: " + counter.getCountDelta());
System.out.println("counter.getDepth():: " + counter.getDepth());
System.out.println("counter.getDepthDelta():: " + counter.getDepthDelta());
System.out.println("counter.getLastAddTimestamp():: " + counter.getLastAddTimestamp());
}
public void resetMessageCounter() {
try {
_queueControl.resetMessageCounter();
String counters = _queueControl.listMessageCounter();
_messageCounter = MessageCounterInfo.fromJSON(counters);
lFinalCounter = 0;
System.out.println("Counter for Queue " + _queue.getQueueName() + " has been reset!");
} catch (Exception e) {
System.out.println("The Counter Sample cannot be reset until it is initialized. Hint: Call initMessageCounterSample() method!");
}
}
}
I used to call startConnection(), enqueueEntity() then stopConnection() methods in sequence when sending each message. But, application started crashing after heavy load generated on the same. Then I came to know that starting/closing connections/sessions/producers etc on each message are considered to be an anti-pattern if used inside a POJO.
But, according to this wiki article, I came to know that if JMS Code is running inside a JEE Application Server, then switching connections/sessions/producers on each message is not an anti-pattern. Hence, I moved to SessionBean. I.e.:
SessionBean:
@AspectDomain("HornetQSessionBean")
@Stateless(name="HornetQSessionBean")
@Local(HornetQLocal.class)
@Remote(HornetQRemote.class)
@LocalBinding(jndiBinding="HornetQSessionBean")
@RemoteBinding(jndiBinding="HornetQSessionBeanRemote")
@TransactionManagement(value= TransactionManagementType.CONTAINER)
@TransactionAttribute(value= TransactionAttributeType.NOT_SUPPORTED)
@ResourceAdapter("hornetq-ra.rar")
public class HornetQSessionBean implements SessionBean, HornetQRemote, HornetQLocal {
private Queue _queue = null;
private String strLogPreffix = "HornetQ";
private JMSQueueControl _queueControl = null;
private MessageCounterInfo _messageCounter = null;
private String resourcePath = null;
private Hashtable configHT = null;
private String JMX_URL = null;
private ObjectName on = null;
private JMXConnector connector = null;
private MBeanServerConnection mbsc = null;
private long lFinalCounter = 0;
@Resource(mappedName = "java:JmsXA")
ConnectionFactory _cfConnFactory;
@Resource(mappedName = "queue/mocmMasterQueue")
Queue mocmMasterQueue;
@Resource(mappedName = "queue/mocmProcessingQueue")
Queue mocmProcessingQueue;
@Resource(mappedName = "queue/mocmManagementQueue")
Queue mocmManagementQueue;
@Resource(mappedName = "queue/mocmTestQueue")
Queue mocmTestQueue;
@Resource(mappedName = "queue/mocmSuccessQueue")
Queue mocmSuccessQueue;
@Resource(mappedName = "queue/mocmFailureQueue")
Queue mocmFailureQueue;
@Resource(mappedName = "queue/mocmAuditQueue")
Queue mocmAuditQueue;
public void enqueueEntity(String hornetqQueue,Object objEntities) throws BusinessTaskException {
String strMethodName ="enqueueEntity():: ";
int iMessagePriority = 0;
String strCurrentJob = null;
Session _session = null;
Connection _conn = null;
MessageProducer _producer = null;
if(resourcePath==null || configHT==null) {
resourcePath = "messageserviceinterface_jndi";
configHT = JndiConfigUtil.getInstance().getJNDIHT(resourcePath);
}
try {
if(objEntities!=null) {
populateQueue(hornetqQueue);
_conn = _cfConnFactory.createConnection();
//Set session
_session = _conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
_producer = _session.createProducer(_queue);
_producer.setDisableMessageID(true);
_producer.setDisableMessageTimestamp(true);
//Set Message Priority
MDSMessage mdsMessage = (MDSMessage)objEntities;
strCurrentJob = mdsMessage.getCurrentJob();
if(strCurrentJob!=null)
iMessagePriority = Integer.parseInt((String)configHT.get(strCurrentJob.trim()));
ObjectMessage objMessage = null;
objMessage = _session.createObjectMessage((Serializable) objEntities);
//Send Message
_conn.start();
_producer.send(objMessage, DeliveryMode.NON_PERSISTENT, iMessagePriority, 0);
}
else {
System.err.println(strLogPreffix + strMethodName + "-->>> Cannot Enqueue NULL Message!");
}
}catch (JMSException e) {
System.err.println("[ " + strLogPreffix + " ] "+ strMethodName+ e.getClass().getName() + e);
e.printStackTrace();
} finally
{
try {
_conn.close();
_session.close();
_producer.close();
_queue = null;
}
catch (JMSException e) {
e.printStackTrace();
}
}
}
public long getMessageCount(String hornetqQueue) throws Exception {
String counters = _queueControl.listMessageCounter();
_messageCounter = MessageCounterInfo.fromJSON(counters);
lFinalCounter = lFinalCounter + _messageCounter.getCountDelta();
return lFinalCounter;
}
public void resetMessageCounter(String hornetqQueue) {
try {
_queueControl.resetMessageCounter();
String counters = _queueControl.listMessageCounter();
_messageCounter = MessageCounterInfo.fromJSON(counters);
lFinalCounter = 0;
System.out.println("Counter for Queue " + _queue.getQueueName() + " has been reset!");
} catch (Exception e) {
System.out.println("The Counter Sample cannot be reset until it is initialized. Hint: Call initMessageCounterSample(String hornetqQueue) method!");
}
}
private void populateQueue(String hornetqQueue) {
if(hornetqQueue!=null) {
if(hornetqQueue.equalsIgnoreCase("mocmMasterQueue"))
{
_queue = mocmMasterQueue;
}else if(hornetqQueue.equalsIgnoreCase("mocmProcessingQueue"))
{
_queue = mocmProcessingQueue;
}
else if(hornetqQueue.equalsIgnoreCase("mocmManagementQueue"))
{
_queue = mocmManagementQueue;
}else if(hornetqQueue.equalsIgnoreCase("mocmTestQueue"))
{
_queue = mocmTestQueue;
}else if(hornetqQueue.equalsIgnoreCase("mocmSuccessQueue"))
{
_queue = mocmSuccessQueue;
}else if(hornetqQueue.equalsIgnoreCase("mocmFailureQueue"))
{
_queue = mocmFailureQueue;
}else if(hornetqQueue.equalsIgnoreCase("mocmAuditQueue"))
{
_queue = mocmAuditQueue;
}
}
else
System.err.println("Cannot Continue. Please Provide Correct Queue Name!");
}
public void initMessageCounterSample(String hornetqQueue) {
if(resourcePath == null || configHT == null || JMX_URL == null) {
resourcePath = "messageserviceinterface_jndi";
configHT = JndiConfigUtil.getInstance().getJNDIHT(resourcePath);
JMX_URL = (String)configHT.get("jmx_url");
}
populateQueue(hornetqQueue);
try {
on = ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(_queue.getQueueName());
connector = JMXConnectorFactory.connect(new JMXServiceURL(JMX_URL), new HashMap());
mbsc = connector.getMBeanServerConnection();
_queueControl = (JMSQueueControl)MBeanServerInvocationHandler.newProxyInstance(mbsc, on, JMSQueueControl.class, false);
String counters = _queueControl.listMessageCounter();
_messageCounter = MessageCounterInfo.fromJSON(counters);
}catch(Exception e) {
e.printStackTrace();
}
}
public void ejbActivate() throws EJBException, RemoteException {
// TODO Auto-generated method stub
}
public void ejbPassivate() throws EJBException, RemoteException {
// TODO Auto-generated method stub
}
public void ejbRemove() throws EJBException, RemoteException {
// TODO Auto-generated method stub
}
public void setSessionContext(SessionContext arg0) throws EJBException,
RemoteException {
// TODO Auto-generated method stub
}
}
I want to discuss, whether migrating to session bean will really help in this case?
Thanks.