4 Replies Latest reply on Jan 23, 2008 3:18 PM by Pavel Kadlec

    Two MD beans in one distributed transaction

    Pavel Kadlec Novice

      Hello,

      I am trying two days to have two message driven beans in one global distributed transaction. I have working code, but sometimes it throws strange exception....

      The message driven beans code in my message driven beans is

      package trail.mdb;
      
      import javax.annotation.Resource;
      import javax.ejb.ActivationConfigProperty;
      import javax.ejb.MessageDriven;
      import javax.ejb.TransactionAttribute;
      import javax.ejb.TransactionAttributeType;
      import javax.jms.Message;
      import javax.jms.MessageListener;
      import javax.jms.ObjectMessage;
      import javax.persistence.EntityManager;
      import javax.persistence.PersistenceContext;
      import javax.transaction.SystemException;
      import javax.transaction.Transaction;
      import javax.transaction.TransactionManager;
      
      import org.jboss.tm.TransactionPropagationContextImporter;
      
      import trail.entity.MyEntity;
      
      @MessageDriven(activationConfig =
      {
       @ActivationConfigProperty(propertyName="destinationType",
       propertyValue="javax.jms.Queue"),
       @ActivationConfigProperty(propertyName="destination",
       propertyValue="queue/mdb"),
      })
      public class BooMDB implements MessageListener {
      
       @PersistenceContext()
       protected EntityManager em;
      
       @Resource(mappedName="java:/TransactionPropagationContextImporter")
       TransactionPropagationContextImporter importer;
      
       @Resource(mappedName="java:/TransactionManager")
       private TransactionManager tm;
      
       @TransactionAttribute(value=TransactionAttributeType.NEVER)
       public void onMessage (Message msg) {
      
       try {
       int id = msg.getIntProperty("id");
       System.out.println("onMessage called with id: " + id);
      
       ObjectMessage om = (ObjectMessage)msg;
       // get transaction propagation context
       Object cx = om.getObject();
       Transaction tx = importer.importTransactionPropagationContext(cx);
      
       /*
       * I am trying to associate current thread with the global
       * transaction. Is this correct way? How shall I do it?
       */
       tm.resume(tx);
       em.persist(new MyEntity(id));
      
       } catch (Exception e) {
       e.printStackTrace();
       } finally {
      
       try {
       /*
       Diassociate current thread from the global transaction.
       Is that OK? Do I have to do it?
       */
       tm.suspend();
       } catch (SystemException e) {
       e.printStackTrace();
       }
       }
       }
      }
      


      package trail.slsb;
      import java.io.Serializable;
      
      import javax.annotation.Resource;
      import javax.ejb.Remote;
      import javax.ejb.Stateless;
      import javax.ejb.TransactionAttribute;
      import javax.ejb.TransactionAttributeType;
      import javax.jms.Connection;
      import javax.jms.ConnectionFactory;
      import javax.jms.JMSException;
      import javax.jms.MessageProducer;
      import javax.jms.ObjectMessage;
      import javax.jms.Queue;
      import javax.jms.Session;
      import javax.naming.InitialContext;
      import javax.naming.NamingException;
      import javax.transaction.TransactionManager;
      
      import org.apache.log4j.Logger;
      import org.jboss.tm.TransactionPropagationContextFactory;
      
      @Stateless
      @Remote(value=Foo.class)
      public class FooBean implements Foo {
      
       private static Logger logger = Logger.getLogger(FooBean.class);
      
       @Resource(mappedName="java:/TransactionManager")
       private TransactionManager tm;
      
       @Resource(mappedName="java:/TransactionPropagationContextExporter")
       TransactionPropagationContextFactory exporter;
      
       private Connection conn;
       private Queue mdbQueue;
       private Session session;
      
      
       @TransactionAttribute(value=TransactionAttributeType.REQUIRES_NEW)
       public void foo(int id1, int id2) throws Exception {
      
       try {
       setupConnection();
      
      
       Object cx = exporter.getTransactionPropagationContext(tm.getTransaction());
      
       sendAMessage(id1, cx);
       sendAMessage(id2, cx);
      
       stop();
      
       Thread.sleep(6000);
      
       } catch (Exception e) {
       e.printStackTrace();
       }
      
       System.out.println("about to commit...");
       }
      
       private void setupConnection() throws JMSException, NamingException {
      
       InitialContext iniCtx = new InitialContext();
      
       Object tmp = iniCtx.lookup("/ConnectionFactory");
       ConnectionFactory qcf = (ConnectionFactory) tmp;
       conn = qcf.createConnection();
       session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
       conn.start();
      
       mdbQueue = (Queue) iniCtx.lookup("queue/mdb");
      
       }
      
      
       private void sendAMessage(int saving, Object cx) throws JMSException {
      
       MessageProducer send = session.createProducer(mdbQueue);
      
       ObjectMessage om = session.createObjectMessage();
       om.setObject((Serializable) cx);
       om.setIntProperty("id", saving);
      
      
      
       send.send(om);
       send.close();
       }
      
       private void stop() throws JMSException {
      
       conn.stop();
       session.close();
       conn.close();
       }
      
      }
      


      This is code of my test main class
      
      package trail.test;
      
      import javax.naming.InitialContext;
      
      import trail.slsb.Foo;
      
      public class TestFooBean {
      
       /**
       * @param args
       */
       public static void main(String[] args) throws Exception {
      
       InitialContext ic = new InitialContext();
       Foo bean = (Foo) ic.lookup("FooBean/remote");
       bean.foo(51, 42);
      
       }
      
      }
      package trail.test;
      
      import javax.naming.InitialContext;
      
      import trail.slsb.Foo;
      
      public class TestFooBean {
      
       /**
       * @param args
       */
       public static void main(String[] args) throws Exception {
      
       InitialContext ic = new InitialContext();
       Foo bean = (Foo) ic.lookup("FooBean/remote");
       bean.foo(51, 42);
      
       }
      
      }
      


      This is the exception I sometimes get from my test client
      Exception in thread "main" java.util.NoSuchElementException
       at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1029)
       at java.util.TreeMap$KeyIterator.next(TreeMap.java:1058)
       at java.util.AbstractCollection.toArray(AbstractCollection.java:176)
       at com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator.beforeCompletion(TwoPhaseCoordinator.java:233)
       at com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator.end(TwoPhaseCoordinator.java:86)
       at com.arjuna.ats.arjuna.AtomicAction.commit(AtomicAction.java:177)
       at com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple.commitAndDisassociate(TransactionImple.java:1382)
       at com.arjuna.ats.internal.jta.transaction.arjunacore.BaseTransaction.commit(BaseTransaction.java:135)
       at com.arjuna.ats.jbossatx.BaseTransactionManagerDelegate.commit(BaseTransactionManagerDelegate.java:87)
       at org.jboss.aspects.tx.TxPolicy.endTransaction(TxPolicy.java:175)
       at org.jboss.aspects.tx.TxPolicy.invokeInOurTx(TxPolicy.java:87)
       at org.jboss.aspects.tx.TxInterceptor$RequiresNew.invoke(TxInterceptor.java:262)
       at org.jboss.aop.joinpoint.MethodInvocation.invokeNext(MethodInvocation.java:101)
       at org.jboss.aspects.tx.TxPropagationInterceptor.invoke(TxPropagationInterceptor.java:76)
       at org.jboss.aop.joinpoint.MethodInvocation.invokeNext(MethodInvocation.java:101)
       at org.jboss.ejb3.stateful.StatefulInstanceInterceptor.invoke(StatefulInstanceInterceptor.java:83)
       at org.jboss.aop.joinpoint.MethodInvocation.invokeNext(MethodInvocation.java:101)
       at org.jboss.aspects.security.AuthenticationInterceptor.invoke(AuthenticationInterceptor.java:77)
       at org.jboss.ejb3.security.Ejb3AuthenticationInterceptor.invoke(Ejb3AuthenticationInterceptor.java:106)
       at org.jboss.aop.joinpoint.MethodInvocation.invokeNext(MethodInvocation.java:101)
       at org.jboss.ejb3.ENCPropagationInterceptor.invoke(ENCPropagationInterceptor.java:46)
       at org.jboss.aop.joinpoint.MethodInvocation.invokeNext(MethodInvocation.java:101)
       at org.jboss.ejb3.asynchronous.AsynchronousInterceptor.invoke(AsynchronousInterceptor.java:106)
       at org.jboss.aop.joinpoint.MethodInvocation.invokeNext(MethodInvocation.java:101)
       at org.jboss.ejb3.stateful.StatefulContainer.dynamicInvoke(StatefulContainer.java:333)
       at org.jboss.aop.Dispatcher.invoke(Dispatcher.java:106)
       at org.jboss.aspects.remoting.AOPRemotingInvocationHandler.invoke(AOPRemotingInvocationHandler.java:82)
       at org.jboss.remoting.ServerInvoker.invoke(ServerInvoker.java:795)
       at org.jboss.remoting.transport.socket.ServerThread.processInvocation(ServerThread.java:573)
       at org.jboss.remoting.transport.socket.ServerThread.dorun(ServerThread.java:373)
       at org.jboss.remoting.transport.socket.ServerThread.run(ServerThread.java:166)
       at org.jboss.remoting.MicroRemoteClientInvoker.invoke(MicroRemoteClientInvoker.java:163)
       at org.jboss.remoting.Client.invoke(Client.java:1634)
       at org.jboss.remoting.Client.invoke(Client.java:548)
       at org.jboss.aspects.remoting.InvokeRemoteInterceptor.invoke(InvokeRemoteInterceptor.java:62)
       at org.jboss.aop.joinpoint.MethodInvocation.invokeNext(MethodInvocation.java:101)
       at org.jboss.aspects.tx.ClientTxPropagationInterceptor.invoke(ClientTxPropagationInterceptor.java:61)
       at org.jboss.aop.joinpoint.MethodInvocation.invokeNext(MethodInvocation.java:101)
       at org.jboss.aspects.security.SecurityClientInterceptor.invoke(SecurityClientInterceptor.java:53)
       at org.jboss.aop.joinpoint.MethodInvocation.invokeNext(MethodInvocation.java:101)
       at org.jboss.ejb3.remoting.IsLocalInterceptor.invoke(IsLocalInterceptor.java:72)
       at org.jboss.aop.joinpoint.MethodInvocation.invokeNext(MethodInvocation.java:101)
       at org.jboss.ejb3.stateful.StatefulRemoteProxy.invoke(StatefulRemoteProxy.java:135)
       at $Proxy1.foo(Unknown Source)
       at trail.test.TestFooBean.main(TestFooBean.java:16)
       at org.jboss.aspects.remoting.InvokeRemoteInterceptor.invoke(InvokeRemoteInterceptor.java:74)
       at org.jboss.aop.joinpoint.MethodInvocation.invokeNext(MethodInvocation.java:101)
       at org.jboss.aspects.tx.ClientTxPropagationInterceptor.invoke(ClientTxPropagationInterceptor.java:61)
       at org.jboss.aop.joinpoint.MethodInvocation.invokeNext(MethodInvocation.java:101)
       at org.jboss.aspects.security.SecurityClientInterceptor.invoke(SecurityClientInterceptor.java:53)
       at org.jboss.aop.joinpoint.MethodInvocation.invokeNext(MethodInvocation.java:101)
       at org.jboss.ejb3.remoting.IsLocalInterceptor.invoke(IsLocalInterceptor.java:72)
       at org.jboss.aop.joinpoint.MethodInvocation.invokeNext(MethodInvocation.java:101)
       at org.jboss.ejb3.stateful.StatefulRemoteProxy.invoke(StatefulRemoteProxy.java:135)
       at $Proxy1.foo(Unknown Source)
       at trail.test.TestFooBean.main(TestFooBean.java:16)
      


      I would be grateful if you could help me.
      Thank you.
      Pavel


        • 1. Re: Two MD beans in one distributed transaction
          Pavel Kadlec Novice

          Hello,

          I was trying to solve the problem with the exception. I had to make synchonization changes in class com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator. Look for MY_CHANGE in the code bellow. Now it is working without the exception.

          I have synchronized an access to _synchs variable. I have JBoss-4.2.1.GA with default JBossTS (JBossTS-4.2.3.SP5)

          The changed code is

          /*
           * JBoss, Home of Professional Open Source
           * Copyright 2006, Red Hat Middleware LLC, and individual contributors
           * as indicated by the @author tags.
           * See the copyright.txt in the distribution for a
           * full listing of individual contributors.
           * This copyrighted material is made available to anyone wishing to use,
           * modify, copy, or redistribute it subject to the terms and conditions
           * of the GNU Lesser General Public License, v. 2.1.
           * This program is distributed in the hope that it will be useful, but WITHOUT A
           * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
           * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
           * You should have received a copy of the GNU Lesser General Public License,
           * v.2.1 along with this distribution; if not, write to the Free Software
           * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
           * MA 02110-1301, USA.
           *
           * (C) 2005-2006,
           * @author JBoss Inc.
           */
          /*
           * Copyright (C) 2001,
           *
           * Hewlett-Packard Arjuna Labs,
           * Newcastle upon Tyne,
           * Tyne and Wear,
           * UK.
           *
           * $Id: TwoPhaseCoordinator.java 2342 2006-03-30 13:06:17Z $
           */
          
          package com.arjuna.ats.arjuna.coordinator;
          
          import com.arjuna.ats.arjuna.common.Uid;
          
          import com.arjuna.ats.arjuna.logging.tsLogger;
          
          import java.util.SortedSet;
          import java.util.TreeSet;
          import java.util.Iterator;
          import java.util.Stack;
          
          /**
           * Adds support for synchronizations to BasicAction. It does not change thread
           * associations either. It also allows any thread to terminate a transaction,
           * even if it is not the transaction that is marked as current for the thread
           * (unlike the BasicAction default).
           *
           * @author Mark Little (mark@arjuna.com)
           * @version $Id: TwoPhaseCoordinator.java 2342 2006-03-30 13:06:17Z $
           * @since JTS 3.0.
           */
          
          public class TwoPhaseCoordinator extends BasicAction implements Reapable
          {
          
           public TwoPhaseCoordinator ()
           {
           }
          
           public TwoPhaseCoordinator (Uid id)
           {
           super(id);
           }
          
           public int start ()
           {
           return start(BasicAction.Current());
           }
          
           public int start (BasicAction parentAction)
           {
           if (parentAction != null)
           parentAction.addChildAction(this);
          
           return super.Begin(parentAction);
           }
          
           public int end (boolean report_heuristics)
           {
           int outcome;
          
           if (parent() != null)
           parent().removeChildAction(this);
          
           if (beforeCompletion())
           {
           outcome = super.End(report_heuristics);
           }
           else
           outcome = super.Abort();
          
           afterCompletion(outcome);
          
           return outcome;
           }
          
           public int cancel ()
           {
           if (parent() != null)
           parent().removeChildAction(this);
          
           // beforeCompletion();
          
           int outcome = super.Abort();
          
           afterCompletion(outcome);
          
           return outcome;
           }
          
           public int addSynchronization (SynchronizationRecord sr)
           {
           if (sr == null)
           return AddOutcome.AR_REJECTED;
          
           int result = AddOutcome.AR_REJECTED;
          
           if (parent() != null)
           return AddOutcome.AR_REJECTED;
          
           switch (status())
           {
           case ActionStatus.RUNNING:
           {
           synchronized (this)
           {
           if (_synchs == null)
           {
           // Synchronizations should be stored (or at least iterated) in their natural order
           _synchs = new TreeSet();
           }
           }
          
           // disallow addition of Synchronizations that would appear
           // earlier in sequence than any that has already been called
           // during the pre-commmit phase. This generic support is required for
           // JTA Synchronization ordering behaviour
           if(sr instanceof Comparable && _currentRecord != null) {
           Comparable c = (Comparable)sr;
           if(c.compareTo(_currentRecord) != 1) {
           return AddOutcome.AR_REJECTED;
           }
           }
           // MY_CHANGE
           synchronized (this)
           {
           if (_synchs.add(sr))
           {
           result = AddOutcome.AR_ADDED;
           }
           }
           }
           break;
           default:
           break;
           }
          
           return result;
           }
          
           /**
           * @return <code>true</code> if the transaction is running,
           * <code>false</code> otherwise.
           */
          
           public boolean running ()
           {
           return (boolean) (status() == ActionStatus.RUNNING);
           }
          
           /**
           * Overloads BasicAction.type()
           */
          
           public String type ()
           {
           return "/StateManager/BasicAction/AtomicAction/TwoPhaseCoordinator";
           }
          
           /**
           * Get any Throwable that was caught during commit processing but not directly rethrown.
           * @return
           */
           public Throwable getDeferredThrowable() {
           return _deferredThrowable;
           }
          
           protected TwoPhaseCoordinator (int at)
           {
           super(at);
           }
          
           protected TwoPhaseCoordinator (Uid u, int at)
           {
           super(u, at);
           }
          
           /**
           * @message com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator_1
           * [com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator_1]
           * TwoPhaseCoordinator.beforeCompletion - attempted rollback_only
           * failed!
           * @message com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator_2
           * [com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator_2]
           * TwoPhaseCoordinator.beforeCompletion - failed for {0}
           */
          
           protected boolean beforeCompletion ()
           {
           boolean problem = false;
          
          
          
          
           // MY_CHANGE
           synchronized (this)
           {
           /*
           * If we have a synchronization list then we must be top-level.
           */
           if (_synchs != null)
           {
           /*
           * We must always call afterCompletion() methods, so just catch (and
           * log) any exceptions/errors from beforeCompletion() methods.
           *
           * If one of the Syncs throws an error the Record wrapper returns false
           * and we will rollback. Hence we don't then bother to call beforeCompletion
           * on the remaining records (it's not done for rollabcks anyhow).
           *
           * Since Synchronizations may add register other Synchronizations, we can't simply
           * iterate the collection. Instead we work from an ordered copy, which we periodically
           * check for freshness. The addSynchronization method uses _currentRecord to disallow
           * adding records in the part of the array we have already traversed, thus all
           * Synchronization will be called and the (jta only) rules on ordering of interposed
           * Synchronization will be respected.
           */
          
           int lastIndexProcessed = -1;
           SynchronizationRecord[] copiedSynchs = (SynchronizationRecord[])_synchs.toArray(new SynchronizationRecord[] {});
          
           while( (lastIndexProcessed < _synchs.size()-1) && !problem) {
          
           // if new Synchronization have been registered, refresh our copy of the collection:
           if(copiedSynchs.length != _synchs.size()) {
           copiedSynchs = (SynchronizationRecord[])_synchs.toArray(new SynchronizationRecord[] {});
           }
          
           lastIndexProcessed = lastIndexProcessed+1;
           _currentRecord = copiedSynchs[lastIndexProcessed];
          
           try
           {
           problem = !_currentRecord.beforeCompletion();
          
           // if something goes wrong, we can't just throw the exception, we need to continue to
           // complete the transaction. However, the exception may have interesting information that
           // we want later, so we keep a reference to it as well as logging it.
          
           }
           catch (Exception ex)
           {
           tsLogger.arjLoggerI18N.warn("com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator_2", new Object[]
           { _currentRecord }, ex);
           if(_deferredThrowable == null) {
           _deferredThrowable = ex;
           }
           problem = true;
           }
           catch (Error er)
           {
           tsLogger.arjLoggerI18N.warn("com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator_2", new Object[]
           { _currentRecord }, er);
           if(_deferredThrowable == null) {
           _deferredThrowable = er;
           }
           problem = true;
           }
           }
          
           if (problem)
           {
           if (!preventCommit())
           {
           /*
           * This should not happen. If it does, continue with commit
           * to tidy-up.
           */
          
           tsLogger.arjLoggerI18N.warn("com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator_1");
           }
           }
           }
           }
          
           return !problem;
           }
          
           /**
           * @message com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator_3
           * [com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator_3]
           * TwoPhaseCoordinator.beforeCompletion
           * TwoPhaseCoordinator.afterCompletion called on still running
           * transaction!
           * @message com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator_4
           * [com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator_4]
           * TwoPhaseCoordinator.afterCompletion - returned failure for {0}
           * @message com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator_4a
           * [com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator_4a]
           * TwoPhaseCoordinator.afterCompletion - failed for {0} with exception {1}
           * @message com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator_4b
           * [com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator_4b]
           * TwoPhaseCoordinator.afterCompletion - failed for {0} with error {1}
           */
          
           protected boolean afterCompletion (int myStatus)
           {
           if (myStatus == ActionStatus.RUNNING)
           {
           tsLogger.arjLoggerI18N.warn("com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator_3");
          
           return false;
           }
          
           boolean problem = false;
          
           // MY_CHANGE
           synchronized (this)
           {
           if (_synchs != null)
           {
           // afterCompletions should run in reverse order compared to beforeCompletions
           Stack stack = new Stack();
           Iterator iterator = _synchs.iterator();
           while(iterator.hasNext()) {
           stack.push(iterator.next());
           }
          
           iterator = stack.iterator();
          
           /*
           * Regardless of failures, we must tell all synchronizations what
           * happened.
           */
           while(!stack.isEmpty())
           {
           SynchronizationRecord record = (SynchronizationRecord)stack.pop();
          
           try
           {
           if (!record.afterCompletion(myStatus))
           {
           tsLogger.arjLoggerI18N.warn("com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator_4", new Object[]
           { record });
          
           problem = true;
           }
           }
           catch (Exception ex)
           {
           tsLogger.arjLoggerI18N.warn("com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator_4a", new Object[]
           { record, ex });
           problem = true;
           }
           catch (Error er)
           {
           tsLogger.arjLoggerI18N.warn("com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator_4b", new Object[]
           { record, er });
           problem = true;
           }
           }
          
           _synchs = null;
           _currentRecord = null;
           }
           }
          
           return !problem;
          
           }
          
           //private HashList _synchs;
           private SortedSet _synchs;
           private SynchronizationRecord _currentRecord; // the most recently processed Synchronization.
           private Throwable _deferredThrowable;
          }
          
          


          Have I found a bug? Or do I use distributed transactions incorrectly?

          I have another question. I would like that all messages would be received in one distributed transaction. The producer would know that all messages
          were received or neither of them. I know that it is possible from JMS specification. But how achieve that in ejb environment? Transactions are controlled by the container. Wish I had access to session's XAResource in method onMessage of the message driven bean:-)....
          Thanks for help.
          Pavel

          • 2. Re: Two MD beans in one distributed transaction
            Mark Little Master

            Your issue may have been related to this one.

            • 3. Re: Two MD beans in one distributed transaction
              Pavel Kadlec Novice

              With the changed TwoPhaseCoordinator it never throws that exception, but I have unfortunatelly another exception:-(

              I am JBossESB user, but I will try reformulate the problem using MDB vocabulary.

              I have expanded the example. I have one message driven bean. I send to the mdb approx. 5000 messages. I have limited number of threads that mdb can use to process messages to 200. The message driven bean gets some information from the message and persists new entity using hibernate entity manager.

              1. I begin global JTA transaction.
              2. Then I process all messages.
              3. After all messages are processed, I commit the transaction.

              The code works perfectly, but sometimes (10%) when I commit the global transaction, it throws following exception.

              17:45:42,304 DEBUG [JBossESBPropertyService$JTATransactionStrategy] Committing transaction on current thread
              17:45:42,306 DEBUG [AbstractFlushingEventListener] processing flush-time cascades
              17:45:42,315 DEBUG [AbstractFlushingEventListener] dirty checking collections
              17:45:42,428 DEBUG [AbstractFlushingEventListener] Flushed: 2500 insertions, 0 updates, 0 deletions to 2500 objects
              17:45:42,430 DEBUG [AbstractFlushingEventListener] Flushed: 0 (re)creations, 0 updates, 0 removals to 0 collections
              17:45:42,906 DEBUG [AbstractFlushingEventListener] processing flush-time cascades
              17:45:42,910 DEBUG [AbstractFlushingEventListener] dirty checking collections
              17:45:42,914 DEBUG [AbstractFlushingEventListener] Flushed: 1 insertions, 0 updates, 0 deletions to 1 objects
              17:45:42,916 DEBUG [AbstractFlushingEventListener] Flushed: 0 (re)creations, 0 updates, 0 removals to 0 collections
              17:45:42,939 WARN [arjLoggerI18N] [com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator_2] TwoPhaseCoordinator.beforeCompletion - failed for com.arj
              una.ats.internal.jta.resources.arjunacore.SynchronizationImple@171dce9
              javax.persistence.PersistenceException: org.hibernate.HibernateException: Flush during cascade is dangerous
               at org.hibernate.ejb.AbstractEntityManagerImpl.throwPersistenceException(AbstractEntityManagerImpl.java:629)
               at org.hibernate.ejb.AbstractEntityManagerImpl$1.beforeCompletion(AbstractEntityManagerImpl.java:524)
               at com.arjuna.ats.internal.jta.resources.arjunacore.SynchronizationImple.beforeCompletion(SynchronizationImple.java:114)
               at com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator.beforeCompletion(TwoPhaseCoordinator.java:251)
               at com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator.end(TwoPhaseCoordinator.java:86)
               at com.arjuna.ats.arjuna.AtomicAction.commit(AtomicAction.java:177)
               at com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple.commitAndDisassociate(TransactionImple.java:1382)
               at com.arjuna.ats.internal.jta.transaction.arjunacore.BaseTransaction.commit(BaseTransaction.java:135)
               at com.arjuna.ats.jbossatx.BaseTransactionManagerDelegate.commit(BaseTransactionManagerDelegate.java:87)
               at org.jboss.soa.esb.common.JBossESBPropertyService$JTATransactionStrategy.terminate(JBossESBPropertyService.java:173)
               at cz.example.esb.actions.TxAction.terminate(TxAction.java:106)
               at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
               at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
               at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
               at java.lang.reflect.Method.invoke(Method.java:585)
               at org.jboss.soa.esb.listeners.message.ActionProcessorMethodInfo.processMethods(ActionProcessorMethodInfo.java:102)
               at org.jboss.soa.esb.listeners.message.OverriddenActionLifecycleProcessor.process(OverriddenActionLifecycleProcessor.java:74)
               at org.jboss.soa.esb.listeners.message.ActionProcessingPipeline.process(ActionProcessingPipeline.java:316)
               at org.jboss.soa.esb.listeners.message.MessageAwareListener$TransactionalRunner.run(MessageAwareListener.java:566)
               at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
               at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
               at java.lang.Thread.run(Thread.java:595)
              Caused by: org.hibernate.HibernateException: Flush during cascade is dangerous
               at org.hibernate.impl.SessionImpl.flush(SessionImpl.java:996)
               at org.hibernate.impl.SessionImpl.managedFlush(SessionImpl.java:338)
               at org.hibernate.ejb.AbstractEntityManagerImpl$1.beforeCompletion(AbstractEntityManagerImpl.java:515)
               ... 20 more
              17:45:43,104 DEBUG [JBossESBPropertyService$JTATransactionStrategy] Failed to terminate transaction on current thread
              javax.transaction.RollbackException: [com.arjuna.ats.internal.jta.transaction.arjunacore.commitwhenaborted] [com.arjuna.ats.internal.jta.transaction.a
              rjunacore.commitwhenaborted] Can't commit because the transaction is in aborted state
               at com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple.commitAndDisassociate(TransactionImple.java:1394)
               at com.arjuna.ats.internal.jta.transaction.arjunacore.BaseTransaction.commit(BaseTransaction.java:135)
               at com.arjuna.ats.jbossatx.BaseTransactionManagerDelegate.commit(BaseTransactionManagerDelegate.java:87)
               at org.jboss.soa.esb.common.JBossESBPropertyService$JTATransactionStrategy.terminate(JBossESBPropertyService.java:173)
               at cz.example.esb.actions.TxAction.terminate(TxAction.java:106)
               at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
               at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
               at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
               at java.lang.reflect.Method.invoke(Method.java:585)
               at org.jboss.soa.esb.listeners.message.ActionProcessorMethodInfo.processMethods(ActionProcessorMethodInfo.java:102)
               at org.jboss.soa.esb.listeners.message.OverriddenActionLifecycleProcessor.process(OverriddenActionLifecycleProcessor.java:74)
               at org.jboss.soa.esb.listeners.message.ActionProcessingPipeline.process(ActionProcessingPipeline.java:316)
               at org.jboss.soa.esb.listeners.message.MessageAwareListener$TransactionalRunner.run(MessageAwareListener.java:566)
               at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
               at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
               at java.lang.Thread.run(Thread.java:595)
              Caused by: javax.persistence.PersistenceException: org.hibernate.HibernateException: Flush during cascade is dangerous
               at org.hibernate.ejb.AbstractEntityManagerImpl.throwPersistenceException(AbstractEntityManagerImpl.java:629)
               at org.hibernate.ejb.AbstractEntityManagerImpl$1.beforeCompletion(AbstractEntityManagerImpl.java:524)
               at com.arjuna.ats.internal.jta.resources.arjunacore.SynchronizationImple.beforeCompletion(SynchronizationImple.java:114)
               at com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator.beforeCompletion(TwoPhaseCoordinator.java:251)
               at com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator.end(TwoPhaseCoordinator.java:86)
               at com.arjuna.ats.arjuna.AtomicAction.commit(AtomicAction.java:177)
               at com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple.commitAndDisassociate(TransactionImple.java:1382)
               ... 15 more
              Caused by: org.hibernate.HibernateException: Flush during cascade is dangerous
               at org.hibernate.impl.SessionImpl.flush(SessionImpl.java:996)
               at org.hibernate.impl.SessionImpl.managedFlush(SessionImpl.java:338)
               at org.hibernate.ejb.AbstractEntityManagerImpl$1.beforeCompletion(AbstractEntityManagerImpl.java:515)
               ... 20 more
              17:45:43,196 WARN [ActionProcessingPipeline] Unexpected exception caught while processing the action pipeline: header: [ To: JMSEpr [ PortReference <
               <wsa:Address jms://localhost:1099/queue/quickstart_Aggregator/>, <wsa:ReferenceProperties jbossesb:java.naming.factory.initial : org.jnp.interfaces.N
              amingContextFactory/>, <wsa:ReferenceProperties jbossesb:java.naming.provider.url : localhost/>, <wsa:ReferenceProperties jbossesb:destination-type :
              queue/>, <wsa:ReferenceProperties jbossesb:specification-version : 1.1/>, <wsa:ReferenceProperties jbossesb:connection-factory : ConnectionFactory/>,
              <wsa:ReferenceProperties jbossesb:persistent : false/>, <wsa:ReferenceProperties jbossesb:acknowledge-mode : AUTO_ACKNOWLEDGE/>, <wsa:ReferencePropert
              ies jbossesb:type : urn:jboss/esb/epr/type/jms/> > ] MessageID: ID:JBM-854833 RelatesTo: jms:correlationID#9742b320-a533-404a-95af-4e0654c76f0a ]
              org.jboss.soa.esb.actions.ActionProcessingException: org.jboss.soa.esb.common.TransactionStrategyException: Failed to terminate transaction on current
               thread
               at cz.example.esb.actions.TxAction.terminate(TxAction.java:109)
               at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
               at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
               at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
               at java.lang.reflect.Method.invoke(Method.java:585)
               at org.jboss.soa.esb.listeners.message.ActionProcessorMethodInfo.processMethods(ActionProcessorMethodInfo.java:102)
               at org.jboss.soa.esb.listeners.message.OverriddenActionLifecycleProcessor.process(OverriddenActionLifecycleProcessor.java:74)
               at org.jboss.soa.esb.listeners.message.ActionProcessingPipeline.process(ActionProcessingPipeline.java:316)
               at org.jboss.soa.esb.listeners.message.MessageAwareListener$TransactionalRunner.run(MessageAwareListener.java:566)
               at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
               at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
               at java.lang.Thread.run(Thread.java:595)
              Caused by: org.jboss.soa.esb.common.TransactionStrategyException: Failed to terminate transaction on current thread
               at org.jboss.soa.esb.common.JBossESBPropertyService$JTATransactionStrategy.terminate(JBossESBPropertyService.java:191)
               at cz.example.esb.actions.TxAction.terminate(TxAction.java:106)
               ... 11 more
              Caused by: javax.transaction.RollbackException: [com.arjuna.ats.internal.jta.transaction.arjunacore.commitwhenaborted] [com.arjuna.ats.internal.jta.tr
              ansaction.arjunacore.commitwhenaborted] Can't commit because the transaction is in aborted state
               at com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple.commitAndDisassociate(TransactionImple.java:1394)
               at com.arjuna.ats.internal.jta.transaction.arjunacore.BaseTransaction.commit(BaseTransaction.java:135)
               at com.arjuna.ats.jbossatx.BaseTransactionManagerDelegate.commit(BaseTransactionManagerDelegate.java:87)
               at org.jboss.soa.esb.common.JBossESBPropertyService$JTATransactionStrategy.terminate(JBossESBPropertyService.java:173)
               ... 12 more
              Caused by: javax.persistence.PersistenceException: org.hibernate.HibernateException: Flush during cascade is dangerous
               at org.hibernate.ejb.AbstractEntityManagerImpl.throwPersistenceException(AbstractEntityManagerImpl.java:629)
               at org.hibernate.ejb.AbstractEntityManagerImpl$1.beforeCompletion(AbstractEntityManagerImpl.java:524)
               at com.arjuna.ats.internal.jta.resources.arjunacore.SynchronizationImple.beforeCompletion(SynchronizationImple.java:114)
               at com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator.beforeCompletion(TwoPhaseCoordinator.java:251)
               at com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator.end(TwoPhaseCoordinator.java:86)
               at com.arjuna.ats.arjuna.AtomicAction.commit(AtomicAction.java:177)
               at com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple.commitAndDisassociate(TransactionImple.java:1382)
               ... 15 more
              Caused by: org.hibernate.HibernateException: Flush during cascade is dangerous
               at org.hibernate.impl.SessionImpl.flush(SessionImpl.java:996)
               at org.hibernate.impl.SessionImpl.managedFlush(SessionImpl.java:338)
               at org.hibernate.ejb.AbstractEntityManagerImpl$1.beforeCompletion(AbstractEntityManagerImpl.java:515)
               ... 20 more
              
              


              I don't know if it is hibernate bug, or JBossTransation bug, or my fault, or what.....

              Best regards
              Pavel Kadlec