5 Replies Latest reply on May 31, 2006 6:16 AM by timfox

    XA transactional issue using two resources

      Looks like I have an issue with JBoss messaging:

      I am using three resources: (1) a JBossMessaging XAResource, (2) a SybaseXAResource and (3) a dummy XAResource.

      My test client starts a transaction
      Does the Sybase Work (insert into a table)
      Sends a message (jms send)

      Prepares the resources (I can see the state of the transaction in Sybase log as PREPARED)
      Just before committing, I kill the test client (as it crashes the Transaction Manager) (I tried killing the JBossAS too)

      I expect the client's work to be withheld. Unfortunately, the message is SENT and is consumed by a JMS listener.

      So the outcome of this situation is: Sybase leaves the transaction in the prepared state while messaging has committed. Unless I misunderstand, things shouldn't work this way.

      Any pointers appreciated.

      Madhu

        • 1. Re: XA transactional issue using two resources
          timfox

          please post your code

          • 2. Re: XA transactional issue using two resources

             

            /**
             * Looks up the JNDI-bound objects this client works with and stores them in
             * the enclosing class's fields: the object bound under "ConnectionFactory"
             * (once as a ConnectionFactory, once as a JBossConnectionFactory), the
             * destination bound under "/topic/testTopic" and the UserTransaction.
             *
             * @throws IllegalStateException if the naming context cannot be created or a
             *         lookup fails; the underlying NamingException is kept as the cause
             */
            public void init()
            {
                try
                {
                    initialContext = new InitialContext();
                    cf = (ConnectionFactory) initialContext.lookup("ConnectionFactory");
                    jbossCf = (JBossConnectionFactory) initialContext.lookup("ConnectionFactory");

                    topicDestn = (Destination) initialContext.lookup("/topic/testTopic");
                    ut = (UserTransaction) initialContext.lookup("UserTransaction");
                }
                catch (javax.naming.NamingException e)
                {
                    // Fail here, with the real cause attached, instead of printing the
                    // trace and letting a caller hit a NullPointerException on a field
                    // that was never assigned.
                    throw new IllegalStateException("JNDI initialisation failed: " + e, e);
                }
            }
            /**
             * Runs one distributed transaction across three manually enlisted resources:
             * the JBoss Messaging session's XAResource, a DummyXAResource constructed
             * with failOnPrepare = true, and the XAResource handed out by
             * SybaseXAResource. Inside the transaction it inserts one row into
             * GRINDER_MESSAGES and sends one text message to the test topic, then asks
             * the transaction to commit and finally terminates the JVM.
             *
             * Any exception thrown by the commit itself is reported on stdout and
             * otherwise ignored. If anything fails before the commit is attempted, the
             * transaction is rolled back and the exception propagates to the caller.
             *
             * @throws Exception if setting up the resources, the SQL insert, the JMS
             *         send, or closing a resource fails
             */
            public void testXA() throws Exception
            {
                XAConnection conn = null;
                java.sql.Connection sybaseConn = null;
                java.sql.Statement stmt = null;

                UserTransaction userTx = null;
                // True between a successful begin() and the commit attempt.
                boolean txOpen = false;

                init();
                conn = jbossCf.createXAConnection();

                try
                {
                    userTx = com.arjuna.ats.jta.UserTransaction.userTransaction();
                    userTx.begin();
                    txOpen = true;

                    XASession sess1 = conn.createXASession();

                    // messaging resource
                    org.jboss.jms.tx.MessagingXAResource msgRes =
                        (org.jboss.jms.tx.MessagingXAResource) sess1.getXAResource();
                    msgRes.setPreventJoining(true);

                    // dummy resource (throws from prepare)
                    XAResource dummyRes = new DummyXAResource(true);

                    // sybase resource
                    SybaseXAResource sybaseRes = new SybaseXAResource();
                    XAResource sybRes = sybaseRes.getXAResource();

                    // transaction associated with the current thread
                    Transaction tx = TransactionManager.transactionManager().getTransaction();

                    // enlisting the resources
                    tx.enlistResource(msgRes);
                    tx.enlistResource(dummyRes);
                    tx.enlistResource(sybRes);

                    // ===================== Sybase work start ==============================
                    System.out.println("=========== Sybase XA Work START =========");
                    javax.sql.XAConnection sybaseXaConn = sybaseRes.getXAConnection();
                    sybaseConn = sybaseXaConn.getConnection();
                    stmt = sybaseConn.createStatement();

                    java.util.Random random = new java.util.Random();
                    int i = stmt.executeUpdate("INSERT INTO GRINDER_MESSAGES VALUES('"
                        + System.currentTimeMillis() + "-XXXX','"
                        + random.nextInt() + "','"
                        + random.nextInt() + "')");

                    System.out.println("Rows affected: " + i);
                    System.out.println("=========== Sybase XA Work END =========");
                    // ===================== Sybase work end ================================

                    // ===================== Messaging work start ===========================
                    MessageProducer sender = sess1.createProducer(topicDestn);

                    javax.jms.Message tm = sess1.createTextMessage("test-" + random.nextInt());

                    System.out.println("Sending the message");
                    sender.send(tm);

                    // From here on the outcome belongs to the transaction manager, so
                    // the cleanup below must not attempt its own rollback.
                    txOpen = false;
                    try
                    {
                        System.out.println("Toplevel commit");
                        tx.commit();
                    }
                    catch (Exception e)
                    {
                        // A failed commit is tolerated by this client, but say so
                        // instead of hiding it.
                        System.out.println("Commit did not complete normally: " + e);
                    }
                }
                finally
                {
                    if (txOpen)
                    {
                        // The work failed before commit was attempted: do not leave the
                        // transaction active on this thread.
                        try
                        {
                            userTx.rollback();
                        }
                        catch (Exception e)
                        {
                            System.out.println("Rollback after failure did not succeed: " + e);
                        }
                    }

                    // Nested try/finally so that one failing close() does not skip the rest.
                    // NOTE(review): the javax.sql.XAConnection is presumably owned by
                    // SybaseXAResource, so only the handles created here are closed - confirm.
                    try
                    {
                        if (stmt != null)
                        {
                            stmt.close();
                        }
                    }
                    finally
                    {
                        try
                        {
                            if (sybaseConn != null)
                            {
                                sybaseConn.close();
                            }
                        }
                        finally
                        {
                            if (conn != null)
                            {
                                conn.close();
                            }
                        }
                    }
                }

                // Exit only after the cleanup above has run; System.exit() inside the
                // try block would end the JVM without executing the finally block.
                System.out.println("Exiting..");
                System.exit(0);
            }
            /**
             * XAResource stub used to observe or disturb two-phase commit. It does no
             * transactional work: prepare() either votes XA_OK or, when failOnPrepare
             * is set, throws XAException(XAER_RMERR); commit() prints a notice and then
             * sleeps for 30 seconds; every other callback is a no-op.
             */
            static class DummyXAResource implements XAResource
            {
                /** When true, prepare() throws instead of returning XA_OK. */
                boolean failOnPrepare;

                /** Creates a resource whose prepare() succeeds. */
                DummyXAResource()
                {
                }

                /**
                 * @param failOnPrepare true to make prepare() throw XAException(XAER_RMERR)
                 */
                DummyXAResource(boolean failOnPrepare)
                {
                    this.failOnPrepare = failOnPrepare;
                }

                /**
                 * Prints a notice and then sleeps for 30 seconds before returning.
                 * The arguments are ignored.
                 */
                public void commit(Xid arg0, boolean arg1) throws XAException
                {
                    System.out.println("Dummyresoruce: commit ");
                    System.out.println("I am about to commit..check the logs");

                    try
                    {
                        Thread.sleep(30000);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                        // Restore the flag so the calling thread can still see that it
                        // was interrupted.
                        Thread.currentThread().interrupt();
                    }
                    System.out.println("I am awake now.");
                }

                /** No-op. */
                public void end(Xid arg0, int arg1) throws XAException
                {
                }

                /** No-op. */
                public void forget(Xid arg0) throws XAException
                {
                }

                /** @return always 0 */
                public int getTransactionTimeout() throws XAException
                {
                    return 0;
                }

                /** @return always false, so this resource is never treated as the same RM as another */
                public boolean isSameRM(XAResource arg0) throws XAException
                {
                    return false;
                }

                /**
                 * Prints a notice, then votes on the branch.
                 *
                 * @return XAResource.XA_OK when failOnPrepare is false
                 * @throws XAException with error code XAER_RMERR when failOnPrepare is true
                 */
                public int prepare(Xid arg0) throws XAException
                {
                    System.out.println("Dummyresoruce: prepare");
                    if (failOnPrepare)
                    {
                        throw new XAException(XAException.XAER_RMERR);
                    }
                    return XAResource.XA_OK;
                }

                /**
                 * @return an empty array: this stub never has branches to recover. An
                 *         empty array rather than null, so a caller can iterate the
                 *         result without a null check.
                 */
                public Xid[] recover(int arg0) throws XAException
                {
                    return new Xid[0];
                }

                /** No-op. */
                public void rollback(Xid arg0) throws XAException
                {
                }

                /** @return always false (the timeout is not applied) */
                public boolean setTransactionTimeout(int arg0) throws XAException
                {
                    return false;
                }

                /** No-op. */
                public void start(Xid arg0, int arg1) throws XAException
                {
                }
            }
            


            • 3. Re: XA transactional issue using two resources
              timfox

              Madhu-

              I'm not sure what you're trying to achieve here.

              If you want to see some working XA examples then have a look at org.jboss.test.messaging.jms.XATest

              • 4. Re: XA transactional issue using two resources
                timfox

                I think XATest.test2PCSendRollback() does basically what you are attempting to do (and passes).

                • 5. Re: XA transactional issue using two resources
                  timfox

                  Here's a test I've just put together.

                  It adds a bunch of dummy XAResources to the tx, as well as the XASession XAResource.

                  On prepare one of the dummy XAResources fails, causing rollback to happen, and the message *isn't* sent.

                  As far as I can tell this is basically what you're trying to do.

                  The test runs and passes, and no message is sent.

                  /**
                   * Sends two non-persistent messages inside a transaction in which the
                   * messaging XAResource is enlisted together with three DummyXAResources,
                   * one of them built with failOnPrepare = true. After the commit attempt
                   * (whose exception, if any, is ignored) a separate non-transacted consumer
                   * must receive nothing from the queue within one second.
                   *
                   * The test returns immediately when ServerManagement.isRemote() is true.
                   */
                  public void test2PCSendFailOnPrepare() throws Exception
                  {
                      if (ServerManagement.isRemote())
                      {
                          return;
                      }

                      XAConnection xaConnection = null;
                      Connection receiverConnection = null;
                      try
                      {
                          xaConnection = cf.createXAConnection();

                          tm.begin();

                          XASession xaSession = xaConnection.createXASession();
                          MessagingXAResource messagingResource =
                              (MessagingXAResource) xaSession.getXAResource();

                          // Keep the commit from being optimised down to one phase.
                          messagingResource.setPreventJoining(true);

                          XAResource failingDummy = new DummyXAResource(true);
                          XAResource plainDummyA = new DummyXAResource();
                          XAResource plainDummyB = new DummyXAResource();

                          Transaction transaction = tm.getTransaction();
                          transaction.enlistResource(messagingResource);
                          transaction.enlistResource(failingDummy);
                          transaction.enlistResource(plainDummyA);
                          transaction.enlistResource(plainDummyB);

                          MessageProducer producer = xaSession.createProducer(queue);
                          producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                          producer.send(queue, xaSession.createTextMessage("XATest1"));
                          producer.send(queue, xaSession.createTextMessage("XATest2"));

                          try
                          {
                              transaction.commit();
                          }
                          catch (Exception e)
                          {
                              // An exception is the expected outcome here.
                          }

                          receiverConnection = cf.createConnection();
                          receiverConnection.start();
                          Session receiverSession =
                              receiverConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                          MessageConsumer consumer = receiverSession.createConsumer(queue);

                          // Nothing may arrive: the sends belong to the failed transaction.
                          assertNull(consumer.receive(1000));
                      }
                      finally
                      {
                          if (xaConnection != null)
                          {
                              xaConnection.close();
                          }
                          if (receiverConnection != null)
                          {
                              receiverConnection.close();
                          }
                      }
                  }
                  
                  /**
                   * XAResource stub for two-phase-commit tests. It does no transactional
                   * work: prepare() votes XA_OK unless failOnPrepare is set, in which case
                   * it throws XAException(XAER_RMERR); every other callback is a no-op or
                   * returns a fixed value.
                   */
                  static class DummyXAResource implements XAResource
                  {
                      /** When true, prepare() throws instead of returning XA_OK. */
                      boolean failOnPrepare;

                      /** Creates a resource whose prepare() succeeds. */
                      DummyXAResource()
                      {
                      }

                      /**
                       * @param failOnPrepare true to make prepare() throw XAException(XAER_RMERR)
                       */
                      DummyXAResource(boolean failOnPrepare)
                      {
                          this.failOnPrepare = failOnPrepare;
                      }

                      /** No-op. */
                      public void commit(Xid arg0, boolean arg1) throws XAException
                      {
                      }

                      /** No-op. */
                      public void end(Xid arg0, int arg1) throws XAException
                      {
                      }

                      /** No-op. */
                      public void forget(Xid arg0) throws XAException
                      {
                      }

                      /** @return always 0 */
                      public int getTransactionTimeout() throws XAException
                      {
                          return 0;
                      }

                      /** @return always false, so this resource is never treated as the same RM as another */
                      public boolean isSameRM(XAResource arg0) throws XAException
                      {
                          return false;
                      }

                      /**
                       * Votes on the branch.
                       *
                       * @return XAResource.XA_OK when failOnPrepare is false
                       * @throws XAException with error code XAER_RMERR when failOnPrepare is true
                       */
                      public int prepare(Xid arg0) throws XAException
                      {
                          if (failOnPrepare)
                          {
                              throw new XAException(XAException.XAER_RMERR);
                          }
                          return XAResource.XA_OK;
                      }

                      /**
                       * @return an empty array: this stub never has branches to recover. An
                       *         empty array rather than null, so a caller can iterate the
                       *         result without a null check.
                       */
                      public Xid[] recover(int arg0) throws XAException
                      {
                          return new Xid[0];
                      }

                      /** No-op. */
                      public void rollback(Xid arg0) throws XAException
                      {
                      }

                      /** @return always false (the timeout is not applied) */
                      public boolean setTransactionTimeout(int arg0) throws XAException
                      {
                          return false;
                      }

                      /** No-op. */
                      public void start(Xid arg0, int arg1) throws XAException
                      {
                      }
                  }