18 Replies Latest reply on Aug 12, 2009 10:44 AM by kapitanpetko
      • 15. Re: New Seam example featuring distributed transactions
        asookazian
        • 16. Re: New Seam example featuring distributed transactions
          asookazian

          Here is an equivalent problem in Spring:


          http://forum.springsource.org/showthread.php?t=15964


          I can easily manually enlist the JMS code with the current JTA transaction in our business method - ie. the transaction created by Spring via the TransactionProxyFactoryBean. This looks something like:
          
          Code:
          
          // Do some Ibatis based Jdbc stuff
          
                      XATopicConnection connection = topicConnectionFactory.createXATopicConnection();
                      XATopicSession session = connection.createXATopicSession();
                      XAResource jmsXA = session.getXAResource();
                      transactionManager.getTransactionManager().getTransaction().enlistResource(jmsXA);
          
                      Message message = session.createTextMessage("Foo");
                      TopicPublisher publisher = ((TopicSession) session).createPublisher(ticketUpdateTopic);
                      publisher.publish(message);
          
          // Do some more stuff
          
          This all works fine - the JMS joins in with the transaction and gets rolled back when exceptions occur elsewhere.



          I'm not sure exactly which API every call there comes from (createXATopicConnection() is not in the javax.jms.TopicConnectionFactory API; it is declared in javax.jms.XATopicConnectionFactory), but this is basically what I'm talking about. In this example, he's programmatically enlisting the JMS resource in the distributed tx.


          The other thing I noticed is that in the org.jboss.seam.transaction.UserTransaction and org.jboss.seam.transaction.CMTTransaction APIs, there is no way to get a List of the resources currently enlisted in the distributed tx, which is likely a limitation of the javax.transaction.UserTransaction interface.
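
To make the enlistment idea concrete, here is a minimal sketch of what a coordinator does with enlisted resources. It uses only the javax.transaction.xa types that ship with the JDK. RecordingResource, ToyTransaction, EnlistDemo and the enlistedResources() method are hypothetical names invented for this illustration; they are not part of JTA, Seam, or JBoss, and a real transaction manager also handles recovery, timeouts, and per-resource branch qualifiers, which are skipped here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

/** Hypothetical stand-in for a JMS session or JDBC connection; it only records calls. */
class RecordingResource implements XAResource {
    final String name;
    final boolean failPrepare;
    final List<String> calls = new ArrayList<>();

    RecordingResource(String name, boolean failPrepare) {
        this.name = name;
        this.failPrepare = failPrepare;
    }

    @Override public void start(Xid xid, int flags) { calls.add("start"); }
    @Override public void end(Xid xid, int flags) { calls.add("end"); }
    @Override public int prepare(Xid xid) throws XAException {
        calls.add("prepare");
        // A resource votes "no" by throwing from prepare
        if (failPrepare) throw new XAException(XAException.XA_RBROLLBACK);
        return XA_OK;
    }
    @Override public void commit(Xid xid, boolean onePhase) { calls.add("commit"); }
    @Override public void rollback(Xid xid) { calls.add("rollback"); }
    @Override public void forget(Xid xid) { }
    @Override public Xid[] recover(int flag) { return new Xid[0]; }
    @Override public boolean isSameRM(XAResource other) { return other == this; }
    @Override public int getTransactionTimeout() { return 0; }
    @Override public boolean setTransactionTimeout(int seconds) { return false; }
}

/** Toy coordinator (not a real JTA TransactionManager) that remembers what was enlisted. */
class ToyTransaction {
    // Simplification: one fixed Xid shared by all resources
    private final Xid xid = new Xid() {
        @Override public int getFormatId() { return 1; }
        @Override public byte[] getGlobalTransactionId() { return new byte[] {1}; }
        @Override public byte[] getBranchQualifier() { return new byte[] {1}; }
    };
    private final List<XAResource> enlisted = new ArrayList<>();

    /** Associates the resource with this transaction and remembers it. */
    void enlistResource(XAResource resource) {
        try {
            resource.start(xid, XAResource.TMNOFLAGS);
        } catch (XAException e) {
            throw new IllegalStateException("could not enlist resource", e);
        }
        enlisted.add(resource);
    }

    /** The listing that the UserTransaction interface does not offer. */
    List<XAResource> enlistedResources() {
        return Collections.unmodifiableList(enlisted);
    }

    /** Two-phase commit; returns true if committed, false if rolled back. */
    boolean commit() {
        boolean allPrepared = true;
        try {
            for (XAResource r : enlisted) r.end(xid, XAResource.TMSUCCESS);
            for (XAResource r : enlisted) r.prepare(xid);   // phase 1: collect votes
        } catch (XAException e) {
            allPrepared = false;                            // any veto aborts the whole tx
        }
        try {
            for (XAResource r : enlisted) {                 // phase 2: apply one outcome to all
                if (allPrepared) r.commit(xid, false); else r.rollback(xid);
            }
        } catch (XAException e) {
            throw new IllegalStateException("outcome could not be applied", e);
        }
        return allPrepared;
    }
}

class EnlistDemo {
    public static void main(String[] args) {
        RecordingResource jms = new RecordingResource("jms", false);
        RecordingResource db = new RecordingResource("db", true); // db vetoes in prepare
        ToyTransaction tx = new ToyTransaction();
        tx.enlistResource(jms);
        tx.enlistResource(db);
        System.out.println("enlisted: " + tx.enlistedResources().size());
        System.out.println("committed: " + tx.commit());
        System.out.println("jms calls: " + jms.calls);
    }
}
```

The point of the sketch is that the JMS resource is rolled back because the database resource vetoed, which is the behaviour the Spring poster describes. A resource that was never enlisted would simply not take part in either phase.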

          • 17. Re: New Seam example featuring distributed transactions
            asookazian

            So here's the latest refactoring:


            private transient TopicPublisher topicPublisher;
            private transient XATopicSession topicSession;
            private transient XATopicConnection conn;
            private transient Topic topic;

            public void startup() {
                log.info("in startup()");
                List<Hotel> list = em.createQuery("from Hotel").getResultList();
                log.info("list.size() = " + list.size());

                // Modify the first hotel, if there is one
                if (!list.isEmpty()) {
                    Hotel hotel = list.get(0);
                    hotel.setAddress("arbi's house1");
                }

                //em.flush();

                try {
                    InitialContext iniCtx = new InitialContext();
                    XATopicConnectionFactory tcf = (XATopicConnectionFactory) iniCtx.lookup("XAConnectionFactory");
                    conn = tcf.createXATopicConnection();
                    topic = (Topic) iniCtx.lookup("topic/chatroomTopic");
                    topicSession = conn.createXATopicSession();
                    topicPublisher = topicSession.getTopicSession().createPublisher(topic);
                    topicPublisher.publish(topicSession.createObjectMessage(new ChatroomEvent("connect", "arbime")));
                }
                catch (Exception e) {
                    // log instead of swallowing, so lookup or publish failures are visible
                    log.error("JMS publish failed", e);
                }
                finally {
                    try {
                        // either field is still null if the lookup or connection failed
                        if (topicSession != null) topicSession.close();
                        if (conn != null) conn.close();
                    }
                    catch (Exception e) {
                        //noop
                    }
                }
            }



            output:


            14:55:29,279 INFO  [TestStartConversation] in startup()
            14:55:29,279 INFO  [STDOUT] Hibernate: 
                select
                    hotel0_.id as id145_,
                    hotel0_.address as address145_,
                    hotel0_.name as name145_,
                    hotel0_.state as state145_,
                    hotel0_.country as country145_,
                    hotel0_.price as price145_,
                    hotel0_.city as city145_,
                    hotel0_.zip as zip145_ 
                from
                    Hotel hotel0_
            14:55:29,295 INFO  [TestStartConversation] list.size() = 20
            14:55:29,342 INFO  [TestStartConversation] in destroy()
            14:55:29,357 INFO  [LoggerBean] arbime: connect



            This is using a local-tx-datasource, which means that this is not a distributed tx as far as I can tell.


            If the tx mgr is unable to enlist a resource, you see this:


            2009-08-11 13:00:24,133 DEBUG [org.jboss.seam.exception.DebugPageHandler] redirecting to debug page
            javax.transaction.SystemException: java.lang.Throwable: Unabled to enlist resource, see the previous warnings. tx=TransactionImple < ac, BasicAction: a2e0519:1047:4a81a77f:1a4 status: ActionStatus.ABORT_ONLY >
                 at org.jboss.resource.connectionmanager.TxConnectionManager$TxConnectionEventListener$TransactionSynchronization.checkEnlisted(TxConnectionManager.java:759)
            



            and I am not seeing that in this case...

            • 18. Re: New Seam example featuring distributed transactions
              kapitanpetko

              I did a simple test, and a JMS-JPA transaction seems to work as expected: if there is an error while persisting to the database, the message gets rolled back too, then redelivered and eventually sent to the DLQ. It works with both an xa-datasource and a local-tx-datasource. No idea why, but JBoss seems to be doing some internal magic here... Anyway, the Seam JMS components are not XA aware, so if you want distributed transactions, you have to either use the EJB APIs directly or write a ManagedXASession and a ManagedXAConnection.


              The test:



              1. send a text message to trigger DB update

              2. save a user to the DB, succeeds

              3. send the message again

              4. since the user name is unique, and the user already exists, this time the persist fails.

              5. DB transaction gets rolled back

              6. the message gets rolled back too (and redelivered a few times)
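
The six steps above can be sketched as a small in-memory simulation. This is not JBoss Messaging code: RedeliverySketch, MAX_DELIVERIES, and the value 3 are assumptions made for the illustration, a HashSet stands in for the table with the unique username constraint, and a plain list stands in for the DLQ.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class RedeliverySketch {
    // Assumed delivery limit; the real limit depends on the broker configuration
    static final int MAX_DELIVERIES = 3;

    // Stand-in for the users table with a unique constraint on the username
    final Set<String> usernames = new HashSet<>();
    // Stand-in for the dead letter queue
    final List<String> deadLetters = new ArrayList<>();

    /** Plays the role of the MDB: persist a user, fail if the username exists. */
    void onMessage(String username) {
        if (!usernames.add(username)) {
            throw new IllegalStateException("username already exists: " + username);
        }
    }

    /**
     * Delivers a message until the listener succeeds or the limit is reached.
     * Returns the number of delivery attempts that were made.
     */
    int deliver(String message) {
        for (int attempt = 1; attempt <= MAX_DELIVERIES; attempt++) {
            try {
                onMessage(message);
                return attempt; // listener succeeded: message is consumed
            } catch (RuntimeException e) {
                // Listener failed: nothing was stored, and the message
                // stays eligible for another delivery attempt
            }
        }
        deadLetters.add(message); // gave up: move the message to the DLQ
        return MAX_DELIVERIES;
    }

    public static void main(String[] args) {
        RedeliverySketch sketch = new RedeliverySketch();
        System.out.println("first send, attempts: " + sketch.deliver("test"));
        System.out.println("second send, attempts: " + sketch.deliver("test"));
        System.out.println("dead letters: " + sketch.deadLetters);
    }
}
```

In the real test the rollback of the message and the rollback of the database work are tied together by the container's transaction; the sketch only mirrors the visible outcome, where the failed persist leaves the stored data unchanged and the message ends up in the DLQ.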



              The code:
              action class:


              @Resource(mappedName = "XAConnectionFactory")
              private XAConnectionFactory connectionFactory;
              
              @Resource(mappedName = "queue/A")
              private Queue queue;
              
              // createXAConnection() and the calls below throw the checked JMSException
              public void sendMessage() throws JMSException {
                XAConnection connection = connectionFactory.createXAConnection();
                XASession session = connection.createXASession();
              
                MessageProducer producer = session.createProducer(queue);
              
                log.info("sending message...");
                producer.send(session.createTextMessage("test message"));
              }
              



              MDB:


              @PersistenceContext
              private EntityManager em;

              public void onMessage(Message msg) {
                  try {
                      TextMessage textMessage = (TextMessage) msg;
                      log.info("*** JMSXDeliveryCount: "
                              + msg.getIntProperty("JMSXDeliveryCount"));
                      log.info("*** JMS_JBOSS_REDELIVERY_COUNT: "
                              + msg.getIntProperty("JMS_JBOSS_REDELIVERY_COUNT"));

                      String message = textMessage.getText();
                      log.info("*** got message " + message);

                      User user = new User();
                      user.setName("test");
                      user.setPassword("password");
                      user.setUsername("test");
                      em.persist(user);

                      log.info("saved user");
                  } catch (JMSException jmse) {
                      throw new RuntimeException(jmse);
                  }
              }
              
