Two MD beans in one distributed transaction
kadlecp Jan 4, 2008 10:41 AMHello,
I am trying two days to have two message driven beans in one global distributed transaction. I have working code, but sometimes it throws strange exception....
The message driven beans code in my message driven beans is
package trail.mdb; import javax.annotation.Resource; import javax.ejb.ActivationConfigProperty; import javax.ejb.MessageDriven; import javax.ejb.TransactionAttribute; import javax.ejb.TransactionAttributeType; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import javax.persistence.EntityManager; import javax.persistence.PersistenceContext; import javax.transaction.SystemException; import javax.transaction.Transaction; import javax.transaction.TransactionManager; import org.jboss.tm.TransactionPropagationContextImporter; import trail.entity.MyEntity; @MessageDriven(activationConfig = { @ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"), @ActivationConfigProperty(propertyName="destination", propertyValue="queue/mdb"), }) public class BooMDB implements MessageListener { @PersistenceContext() protected EntityManager em; @Resource(mappedName="java:/TransactionPropagationContextImporter") TransactionPropagationContextImporter importer; @Resource(mappedName="java:/TransactionManager") private TransactionManager tm; @TransactionAttribute(value=TransactionAttributeType.NEVER) public void onMessage (Message msg) { try { int id = msg.getIntProperty("id"); System.out.println("onMessage called with id: " + id); ObjectMessage om = (ObjectMessage)msg; // get transaction propagation context Object cx = om.getObject(); Transaction tx = importer.importTransactionPropagationContext(cx); /* * I am trying to associate current thread with the global * transaction. Is this correct way? How shall I do it? */ tm.resume(tx); em.persist(new MyEntity(id)); } catch (Exception e) { e.printStackTrace(); } finally { try { /* Diassociate current thread from the global transaction. Is that OK? Do I have to do it? */ tm.suspend(); } catch (SystemException e) { e.printStackTrace(); } } } }
package trail.slsb; import java.io.Serializable; import javax.annotation.Resource; import javax.ejb.Remote; import javax.ejb.Stateless; import javax.ejb.TransactionAttribute; import javax.ejb.TransactionAttributeType; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Queue; import javax.jms.Session; import javax.naming.InitialContext; import javax.naming.NamingException; import javax.transaction.TransactionManager; import org.apache.log4j.Logger; import org.jboss.tm.TransactionPropagationContextFactory; @Stateless @Remote(value=Foo.class) public class FooBean implements Foo { private static Logger logger = Logger.getLogger(FooBean.class); @Resource(mappedName="java:/TransactionManager") private TransactionManager tm; @Resource(mappedName="java:/TransactionPropagationContextExporter") TransactionPropagationContextFactory exporter; private Connection conn; private Queue mdbQueue; private Session session; @TransactionAttribute(value=TransactionAttributeType.REQUIRES_NEW) public void foo(int id1, int id2) throws Exception { try { setupConnection(); Object cx = exporter.getTransactionPropagationContext(tm.getTransaction()); sendAMessage(id1, cx); sendAMessage(id2, cx); stop(); Thread.sleep(6000); } catch (Exception e) { e.printStackTrace(); } System.out.println("about to commit..."); } private void setupConnection() throws JMSException, NamingException { InitialContext iniCtx = new InitialContext(); Object tmp = iniCtx.lookup("/ConnectionFactory"); ConnectionFactory qcf = (ConnectionFactory) tmp; conn = qcf.createConnection(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); conn.start(); mdbQueue = (Queue) iniCtx.lookup("queue/mdb"); } private void sendAMessage(int saving, Object cx) throws JMSException { MessageProducer send = session.createProducer(mdbQueue); ObjectMessage om = session.createObjectMessage(); om.setObject((Serializable) cx); om.setIntProperty("id", saving); send.send(om); send.close(); } private void stop() throws JMSException { conn.stop(); session.close(); conn.close(); } }
This is code of my test main class
package trail.test; import javax.naming.InitialContext; import trail.slsb.Foo; public class TestFooBean { /** * @param args */ public static void main(String[] args) throws Exception { InitialContext ic = new InitialContext(); Foo bean = (Foo) ic.lookup("FooBean/remote"); bean.foo(51, 42); } } package trail.test; import javax.naming.InitialContext; import trail.slsb.Foo; public class TestFooBean { /** * @param args */ public static void main(String[] args) throws Exception { InitialContext ic = new InitialContext(); Foo bean = (Foo) ic.lookup("FooBean/remote"); bean.foo(51, 42); } }
This is the exception I sometimes get from my test client
Exception in thread "main" java.util.NoSuchElementException at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1029) at java.util.TreeMap$KeyIterator.next(TreeMap.java:1058) at java.util.AbstractCollection.toArray(AbstractCollection.java:176) at com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator.beforeCompletion(TwoPhaseCoordinator.java:233) at com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator.end(TwoPhaseCoordinator.java:86) at com.arjuna.ats.arjuna.AtomicAction.commit(AtomicAction.java:177) at com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple.commitAndDisassociate(TransactionImple.java:1382) at com.arjuna.ats.internal.jta.transaction.arjunacore.BaseTransaction.commit(BaseTransaction.java:135) at com.arjuna.ats.jbossatx.BaseTransactionManagerDelegate.commit(BaseTransactionManagerDelegate.java:87) at org.jboss.aspects.tx.TxPolicy.endTransaction(TxPolicy.java:175) at org.jboss.aspects.tx.TxPolicy.invokeInOurTx(TxPolicy.java:87) at org.jboss.aspects.tx.TxInterceptor$RequiresNew.invoke(TxInterceptor.java:262) at org.jboss.aop.joinpoint.MethodInvocation.invokeNext(MethodInvocation.java:101) at org.jboss.aspects.tx.TxPropagationInterceptor.invoke(TxPropagationInterceptor.java:76) at org.jboss.aop.joinpoint.MethodInvocation.invokeNext(MethodInvocation.java:101) at org.jboss.ejb3.stateful.StatefulInstanceInterceptor.invoke(StatefulInstanceInterceptor.java:83) at org.jboss.aop.joinpoint.MethodInvocation.invokeNext(MethodInvocation.java:101) at org.jboss.aspects.security.AuthenticationInterceptor.invoke(AuthenticationInterceptor.java:77) at org.jboss.ejb3.security.Ejb3AuthenticationInterceptor.invoke(Ejb3AuthenticationInterceptor.java:106) at org.jboss.aop.joinpoint.MethodInvocation.invokeNext(MethodInvocation.java:101) at org.jboss.ejb3.ENCPropagationInterceptor.invoke(ENCPropagationInterceptor.java:46) at org.jboss.aop.joinpoint.MethodInvocation.invokeNext(MethodInvocation.java:101) at org.jboss.ejb3.asynchronous.AsynchronousInterceptor.invoke(AsynchronousInterceptor.java:106) at org.jboss.aop.joinpoint.MethodInvocation.invokeNext(MethodInvocation.java:101) at org.jboss.ejb3.stateful.StatefulContainer.dynamicInvoke(StatefulContainer.java:333) at org.jboss.aop.Dispatcher.invoke(Dispatcher.java:106) at org.jboss.aspects.remoting.AOPRemotingInvocationHandler.invoke(AOPRemotingInvocationHandler.java:82) at org.jboss.remoting.ServerInvoker.invoke(ServerInvoker.java:795) at org.jboss.remoting.transport.socket.ServerThread.processInvocation(ServerThread.java:573) at org.jboss.remoting.transport.socket.ServerThread.dorun(ServerThread.java:373) at org.jboss.remoting.transport.socket.ServerThread.run(ServerThread.java:166) at org.jboss.remoting.MicroRemoteClientInvoker.invoke(MicroRemoteClientInvoker.java:163) at org.jboss.remoting.Client.invoke(Client.java:1634) at org.jboss.remoting.Client.invoke(Client.java:548) at org.jboss.aspects.remoting.InvokeRemoteInterceptor.invoke(InvokeRemoteInterceptor.java:62) at org.jboss.aop.joinpoint.MethodInvocation.invokeNext(MethodInvocation.java:101) at org.jboss.aspects.tx.ClientTxPropagationInterceptor.invoke(ClientTxPropagationInterceptor.java:61) at org.jboss.aop.joinpoint.MethodInvocation.invokeNext(MethodInvocation.java:101) at org.jboss.aspects.security.SecurityClientInterceptor.invoke(SecurityClientInterceptor.java:53) at org.jboss.aop.joinpoint.MethodInvocation.invokeNext(MethodInvocation.java:101) at org.jboss.ejb3.remoting.IsLocalInterceptor.invoke(IsLocalInterceptor.java:72) at org.jboss.aop.joinpoint.MethodInvocation.invokeNext(MethodInvocation.java:101) at org.jboss.ejb3.stateful.StatefulRemoteProxy.invoke(StatefulRemoteProxy.java:135) at $Proxy1.foo(Unknown Source) at trail.test.TestFooBean.main(TestFooBean.java:16) at org.jboss.aspects.remoting.InvokeRemoteInterceptor.invoke(InvokeRemoteInterceptor.java:74) at org.jboss.aop.joinpoint.MethodInvocation.invokeNext(MethodInvocation.java:101) at org.jboss.aspects.tx.ClientTxPropagationInterceptor.invoke(ClientTxPropagationInterceptor.java:61) at org.jboss.aop.joinpoint.MethodInvocation.invokeNext(MethodInvocation.java:101) at org.jboss.aspects.security.SecurityClientInterceptor.invoke(SecurityClientInterceptor.java:53) at org.jboss.aop.joinpoint.MethodInvocation.invokeNext(MethodInvocation.java:101) at org.jboss.ejb3.remoting.IsLocalInterceptor.invoke(IsLocalInterceptor.java:72) at org.jboss.aop.joinpoint.MethodInvocation.invokeNext(MethodInvocation.java:101) at org.jboss.ejb3.stateful.StatefulRemoteProxy.invoke(StatefulRemoteProxy.java:135) at $Proxy1.foo(Unknown Source) at trail.test.TestFooBean.main(TestFooBean.java:16)
I would be grateful if you could help me.
Thank you.
Pavel