1 Reply Latest reply on Sep 9, 2009 12:00 PM by davestanley

    How to send one message to a topic in MDB? Click to flag this post

    wangdongfox

      Environment: JBoss 5.1GA, FUSE MQ 5.3.0.4 embeded broker configuration

       

      I want to send a message to a topic when receiving one message in a MDB, but it can't be successful all the time, any one can help me? I have struggled in it about 2 weeks.

       

      It is my sample code:

       

      package com.trading.platform.ejb;

       

      import javax.ejb.MessageDrivenBean;

      import javax.ejb.MessageDrivenContext;

      import javax.jms.Message;

      import javax.jms.MessageListener;

      import javax.jms.Queue;

      import javax.jms.QueueConnectionFactory;

      import javax.jms.TextMessage;

      import javax.jms.Topic;

      import javax.jms.TopicConnection;

      import javax.jms.TopicConnectionFactory;

      import javax.jms.TopicPublisher;

      import javax.jms.TopicSession;

      import javax.naming.Context;

      import javax.naming.InitialContext;

      import javax.naming.NamingException;

       

      import com.trading.platform.context.ServerContext;

      import com.trading.platform.exception.InvokeException;

      import com.trading.platform.exception.SystemException;

      import com.trading.platform.handler.MessageHandler;

      import com.trading.platform.log.ILogger;

      import com.trading.platform.log.Logger;

       

      public class OrderResponseMDB implements MessageDrivenBean, MessageListener{

              private ILogger logger = Logger.getLogger(this.getClass());

              private MessageDrivenContext messageDrivenContext;

              

              private volatile TopicConnectionFactory topicConnnectionFactory;

              private volatile Topic topic;

              

              private void getInitialContext(){

                      try {

                              Context context = new InitialContext();

                              topicConnnectionFactory = (TopicConnectionFactory)context.lookup("java:comp/env/jms/TopicConnectionFactory");

                              //topic = (Topic)context.lookup("java:comp/env/jms/OrderBroadcastTopic");

                              if(logger.isDebugEnabled()) logger.debug("Platform:Publisher Topic Session connnection is created..." );

                      } catch (NamingException e) {

                              if(logger.isErrorEnabled()) logger.error("NamingExcetion:" + e);

                              throw new SystemException("System Error", e);

                      }

              }

       

               public void onMessage(Message inMessage) {

       

                       TopicConnection topicConn = null;

                      try {

                          if (inMessage instanceof TextMessage) {

                              TextMessage txtmsg = (TextMessage)inMessage;     

                              

                              System.out.println("MESSAGE BEAN3: Message received: index="+txtmsg.getText());

                              topicConn = topicConnnectionFactory.createTopicConnection();

                  topicConn.start();

                  TopicSession topicSession = topicConn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);

                  topic = topicSession.createTopic("topic.orderbroadcast");

                  

                  TopicPublisher publisher = topicSession.createPublisher(topic);

                  TextMessage newMsg = topicSession.createTextMessage("11111");

                  publisher.send(newMsg);

                  

                  //publisher.close();

                  //topicSession.close();

                  topicConn.close();

                          } else {

                              System.out.println("WRONG MESSAGE TYPE received: " + inMessage.getClass().getName());

                          }

                      } catch (Throwable t) {

                      System.out.println("error...");

                          t.printStackTrace();

                          messageDrivenContext.setRollbackOnly();

                          

                          try {

                              if (topicConn != null)  topicConn.close();

                              System.out.println("MESSAGE BEAN: Rollback: index="+((TextMessage)inMessage).getText());

                          } catch (Exception e) {

                              System.out.println("MESSAGE BEAN: Unable to get index property because:"+e.getMessage());

                              e.printStackTrace();

                              

                          }

                          throw new RuntimeException(t.getMessage());

                      }

               }

       

              

              public void ejbCreate(){

                      System.out.println("ConnectionFactory created...");

                      getInitialContext();

              }

              

              public void ejbRemove(){

                      

              }

              

              public void setMessageDrivenContext(MessageDrivenContext messageDrivenContext){

                      this.messageDrivenContext = messageDrivenContext;

              }

      }

       

      The following is the exception information:

       

      14:59:09,601 WARN  Async error occurred: javax.transaction.xa.XAException: Transaction 'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663' has not been started.

      javax.transaction.xa.XAException: Transaction 'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663' has not been started.

              at org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:266)

              at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:208)

              at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:126)

              at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:95)

              at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:133)

              at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:455)

              at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:639)

              at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:308)

              at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:182)

              at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)

              at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)

              at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:210)

              at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)

              at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)

              at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)

              at java.lang.Thread.run(Thread.java:595)

      14:59:09,614 WARN  Connection failed: javax.jms.JMSException: Transaction 'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663' has not been started.

      14:59:09,615 WARN  Connection error occured: org.jboss.resource.connectionmanager.TxConnectionManager$TxConnectionEventListener@18936ae[state=NORMAL mc=org.apache.activemq.ra.ActiveMQManagedConnection@1365339 handles=0 lastUse=1252306749595 permit=false trackByTx=false mcp=org.jboss.resource.connectionmanager.JBossManagedConnectionPool$OnePool@191c3ce context=org.jboss.resource.connectionmanager.InternalManagedConnectionPool@ce1472 xaResource=org.apache.activemq.ra.ActiveMQManagedConnection$1@164e48d txSync=null]

      javax.jms.JMSException: Transaction 'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663' has not been started.

              at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:49)

              at org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1784)

              at org.apache.activemq.ActiveMQConnection$2$1.run(ActiveMQConnection.java:1705)

              at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)

              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)

              at java.lang.Thread.run(Thread.java:595)

      Caused by: javax.transaction.xa.XAException: Transaction 'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663' has not been started.

              at org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:266)

              at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:208)

              at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:126)

              at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:95)

              at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:133)

              at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:455)

              at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:639)

              at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:308)

              at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:182)

              at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)

              at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)

              at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:210)

              at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)

              at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)

              at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)

              ... 1 more