4 Replies Latest reply on Oct 18, 2011 1:08 PM by qil.wong

    Is it thread safe to send messages through a single shared Session and MessageProducer?

    qil.wong

      Hi,

       

      As far as I know, the JMS specification does not guarantee that a Session is thread safe, but what I want to do is cache a session and producer, as mentioned here (http://community.jboss.org/message/144470#144470). I simply created an object to manage the JMS connection, session, consumer and producer, like below:

       

       

      public class MQConnection implements SendAcknowledgementHandler {
      
      
                private String groupIP;
                private int groupPort;
                private HornetQConnectionFactory jmsConnectionFactory;
                private Connection connection1;
                private Session session;
      
      
                private String userName;
                private String password;
      
      
                private String queueTopicName;
      
      
                public static final String QUEUE = "queue";
                public static final String TOPIC = "topic";
      
      
                private String type;
                private MessageListener listener;
      
      
                private Queue queue;
                private Topic topic;
                private boolean send;
      
      
                private MessageConsumer consumer;
                private MessageProducer producer;
      
      
                public MQConnection(String groupIP, int groupPort, String queueTopicName,
                                    String type, boolean send, MessageListener listener,
                                    String username, String password) {
                          this.groupIP = groupIP;
                          this.groupPort = groupPort;
                          this.queueTopicName = queueTopicName;
                          this.type = type;
                          this.listener = listener;
                          this.send = send;
                          this.userName = username;
                          this.password = password;
                }
      
      
                public void createResources() throws JMSException {
                          jmsConnectionFactory = HornetQJMSClient.createConnectionFactoryWithHA(
                                              new DiscoveryGroupConfiguration(groupIP, groupPort),
                                              JMSFactoryType.CF);
                          connection1 = jmsConnectionFactory.createConnection(userName, password);
                          connection1.setExceptionListener(new ExceptionListener() {
      
      
                                    @Override
                                    public void onException(JMSException arg0) {
                                              try {
                                                        createResources();
                                              } catch (JMSException e) {
                                                        PSTLogger.logExceptionTrace(e);
                                              }
                                    }
                          });
                          connection1.start();
                          session = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
                          if (listener != null) {
                                    session.setMessageListener(listener);
                          }
                          if (QUEUE.equals(type)) {
                                    queue = HornetQJMSClient.createQueue(queueTopicName);
                          }
                          if (TOPIC.equals(type)) {
                                    topic = HornetQJMSClient.createTopic(queueTopicName);
                          }
      
      
                          if (send) {
                                    if (queue != null) {
                                              producer = session.createProducer(queue);
                                    }
                                    if (topic != null) {
                                              producer = session.createProducer(topic);
                                    }
                                    ClientSession coreSession = ((HornetQSession) session)
                                                        .getCoreSession();
                                    coreSession.setSendAcknowledgementHandler(this);
                          } else {
                                    if (queue != null) {
                                              consumer = session.createConsumer(queue);
                                    }
                                    if (topic != null) {
                                              consumer = session.createConsumer(topic);
                                    }
                                    if (listener != null) {
                                              consumer.setMessageListener(listener);
                                    }
                          }
                }
      
                public void sendAcknowledged(Message message) {
                          // Do nothing
                }
      
      
                public void send(Serializable o) throws JMSException {
                          ObjectMessage message = session.createObjectMessage(o);
                          producer.send(message);
                }
      }
      

       

      I can guarantee that once this object is created, all of its fields and the objects they reference are read-only and won't be updated elsewhere.

       

      The point is: is it thread safe to call send(Serializable o) from multiple threads? Or do I need to synchronize this method?
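
      If synchronization does turn out to be necessary, I assume it would look roughly like the sketch below. To keep it runnable without a broker, it does not use JMS at all: SerializedSender is a hypothetical wrapper, and the delegate (here a plain ArrayList, which is not thread safe) only stands in for the session.createObjectMessage(o) / producer.send(message) pair in my send method. The idea is just that a single lock guards every call, so only one thread touches the session and producer at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical wrapper (not part of JMS or HornetQ): serializes calls into a
 * delegate that is assumed to be unsafe for concurrent use.
 */
public class SerializedSender<T> {

    // One private lock guards every call into the delegate.
    private final Object lock = new Object();
    private final Consumer<T> delegate;

    public SerializedSender(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    public void send(T payload) {
        // In MQConnection this block would contain createObjectMessage + producer.send.
        synchronized (lock) {
            delegate.accept(payload);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // ArrayList is not thread safe; it plays the role of the shared session/producer.
        List<Integer> sent = new ArrayList<>();
        SerializedSender<Integer> sender = new SerializedSender<>(sent::add);

        Runnable task = () -> {
            for (int i = 0; i < 1000; i++) {
                sender.send(i);
            }
        };

        Thread[] workers = new Thread[4];
        for (int t = 0; t < workers.length; t++) {
            workers[t] = new Thread(task);
            workers[t].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }

        // All 4 x 1000 sends went through the lock, so none are lost.
        System.out.println(sent.size()); // prints 4000
    }
}
```

      The obvious cost is that all sending threads queue up behind one lock, so the alternative I am weighing is one session and producer per thread.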

       

      Thanks!

       

      Chet