How to send one message to a topic in MDB? Click to flag this post
wangdongfox Sep 7, 2009 5:28 AM
Environment: JBoss 5.1 GA, FUSE MQ 5.3.0.4, embedded broker configuration.
I want to send a message to a topic whenever an MDB receives a message, but the send does not succeed every time. Can anyone help me? I have been struggling with this for about two weeks.
Here is my sample code:
package com.trading.platform.ejb;
import javax.ejb.MessageDrivenBean;
import javax.ejb.MessageDrivenContext;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnectionFactory;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import com.trading.platform.context.ServerContext;
import com.trading.platform.exception.InvokeException;
import com.trading.platform.exception.SystemException;
import com.trading.platform.handler.MessageHandler;
import com.trading.platform.log.ILogger;
import com.trading.platform.log.Logger;
public class OrderResponseMDB implements MessageDrivenBean, MessageListener{
private ILogger logger = Logger.getLogger(this.getClass());
private MessageDrivenContext messageDrivenContext;
private volatile TopicConnectionFactory topicConnnectionFactory;
private volatile Topic topic;
private void getInitialContext(){
try {
Context context = new InitialContext();
topicConnnectionFactory = (TopicConnectionFactory)context.lookup("java:comp/env/jms/TopicConnectionFactory");
//topic = (Topic)context.lookup("java:comp/env/jms/OrderBroadcastTopic");
if(logger.isDebugEnabled()) logger.debug("Platform:Publisher Topic Session connnection is created..." );
} catch (NamingException e) {
if(logger.isErrorEnabled()) logger.error("NamingExcetion:" + e);
throw new SystemException("System Error", e);
}
}
public void onMessage(Message inMessage) {
TopicConnection topicConn = null;
try {
if (inMessage instanceof TextMessage) {
TextMessage txtmsg = (TextMessage)inMessage;
System.out.println("MESSAGE BEAN3: Message received: index="+txtmsg.getText());
topicConn = topicConnnectionFactory.createTopicConnection();
topicConn.start();
TopicSession topicSession = topicConn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
topic = topicSession.createTopic("topic.orderbroadcast");
TopicPublisher publisher = topicSession.createPublisher(topic);
TextMessage newMsg = topicSession.createTextMessage("11111");
publisher.send(newMsg);
//publisher.close();
//topicSession.close();
topicConn.close();
} else {
System.out.println("WRONG MESSAGE TYPE received: " + inMessage.getClass().getName());
}
} catch (Throwable t) {
System.out.println("error...");
t.printStackTrace();
messageDrivenContext.setRollbackOnly();
try {
if (topicConn != null) topicConn.close();
System.out.println("MESSAGE BEAN: Rollback: index="+((TextMessage)inMessage).getText());
} catch (Exception e) {
System.out.println("MESSAGE BEAN: Unable to get index property because:"+e.getMessage());
e.printStackTrace();
}
throw new RuntimeException(t.getMessage());
}
}
public void ejbCreate(){
System.out.println("ConnectionFactory created...");
getInitialContext();
}
public void ejbRemove(){
}
public void setMessageDrivenContext(MessageDrivenContext messageDrivenContext){
this.messageDrivenContext = messageDrivenContext;
}
}
The following is the exception information:
14:59:09,601 WARN Async error occurred: javax.transaction.xa.XAException: Transaction 'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663' has not been started.
javax.transaction.xa.XAException: Transaction 'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663' has not been started.
at org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:266)
at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:208)
at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:126)
at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:95)
at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:133)
at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:455)
at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:639)
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:308)
at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:182)
at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:210)
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
at java.lang.Thread.run(Thread.java:595)
14:59:09,614 WARN Connection failed: javax.jms.JMSException: Transaction 'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663' has not been started.
14:59:09,615 WARN Connection error occured: org.jboss.resource.connectionmanager.TxConnectionManager$TxConnectionEventListener@18936ae[state=NORMAL mc=org.apache.activemq.ra.ActiveMQManagedConnection@1365339 handles=0 lastUse=1252306749595 permit=false trackByTx=false mcp=org.jboss.resource.connectionmanager.JBossManagedConnectionPool$OnePool@191c3ce context=org.jboss.resource.connectionmanager.InternalManagedConnectionPool@ce1472 xaResource=org.apache.activemq.ra.ActiveMQManagedConnection$1@164e48d txSync=null]
javax.jms.JMSException: Transaction 'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663' has not been started.
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:49)
at org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1784)
at org.apache.activemq.ActiveMQConnection$2$1.run(ActiveMQConnection.java:1705)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
at java.lang.Thread.run(Thread.java:595)
Caused by: javax.transaction.xa.XAException: Transaction 'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663' has not been started.
at org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:266)
at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:208)
at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:126)
at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:95)
at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:133)
at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:455)
at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:639)
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:308)
at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:182)
at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:210)
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
... 1 more