2 Replies Latest reply on Aug 21, 2005 8:53 PM by javajedi

    JMS XA - Am I doing something wrong?

      JBoss 4.0.1
      JDK 1.5.0_02

      I have code that is running inside of a JTA transaction. I get a JMS connection factory by looking up "java:/JmsXA" from JNDI. I create a JMS session and send a message to a queue. My understanding was that the JMS XA resource adapter would wait until the JTA transaction is committed before it actually sends the message. However, I have a remote machine that is listening to the queue, and it receives the message before I commit my transaction. Maybe I'm doing something wrong, or maybe I'm not understanding how the resource adapter is supposed to work? Does it matter what parameters I pass to createSession() on the JMS connection? From what I've been able to figure out based on the sparse documentation, it doesn't. If it does, when should I call commit() on my JMS Session? If I'm just doing something wrong, I would really appreciate a simple example of getting a JMS connection and sending a message within the context of a JTA transaction. Thanks for any help anyone out there can provide.

      --Tim

        • 1. Re: JMS XA - Am I doing something wrong?

          I modified HAJMSClient.java from the JBoss examples to use JmsXA and JTA transactions to more clearly demonstrate what doesn't seem to be working. I'm attaching the code. I deploy this MBean and invoke the following operations:

          connect()
          startTransaction()
          sendMessageToQueue()
          *** At this point, a "Message received" line appears in the server log.
          endTransaction()


          The message shouldn't be received until AFTER endTransaction() has been called, right? Is this a bug, or am I doing something wrong?

          • 2. Re: JMS XA - Am I doing something wrong?

             

            package org.jboss.mq.il.ha.examples;
            
            import java.util.Properties;
            
            import javax.jms.ExceptionListener;
            import javax.jms.JMSException;
            import javax.jms.Message;
            import javax.jms.MessageListener;
            import javax.jms.Queue;
            import javax.jms.QueueConnection;
            import javax.jms.QueueConnectionFactory;
            import javax.jms.QueueReceiver;
            import javax.jms.QueueSender;
            import javax.jms.QueueSession;
            import javax.jms.TextMessage;
            import javax.jms.Topic;
            import javax.jms.TopicConnection;
            import javax.jms.TopicConnectionFactory;
            import javax.jms.TopicPublisher;
            import javax.jms.TopicSession;
            import javax.jms.TopicSubscriber;
            import javax.naming.Context;
            import javax.naming.InitialContext;
            import javax.naming.NamingException;
            import javax.transaction.*;
            
            import org.jboss.logging.Logger;
            import org.jboss.system.ServiceMBeanSupport;
            
            /**
             * Helps to manually test the HAIL.
             *
             * <p>This MBean opens topic and queue connections through the
             * <code>java:/JmsXA</code> connection factory and exposes operations to
             * begin a JTA transaction, publish/send text messages while that
             * transaction is associated with the calling thread, and commit it.
             * Between operations the transaction is kept suspended in a field so
             * that each operation can be invoked separately.
             *
             * <p>The class does no synchronization of its own; operations are
             * expected to be invoked one at a time.
             *
             * @author Ivelin Ivanov <ivelin@apache.org>
             *
             */
            public class HAJMSClient2 extends ServiceMBeanSupport
                implements MessageListener, ExceptionListener, HAJMSClient2MBean
            {

                /** Transaction begun by startTransaction(), held suspended between operations; null when none. */
                private Transaction _tx;

                /**
                 * Acknowledges a connection exception by disconnecting and remembering
                 * the exception so it can be read back via getConnectionException().
                 * Should be invoked every time the HAIL singleton moves.
                 *
                 * NOTE(review): nothing in this class registers it as the
                 * ExceptionListener of any connection - confirm how it gets invoked.
                 *
                 * @param connEx the exception reported for the connection
                 */
                public void onException(JMSException connEx)
                {
                    log.info("Notification received by ExceptionListener. Singleton Probably Moved.");
                    try
                    {
                        disconnect();
                    }
                    catch (Exception e)
                    {
                        // Best effort: the connection is already broken, so only record the failure.
                        log.error("Failed to disconnect after connection exception", e);
                    }
                    finally
                    {
                        // Set last: disconnect() resets connectionException_ to null.
                        connectionException_ = connEx;
                    }
                }

                /**
                 * Begins a new JTA transaction and immediately suspends it, keeping it
                 * for the subsequent publish/send/endTransaction operations.
                 *
                 * @throws IllegalStateException if a transaction started earlier has not
                 *         been ended yet (it would otherwise be silently orphaned)
                 * @throws Exception if the transaction manager cannot be looked up or
                 *         fails to begin/suspend the transaction
                 */
                public void startTransaction() throws Exception
                {
                    if (_tx != null)
                    {
                        throw new IllegalStateException(
                            "A transaction is already in progress; call endTransaction() first");
                    }
                    TransactionManager tm = getMgr();
                    tm.begin();
                    // suspend() returns the transaction it detaches from this thread.
                    _tx = tm.suspend();
                }

                /**
                 * Resumes the held transaction on the calling thread and commits it.
                 * The held reference is cleared whether or not the commit succeeds,
                 * because a transaction whose commit was attempted cannot be reused.
                 *
                 * @throws Exception if the transaction cannot be resumed or committed
                 */
                public void endTransaction() throws Exception
                {
                    TransactionManager tm = getMgr();
                    tm.resume(_tx);
                    try
                    {
                        tm.commit();
                    }
                    finally
                    {
                        _tx = null;
                    }
                }

                /**
                 * Looks up the transaction manager bound at java:/TransactionManager.
                 *
                 * @return the transaction manager
                 * @throws Exception if the JNDI lookup fails
                 */
                private TransactionManager getMgr() throws Exception
                {
                    return (TransactionManager) new InitialContext().lookup("java:/TransactionManager");
                }

                /**
                 * Creates the topic and queue connections, sessions, publisher, sender
                 * and queue receiver (with this object as its message listener), then
                 * starts both connections.
                 *
                 * NOTE(review): the sessions are created here, i.e. not while the
                 * transaction from startTransaction() is associated with the thread.
                 * Whether the JmsXA adapter enlists such sessions in a transaction that
                 * is resumed later is not established by this code - confirm against
                 * the JBoss JCA/JMS resource adapter documentation.
                 *
                 * @throws NamingException if a JNDI lookup fails
                 * @throws JMSException if a JMS object cannot be created or started
                 */
                public void connect() throws NamingException, JMSException
                {
                    log.info("Connecting");

                    // Default (in-VM) naming context; the previous, never-used HA-JNDI
                    // Properties object has been removed.
                    InitialContext iniCtx = new InitialContext();

                    Object tmpConnFactory = iniCtx.lookup("java:/JmsXA");

                    TopicConnectionFactory tcf = (TopicConnectionFactory) tmpConnFactory;
                    topicConn_ = tcf.createTopicConnection();
                    topic_ = (Topic) iniCtx.lookup("topic/testTopic");
                    topicSession_ = topicConn_.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);

                    // The commented out code can be used to test non-durable subscriptions
                    //topicSub_ = topicSession_.createSubscriber(topic_);
                    //topicSub_.setMessageListener( this );

                    topicPub_ = topicSession_.createPublisher(topic_);
                    topicConn_.start();

                    QueueConnectionFactory qcf = (QueueConnectionFactory) tmpConnFactory;
                    qConn_ = qcf.createQueueConnection(JMS_USERNAME, JMS_PASSWORD);
                    q_ = (Queue) iniCtx.lookup("queue/testQueue");
                    qSession_ = qConn_.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
                    qRecv_ = qSession_.createReceiver(q_);
                    qRecv_.setMessageListener(this);
                    qSend_ = qSession_.createSender(q_);
                    qConn_.start();

                    log.info("Connected");
                }

                /**
                 * Closes every JMS object opened by connect() and by
                 * registerDurableSubscriberAndReceiveMessages(), and resets the
                 * remembered connection exception. Does nothing when no connection is
                 * open.
                 *
                 * <p>All fields are null-checked so that a connect() that failed half
                 * way can still be cleaned up, and each connection is closed in its
                 * own finally block so that a failure on one does not leak the others.
                 *
                 * @throws JMSException if stopping or closing a JMS object fails
                 */
                public void disconnect() throws JMSException
                {
                    if (topicConn_ == null && durTopicConn_ == null && qConn_ == null) return;

                    log.info("Disconnecting");

                    connectionException_ = null;

                    try
                    {
                        //topicSub_.close();

                        if (topicPub_ != null) topicPub_.close();
                        if (topicConn_ != null) topicConn_.stop();
                        if (topicSession_ != null) topicSession_.close();

                        if (durTopicConn_ != null) durTopicConn_.stop();
                        if (durTopicSession_ != null) durTopicSession_.close();

                        if (qRecv_ != null) qRecv_.close();
                        if (qSend_ != null) qSend_.close();
                        if (qConn_ != null) qConn_.stop();
                        if (qSession_ != null) qSession_.close();
                    }
                    finally
                    {
                        try
                        {
                            if (topicConn_ != null) topicConn_.close();
                        }
                        finally
                        {
                            topicConn_ = null;
                            try
                            {
                                if (durTopicConn_ != null) durTopicConn_.close();
                            }
                            finally
                            {
                                durTopicConn_ = null;
                                try
                                {
                                    if (qConn_ != null) qConn_.close();
                                }
                                finally
                                {
                                    qConn_ = null;
                                }
                            }
                        }
                    }
                    log.info("Disconnected");
                }


                /**
                 * Registers the durable subscription 'durableSub' for client id
                 * 'durSubTestClient' on topic/testTopic and closes the connection
                 * again, so that the subscription exists without an active subscriber.
                 *
                 * @throws JMSException if a JMS operation fails
                 * @throws NamingException if a JNDI lookup fails
                 */
                public void registerDurableSubscriberAndDisconnect() throws JMSException, NamingException
                {
                    log.info("Registering durable subscriber");

                    InitialContext iniCtx = new InitialContext();
                    Object tmp = iniCtx.lookup("XAConnectionFactory");

                    TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
                    TopicConnection topicConn = tcf.createTopicConnection(JMS_USERNAME, JMS_PASSWORD);
                    try
                    {
                        topicConn.setClientID("durSubTestClient");
                        Topic topic = (Topic) iniCtx.lookup("topic/testTopic");
                        TopicSession topicSession = topicConn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
                        TopicSubscriber topicSub = topicSession.createDurableSubscriber(topic, "durableSub");
                        topicConn.start();

                        topicSub.close();
                        topicSession.close();
                    }
                    finally
                    {
                        // Always release the connection, even when registration failed.
                        topicConn.close();
                    }

                    log.info("Registered durable subscriber 'durableSub' and disconnected. Messages for this subscriber will be stored by the JMS server until it becomes available.");
                }


                /**
                 * Registers the durable subscription 'durableSub' for client id
                 * 'durSubTestClient' on topic/testTopic with this object as message
                 * listener and keeps the connection open; disconnect() closes it.
                 *
                 * @throws JMSException if a JMS operation fails
                 * @throws NamingException if a JNDI lookup fails
                 */
                public void registerDurableSubscriberAndReceiveMessages() throws JMSException, NamingException
                {
                    log.info("Registering durable subscriber");

                    InitialContext iniCtx = new InitialContext();
                    Object tmp = iniCtx.lookup("XAConnectionFactory");

                    TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
                    durTopicConn_ = tcf.createTopicConnection(JMS_USERNAME, JMS_PASSWORD);
                    durTopicConn_.setClientID("durSubTestClient");
                    Topic topic = (Topic) iniCtx.lookup("topic/testTopic");
                    durTopicSession_ = durTopicConn_.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
                    TopicSubscriber topicSub = durTopicSession_.createDurableSubscriber(topic, "durableSub");
                    topicSub.setMessageListener(this);
                    durTopicConn_.start();

                    log.info("Registered durable subscriber 'durableSub' and waiting to receive messages.");
                }

                /**
                 * Handle JMS message: logs it and, when it is a text message,
                 * remembers it for getLastMessage(). Other message types are logged
                 * but leave the remembered message unchanged, so the listener never
                 * throws a ClassCastException.
                 *
                 * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
                 */
                public void onMessage(Message msg)
                {
                    if (msg instanceof TextMessage)
                    {
                        lastMessage_ = (TextMessage) msg;
                    }
                    else
                    {
                        log.warn("Received a non-text message; last message left unchanged");
                    }
                    log.info("Message received: " + msg);
                }

                /**
                 * @return the text of the last text message received, or null if none
                 *         has been received
                 * @throws JMSException if the message text cannot be read
                 */
                public String getLastMessage() throws JMSException
                {
                    if (lastMessage_ == null) return null;
                    return lastMessage_.getText();
                }

                /**
                 * @return the string form of the last connection exception reported to
                 *         onException(), or null if there is none
                 */
                public String getConnectionException()
                {
                    if (connectionException_ == null) return null;
                    return connectionException_.toString();
                }

                /**
                 * Publishes a text message to the topic while the held transaction is
                 * associated with the calling thread.
                 *
                 * @param text the message body
                 * @throws Exception if the transaction cannot be resumed/suspended or
                 *         the message cannot be published
                 */
                public void publishMessageToTopic(String text) throws Exception
                {
                    TransactionManager tm = getMgr();
                    tm.resume(_tx);
                    try
                    {
                        TextMessage msg = topicSession_.createTextMessage(text);
                        topicPub_.publish(msg);
                        log.info("HA JMS message published to topic: " + text);
                    }
                    finally
                    {
                        // Always detach the transaction from this thread again, even on failure.
                        tm.suspend();
                    }
                }

                /**
                 * Sends a text message to the queue while the held transaction is
                 * associated with the calling thread.
                 *
                 * @param text the message body
                 * @throws Exception if the transaction cannot be resumed/suspended or
                 *         the message cannot be sent
                 */
                public void sendMessageToQueue(String text) throws Exception
                {
                    TransactionManager tm = getMgr();
                    tm.resume(_tx);
                    try
                    {
                        TextMessage msg = qSession_.createTextMessage(text);
                        qSend_.send(msg);
                        log.info("HA JMS message sent to queue: " + text);
                    }
                    finally
                    {
                        // Always detach the transaction from this thread again, even on failure.
                        tm.suspend();
                    }
                }

                // Topic side, set up by connect().
                private Topic topic_;
                private TopicConnection topicConn_;
                private TopicSession topicSession_;
                /** Last exception passed to onException(); reset by disconnect(). */
                private JMSException connectionException_;

                // Durable subscriber side, set up by registerDurableSubscriberAndReceiveMessages().
                private TopicConnection durTopicConn_;
                private TopicSession durTopicSession_;
                //private TopicSubscriber topicSub_;
                private TopicPublisher topicPub_;
                /** Last text message delivered to onMessage(). */
                private TextMessage lastMessage_;

                // Queue side, set up by connect().
                private Queue q_;
                private QueueConnection qConn_;
                private QueueSession qSession_;
                private QueueReceiver qRecv_;
                private QueueSender qSend_;

                private static final String JMS_USERNAME = "dynsub";
                private static final String JMS_PASSWORD = "dynsub";


                private static final Logger log = Logger.getLogger(HAJMSClient2.class);

            }