This content has been marked as final.
Show 2 replies
-
1. Re: JMS XA - Am I doing something wrong?
javajedi Aug 21, 2005 8:50 PM (in response to javajedi)I modified HAJMSClient.java from the JBoss examples to use JmsXA and JTA transactions to more clearly demonstrate what doesn't seem to be working. I'm attaching the code. I deploy this MBean and invoke the following operations:
1) connect() 2) startTransaction() 3) sendMessageToQueue() -- *** At this point, a "Message received" line appears in the server log. *** 4) endTransaction()
The message shouldn't be received until AFTER endTransaction() has been called, right? Is this a bug, or am I doing something wrong? -
2. Re: JMS XA - Am I doing something wrong?
javajedi Aug 21, 2005 8:53 PM (in response to javajedi)
package org.jboss.mq.il.ha.examples;

import java.util.Properties;

import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.transaction.*;

import org.jboss.logging.Logger;
import org.jboss.system.ServiceMBeanSupport;

/**
 * Helps to manually test the HAIL.
 *
 * <p>This MBean opens a topic connection and a queue connection obtained from
 * the {@code java:/JmsXA} connection factory, registers itself as the queue
 * message listener, and exposes operations to publish/send text messages.
 * A JTA transaction can be opened with {@link #startTransaction()} and
 * committed with {@link #endTransaction()}; between operations the transaction
 * is kept suspended and is resumed only for the duration of each send.
 *
 * <p>NOTE(review): the JMS sessions are created in {@link #connect()}, i.e.
 * outside of any JTA transaction. If the JmsXA adapter only enlists a session
 * in the transaction that is active when the session is created, messages sent
 * later under {@link #startTransaction()} would be delivered immediately
 * rather than at commit -- confirm against the JBoss JCA/JMS adapter docs.
 *
 * @author Ivelin Ivanov <ivelin@apache.org>
 */
public class HAJMSClient2 extends ServiceMBeanSupport
        implements MessageListener, ExceptionListener, HAJMSClient2MBean {

    private static final String JMS_USERNAME = "dynsub";
    private static final String JMS_PASSWORD = "dynsub";

    private static final Logger log = Logger.getLogger(HAJMSClient2.class);

    /** Transaction opened by startTransaction(); null when none is in progress. */
    private Transaction tx_;

    private Topic topic_;
    private TopicConnection topicConn_;
    private TopicSession topicSession_;
    private JMSException connectionException_;
    private TopicConnection durTopicConn_;
    private TopicSession durTopicSession_;
    //private TopicSubscriber topicSub_;
    private TopicPublisher topicPub_;
    private TextMessage lastMessage_;
    private Queue q_;
    private QueueConnection qConn_;
    private QueueSession qSession_;
    private QueueReceiver qRecv_;
    private QueueSender qSend_;

    /**
     * Acknowledges connection exception.
     * Should be invoked every time the HAIL singleton moves.
     *
     * <p>Disconnects, then records the exception so that it can be read back
     * through {@link #getConnectionException()}.
     *
     * @param connEx the exception reported by the JMS provider
     */
    public void onException(JMSException connEx) {
        log.info("Notification received by ExceptionListener. Singleton Probably Moved.");
        try {
            disconnect();
        } catch (Exception e) {
            // Best effort: the connection is already reported as broken.
            log.warn("Failed to disconnect after connection exception", e);
        } finally {
            // disconnect() clears the field, so it must be set afterwards.
            connectionException_ = connEx;
        }
    }

    /**
     * Begins a new JTA transaction and immediately suspends it, keeping a
     * reference so that later operations can resume it.
     *
     * @throws IllegalStateException if a transaction started by this MBean is
     *         still in progress
     * @throws Exception if the transaction manager cannot be looked up or
     *         fails to begin/suspend the transaction
     */
    public void startTransaction() throws Exception {
        if (tx_ != null) {
            // Overwriting the reference would orphan the previous transaction.
            throw new IllegalStateException(
                    "A transaction is already in progress; call endTransaction() first");
        }
        TransactionManager tm = getMgr();
        tm.begin();
        // suspend() detaches the transaction from this thread and returns it.
        tx_ = tm.suspend();
    }

    /**
     * Resumes the transaction opened by {@link #startTransaction()} and
     * commits it.
     *
     * @throws IllegalStateException if no transaction is in progress
     * @throws Exception if the transaction manager cannot be looked up or the
     *         resume/commit fails
     */
    public void endTransaction() throws Exception {
        if (tx_ == null) {
            throw new IllegalStateException(
                    "No transaction in progress; call startTransaction() first");
        }
        TransactionManager tm = getMgr();
        // Drop the reference first so a failed commit does not leave a stale
        // transaction behind for the next operation.
        Transaction tx = tx_;
        tx_ = null;
        tm.resume(tx);
        tm.commit();
    }

    /**
     * Looks up the transaction manager bound at {@code java:/TransactionManager}.
     */
    private TransactionManager getMgr() throws Exception {
        return (TransactionManager) new InitialContext().lookup("java:/TransactionManager");
    }

    /**
     * Connects to {@code topic/testTopic} and {@code queue/testQueue} using the
     * {@code java:/JmsXA} connection factory, creates a publisher, a sender and
     * a queue receiver, and registers this object as the receiver's listener.
     *
     * <p>If a previous connection is still open it is closed first.
     *
     * @throws NamingException if a JNDI lookup fails
     * @throws JMSException if a JMS object cannot be created or started
     */
    public void connect() throws NamingException, JMSException {
        if (topicConn_ != null) {
            // Avoid leaking the connections of an earlier connect().
            disconnect();
        }

        log.info("Connecting");

        // The original built HA-JNDI Properties here but never passed them to
        // InitialContext; that dead code has been removed.
        InitialContext iniCtx = new InitialContext();
        Object tmpConnFactory = iniCtx.lookup("java:/JmsXA");

        TopicConnectionFactory tcf = (TopicConnectionFactory) tmpConnFactory;
        topicConn_ = tcf.createTopicConnection();
        topic_ = (Topic) iniCtx.lookup("topic/testTopic");
        topicSession_ = topicConn_.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
        // The commented out code can be used to test non-durable subscriptions
        //topicSub_ = topicSession_.createSubscriber(topic_);
        //topicSub_.setMessageListener( this );
        topicPub_ = topicSession_.createPublisher(topic_);
        topicConn_.start();

        QueueConnectionFactory qcf = (QueueConnectionFactory) tmpConnFactory;
        qConn_ = qcf.createQueueConnection(JMS_USERNAME, JMS_PASSWORD);
        q_ = (Queue) iniCtx.lookup("queue/testQueue");
        qSession_ = qConn_.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        qRecv_ = qSession_.createReceiver(q_);
        qRecv_.setMessageListener(this);
        qSend_ = qSession_.createSender(q_);
        qConn_.start();

        log.info("Connected");
    }

    /**
     * Closes every JMS object opened by {@link #connect()} and
     * {@link #registerDurableSubscriberAndReceiveMessages()} and clears the
     * recorded connection exception. Does nothing when not connected.
     *
     * <p>Every object is null-checked because {@code connect()} may have failed
     * half way, and each connection is closed in its own {@code finally} so
     * that one failing close does not prevent the others.
     *
     * @throws JMSException if stopping or closing a JMS object fails
     */
    public void disconnect() throws JMSException {
        if (topicConn_ == null) {
            return;
        }

        log.info("Disconnecting");

        connectionException_ = null;

        try {
            //topicSub_.close();
            if (topicPub_ != null) {
                topicPub_.close();
            }
            topicConn_.stop();
            if (topicSession_ != null) {
                topicSession_.close();
            }
            if (durTopicConn_ != null) {
                durTopicConn_.stop();
                if (durTopicSession_ != null) {
                    durTopicSession_.close();
                }
            }
            if (qRecv_ != null) {
                qRecv_.close();
            }
            if (qSend_ != null) {
                qSend_.close();
            }
            if (qConn_ != null) {
                qConn_.stop();
            }
            if (qSession_ != null) {
                qSession_.close();
            }
        } finally {
            try {
                topicConn_.close();
            } finally {
                topicConn_ = null;
                try {
                    if (durTopicConn_ != null) {
                        durTopicConn_.close();
                    }
                } finally {
                    durTopicConn_ = null;
                    try {
                        if (qConn_ != null) {
                            qConn_.close();
                        }
                    } finally {
                        qConn_ = null;
                        // Drop references to objects that belonged to the
                        // closed connections so they are never reused.
                        topicPub_ = null;
                        topicSession_ = null;
                        durTopicSession_ = null;
                        qRecv_ = null;
                        qSend_ = null;
                        qSession_ = null;
                    }
                }
            }
        }

        log.info("Disconnected");
    }

    /**
     * Creates the durable subscription {@code durableSub} on
     * {@code topic/testTopic} under client id {@code durSubTestClient} and then
     * closes the connection again.
     *
     * @throws JMSException if a JMS operation fails
     * @throws NamingException if a JNDI lookup fails
     */
    public void registerDurableSubscriberAndDisconnect() throws JMSException, NamingException {
        log.info("Registering durable subscriber");

        InitialContext iniCtx = new InitialContext();
        Object tmp = iniCtx.lookup("XAConnectionFactory");

        TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
        TopicConnection topicConn = tcf.createTopicConnection(JMS_USERNAME, JMS_PASSWORD);
        try {
            topicConn.setClientID("durSubTestClient");
            Topic topic = (Topic) iniCtx.lookup("topic/testTopic");
            TopicSession topicSession =
                    topicConn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
            TopicSubscriber topicSub = topicSession.createDurableSubscriber(topic, "durableSub");
            topicConn.start();
            topicSub.close();
            topicSession.close();
        } finally {
            // Always release the connection, even if registration failed.
            topicConn.close();
        }

        log.info("Registered durable subscriber 'durableSub' and disconnected. Messages for this subscriber will be stored by the JMS server until it becomes available.");
    }

    /**
     * Creates the durable subscription {@code durableSub} on
     * {@code topic/testTopic} under client id {@code durSubTestClient},
     * registers this object as its listener and starts the connection. The
     * connection stays open until {@link #disconnect()} is called.
     *
     * @throws JMSException if a JMS operation fails
     * @throws NamingException if a JNDI lookup fails
     */
    public void registerDurableSubscriberAndReceiveMessages() throws JMSException, NamingException {
        log.info("Registering durable subscriber");

        InitialContext iniCtx = new InitialContext();
        Object tmp = iniCtx.lookup("XAConnectionFactory");

        TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
        durTopicConn_ = tcf.createTopicConnection(JMS_USERNAME, JMS_PASSWORD);
        durTopicConn_.setClientID("durSubTestClient");
        Topic topic = (Topic) iniCtx.lookup("topic/testTopic");
        durTopicSession_ = durTopicConn_.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
        TopicSubscriber topicSub = durTopicSession_.createDurableSubscriber(topic, "durableSub");
        topicSub.setMessageListener(this);
        durTopicConn_.start();

        log.info("Registered durable subscriber 'durableSub' and waiting to receive messages.");
    }

    /**
     * Handle JMS message.
     *
     * <p>Text messages are remembered for {@link #getLastMessage()}; any other
     * message type is only logged, instead of failing with a
     * {@code ClassCastException} inside the listener.
     *
     * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
     */
    public void onMessage(Message msg) {
        if (msg instanceof TextMessage) {
            lastMessage_ = (TextMessage) msg;
        }
        log.info("Message received: " + msg);
    }

    /**
     * @return the text of the last text message received, or null if none has
     *         been received yet
     * @throws JMSException if the message text cannot be read
     */
    public String getLastMessage() throws JMSException {
        if (lastMessage_ == null) {
            return null;
        }
        return lastMessage_.getText();
    }

    /**
     * @return the string form of the last connection exception reported to
     *         {@link #onException(JMSException)}, or null if there is none
     */
    public String getConnectionException() {
        if (connectionException_ == null) {
            return null;
        }
        return connectionException_.toString();
    }

    /**
     * Publishes a text message to the topic, with the transaction opened by
     * {@link #startTransaction()} (if any) resumed for the duration of the call.
     *
     * @param text the message body
     * @throws Exception if the transaction cannot be resumed/suspended or the
     *         publish fails
     */
    public void publishMessageToTopic(String text) throws Exception {
        TransactionManager tm = getMgr();
        boolean resumed = tx_ != null;
        if (resumed) {
            tm.resume(tx_);
        }
        try {
            TextMessage msg = topicSession_.createTextMessage(text);
            topicPub_.publish(msg);
            log.info("HA JMS message published to topic: " + text);
        } finally {
            // Always detach the transaction from this thread again, even when
            // the publish failed.
            if (resumed) {
                tm.suspend();
            }
        }
    }

    /**
     * Sends a text message to the queue, with the transaction opened by
     * {@link #startTransaction()} (if any) resumed for the duration of the call.
     *
     * @param text the message body
     * @throws Exception if the transaction cannot be resumed/suspended or the
     *         send fails
     */
    public void sendMessageToQueue(String text) throws Exception {
        TransactionManager tm = getMgr();
        boolean resumed = tx_ != null;
        if (resumed) {
            tm.resume(tx_);
        }
        try {
            TextMessage msg = qSession_.createTextMessage(text);
            qSend_.send(msg);
            log.info("HA JMS message sent to queue: " + text);
        } finally {
            // Always detach the transaction from this thread again, even when
            // the send failed.
            if (resumed) {
                tm.suspend();
            }
        }
    }
}