10 Replies Latest reply on Sep 17, 2009 3:29 PM by Toby Morris

    JMS connection/session handling

    Bob Damato Newbie

      What is the proper way to manage JMS connections and sessions?

      Should I always create a connection, create a session, DO THE WORK, then close the session, and the connection. Or, can I keep the connection and session open indefinitely and simply re-use them for as long as I want.

      We've been trying to go with the re-use the session approach and for the most part, it works. It seems, however, that if the queue goes away (eg. if jboss is restarted), the client session running on a remote machine, never sees the queue go away or doesn't receive any sort of exception. So, when the queue does come back online, it appears that the session is disconnected and doesn't know it. It never resumes retrieving messages from the queue.

      I've looked around at many tutorials, but they all show the simplistic approach of creating a new connection and session each time. Does JBoss have a recommended method? Is it inappropriate to re-use the connection and session?

        • 1. Re: JMS connection/session handling
          Bob Damato Newbie

          This thread pretty much what I was asking. Unfortunately there were no answers:
          http://www.jboss.org/index.html?module=bb&op=viewtopic&p=3890876#3890876

          • 2. Re: JMS connection/session handling
            Yong Hao Gao Master

            Connection pooling is preferred. Session should not be pooled.

            • 3. Re: JMS connection/session handling
              Tim Fox Master

              That is not correct.

              It is an *anti pattern* to create a new session/consumer/producer each time you say, send a message.

              • 4. Re: JMS connection/session handling
                Tim Fox Master

                I'm going to wite a wiki page on this. This question must have come up about 100000 times and I'm getting tired of answering it ;)

                • 5. Re: JMS connection/session handling
                  Yong Hao Gao Master

                  That's great.

                  • 6. Re: JMS connection/session handling
                    Bob Damato Newbie

                    Is there connection pooling built in, or is it something I'd have to create myself? I'm looking forward to the wiki.

                    • 7. Re: JMS connection/session handling
                      Peter Johnson Master

                       

                      I've looked around at many tutorials, but they all show the simplistic approach of creating a new connection and session each time.


                      I know of one example that doesn't do this - it creates the connection, session and producer up front and uses that one producer to send an arbitrary number of messages. When it is done, it closes everything.

                      (I hope that is correct, otherwise I will fix it right away.)

                      • 8. Re: JMS connection/session handling
                        Bob Damato Newbie

                        I've tried something like that, and I'm not sure whether it's a bug or not, but it doesn't seem to work well when the source queue or source queue's jboss instance goes away and comes back (eg during a restart). We're working on a small test case to see if we can reproduce in a simple way.

                        • 9. Re: JMS connection/session handling
                          Tim Fox Master

                          I've written a wiki article on this subject, on the HornetQ wiki:

                          http://www.jboss.org/community/wiki/ShouldIcacheJMSconnectionsandJMSsessions

                          • 10. Re: JMS connection/session handling
                            Toby Morris Newbie

                            Cheers. I'm working with Bob on this issue, and we've found something when keeping open connections, sessions, consumers in our receiver. Below is the code for our sender and our receiver. The receiver continues to check the JMS queue when the queue is stopped and started, and also when the JBoss server is shutdown and restarted. On a queue stop/start, the receiver continues to act as if it's actually trying to get messages. It does not error. When the queue comes back up, the receiver will receive any new messages that arrive after the restart. On a JBoss restart, the receiver does pretty much the same, with no notification of the loss of connectivity, and upon restart, the receiver does NOT receive any new messages that arrive after the JBoss restart. Any ideas on how we can make the receiver realize the server is down so it can handle reconnecting?


                            Sender code:

                            package com.valpak.renderingservice.indesign.unittests;
                            
                            import java.util.Properties;
                            
                            import javax.jms.DeliveryMode;
                            import javax.jms.JMSException;
                            import javax.jms.ObjectMessage;
                            import javax.jms.Queue;
                            import javax.jms.QueueConnection;
                            import javax.jms.QueueConnectionFactory;
                            import javax.jms.QueueReceiver;
                            import javax.jms.QueueSender;
                            import javax.jms.QueueSession;
                            import javax.naming.Context;
                            import javax.naming.InitialContext;
                            import javax.naming.NamingException;
                            
                            import org.apache.log4j.Logger;
                            
                            public class JMSSender {
                             private static Logger log = Logger.getLogger(JMSReceiver.class);
                             private String jmsServer = <jms queue server URL;
                             private String jmsQueue = <queue name>;
                            
                             public static void main(String[] args) {
                             JMSSender jmss = new JMSSender();
                             try {
                             while (true) {
                             log.debug("Sending a message");
                             jmss.sendMessage();
                             Thread.sleep(10000);
                             }
                             } catch (InterruptedException e) {
                             e.printStackTrace();
                             }
                            
                             }
                            
                             private void sendMessage() {
                             InitialContext ctx = null;
                             QueueConnection conn = null;
                             QueueConnectionFactory tcf = null;
                             Queue queue = null;
                             QueueSession session = null;
                             QueueSender sender = null;
                             QueueReceiver receiver = null;
                             Properties properties = new Properties();
                             int replyWaitMS = 3000;
                             ObjectMessage message = null;
                            
                             try {
                             properties.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
                             properties.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
                             properties.put(Context.PROVIDER_URL, jmsServer);
                             ctx = new InitialContext(properties);
                             tcf = (QueueConnectionFactory) ctx.lookup("ConnectionFactory");
                             conn = tcf.createQueueConnection();
                             queue = (Queue) ctx.lookup(jmsQueue);
                             conn.start();
                             session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
                             sender = session.createSender(queue);
                             sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                             String messageBody = "You're an idiot!";
                             message = session.createObjectMessage(messageBody);
                             Queue tempQueue = session.createTemporaryQueue();
                             if (tempQueue != null) {
                             message.setJMSReplyTo(tempQueue);
                             sender.setTimeToLive(replyWaitMS * 20);
                             } else {
                             sender.setTimeToLive(60 * 60 * 1000); // 60 minutes
                             }
                             sender.send(message);
                            
                             if (tempQueue != null) {
                             receiver = session.createReceiver(tempQueue);
                             Object responseMessage = receiver.receive(replyWaitMS);
                             if (responseMessage != null) {
                             ObjectMessage response = (ObjectMessage) responseMessage;
                             Object responseObject = response.getObject();
                             if (responseObject instanceof String) {
                             log.debug(responseObject);
                             }
                             }
                             }
                             } catch (NamingException e) {
                             e.printStackTrace();
                             } catch (JMSException e) {
                             e.printStackTrace();
                             } finally {
                             close(conn, session, receiver, sender);
                             }
                             }
                            
                             private void close(QueueConnection conn, QueueSession session, QueueReceiver receiver, QueueSender sender) {
                             try {
                             if (conn != null) {
                             conn.close();
                             }
                             } catch (JMSException e) {
                             e.printStackTrace();
                             }
                             try {
                             if (session != null) {
                             session.close();
                             }
                             } catch (JMSException e) {
                             e.printStackTrace();
                             }
                             try {
                             if (receiver != null) {
                             receiver.close();
                             }
                             } catch (JMSException e) {
                             e.printStackTrace();
                             }
                             try {
                             if (sender != null) {
                             sender.close();
                             }
                             } catch (JMSException e) {
                             e.printStackTrace();
                             }
                             }
                            }
                            


                            And the receiver code:
                            package com.valpak.renderingservice.indesign.unittests;
                            
                            import java.io.Serializable;
                            import java.util.Properties;
                            
                            import javax.jms.DeliveryMode;
                            import javax.jms.Destination;
                            import javax.jms.JMSException;
                            import javax.jms.MessageProducer;
                            import javax.jms.ObjectMessage;
                            import javax.jms.Queue;
                            import javax.jms.QueueConnection;
                            import javax.jms.QueueConnectionFactory;
                            import javax.jms.QueueReceiver;
                            import javax.jms.QueueSession;
                            import javax.naming.Context;
                            import javax.naming.InitialContext;
                            import javax.naming.NamingException;
                            
                            import org.apache.log4j.Logger;
                            
                            public class JMSReceiver {
                             private static Logger log = Logger.getLogger(JMSReceiver.class);
                             private String jmsServer = <jms server URL>;
                             private String jmsQueue = <queue name>;
                             private int queueWaitSeconds = 3;
                             InitialContext ctx = null;
                             QueueConnection conn = null;
                             QueueConnectionFactory tcf = null;
                             Queue queue = null;
                             QueueSession session = null;
                             QueueReceiver consumer = null;
                            
                             public static void main(String[] args) {
                             // Check for messages
                             JMSReceiver jmsr = new JMSReceiver();
                             try {
                             jmsr.initialize();
                             while (true) {
                             ObjectMessage message = jmsr.checkQueue();
                             if (message != null) {
                             if (message.getObject() instanceof String) {
                             // Output what we got
                             log.debug(message.getObject());
                             // Send a reply
                             String reply = "I know you are, but what am I?";
                             jmsr.sendReplyMessage(message.getJMSReplyTo(), reply);
                             }
                             } else {
                             log.debug("No message. Sleeping");
                             Thread.sleep(6000);
                             }
                             }
                             } catch (Exception e) {
                             e.printStackTrace();
                             }
                            
                             }
                            
                             private void initialize() throws Exception {
                             Properties properties = new Properties();
                             properties.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
                             properties.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
                             properties.put(Context.PROVIDER_URL, jmsServer);
                             ctx = new InitialContext(properties);
                             tcf = (QueueConnectionFactory) ctx.lookup("ConnectionFactory");
                             conn = tcf.createQueueConnection();
                             queue = (Queue) ctx.lookup(jmsQueue);
                             conn.start();
                             session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
                             consumer = session.createReceiver(queue);
                             }
                            
                             private ObjectMessage checkQueue() {
                            
                             try {
                             ObjectMessage message = (ObjectMessage) consumer.receive(queueWaitSeconds * 1000);
                             if (message != null) {
                             log.debug("got a message");
                             return message;
                             } else {
                             log.debug("No message found for " + queueWaitSeconds + " seconds");
                             return null;
                             }
                             } catch (Exception e) {
                             e.printStackTrace();
                             } finally {
                            
                             }
                             return null;
                             }
                            
                             private void close(QueueConnection conn, QueueSession session, QueueReceiver receiver, MessageProducer producer) {
                             try {
                             if (conn != null) {
                             conn.close();
                             }
                             } catch (JMSException e) {
                             e.printStackTrace();
                             }
                             try {
                             if (session != null) {
                             session.close();
                             }
                             } catch (JMSException e) {
                             e.printStackTrace();
                             }
                             try {
                             if (receiver != null) {
                             receiver.close();
                             }
                             } catch (JMSException e) {
                             e.printStackTrace();
                             }
                             try {
                             if (producer != null) {
                             producer.close();
                             }
                             } catch (JMSException e) {
                             e.printStackTrace();
                             }
                             }
                            
                             private void sendReplyMessage(Destination replyTo, Serializable reply) {
                             InitialContext ctx = null;
                             QueueConnection conn = null;
                             QueueConnectionFactory tcf = null;
                             QueueSession session = null;
                             MessageProducer producer = null;
                            
                             Properties properties = new Properties();
                             try {
                             properties.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
                             properties.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
                             properties.put(Context.PROVIDER_URL, jmsServer);
                             ctx = new InitialContext(properties);
                             tcf = (QueueConnectionFactory) ctx.lookup("ConnectionFactory");
                             conn = tcf.createQueueConnection();
                             conn.start();
                             session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
                             producer = session.createProducer(replyTo);
                             ObjectMessage response = session.createObjectMessage(reply);
                             response.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
                             producer.send(response);
                             } catch (NamingException e) {
                             e.printStackTrace();
                             } catch (JMSException e) {
                             e.printStackTrace();
                             } finally {
                             close(conn, session, null, producer);
                             }
                             }
                            
                            }