21 Replies. Latest reply on Sep 6, 2006 3:43 AM by mskonda
      • 15. Re: XARecovery - MQ/Messaging merge

        TransactionRepository.java

        public List getPreparedTransactions()
        {
            ArrayList prepared = new ArrayList();

            log.info("Prepared transactions in globalToLocalMap: " + globalToLocalMap);

            // Collect the Xid of every transaction that is still in the prepared state
            Iterator iter = globalToLocalMap.values().iterator();
            while (iter.hasNext())
            {
                Transaction tx = (Transaction)iter.next();
                if (tx.xid != null && tx.getState() == Transaction.STATE_PREPARED)
                {
                    prepared.add(tx.getXid());
                }
            }
            return prepared;
        }
        
        /*
         * Load any prepared transactions into the repository so they can be recovered
         */
        public void loadPreparedTransactions() throws Exception
        {
            List prepared = persistenceManager.retrievePreparedTransactions();

            if (prepared != null)
            {
                Iterator iter = prepared.iterator();

                while (iter.hasNext())
                {
                    PreparedTxInfo txInfo = (PreparedTxInfo)iter.next();

                    Transaction tx = createTransaction(txInfo);
                    tx.state = Transaction.STATE_PREPARED;

                    // we have to associate a callback to the prepared transaction
                    // --MK
                    persistenceManager.associateCallbackToPreparedTx(tx);

                    //Load the references for this transaction
                }
            }
        }
        
        /**
         * Creates a prepared transaction
         * @param txInfo the transaction id and Xid of a prepared transaction read from the database
         * @return the resurrected transaction, already registered in globalToLocalMap
         * @throws Exception if a local transaction already exists for the same Xid
         */
        private Transaction createTransaction(PreparedTxInfo txInfo) throws Exception
        {
            System.out.println("PreparedTxInfo txId: " + txInfo.getTxId());
            if (globalToLocalMap.containsKey(txInfo.getXid()))
            {
                throw new TransactionException("There is already a local tx for global tx " + txInfo.getXid());
            }

            //Resurrected tx
            Transaction tx = new Transaction(txInfo.getTxId(), txInfo.getXid());

            if (trace) { log.trace("created transaction " + tx); }

            globalToLocalMap.put(txInfo.getXid(), tx);

            return tx;
        }
        
        public Transaction getPreparedTx(Xid xid) throws Exception
        {
            // The Xid coming in from Arjuna's RM is formatted differently from ours,
            // so rebuild it as an XidImpl before using it as a key into globalToLocalMap.
            // I have to look into this more carefully a bit later.

            Xid x = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId());

            Transaction tx = (Transaction)globalToLocalMap.get(x);
            if (tx == null)
            {
                throw new TransactionException("Cannot find local tx for xid:" + xid);
            }
            if (tx.getState() != Transaction.STATE_PREPARED)
            {
                throw new TransactionException("Transaction with xid " + xid + " is not in prepared state");
            }
            return tx;
        }
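
The Xid rebuild in getPreparedTx matters because a HashMap lookup only succeeds when the lookup key's equals() and hashCode() agree with those of the stored key. Below is a minimal sketch of that effect, not the JBoss Messaging code: `KeyXid` and `ForeignXid` are hypothetical classes invented for this example, standing in for XidImpl and for whatever Xid implementation the transaction manager passes in.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import javax.transaction.xa.Xid;

// Hypothetical value-based Xid: equality is defined over the three Xid components.
final class KeyXid implements Xid
{
    private final byte[] branchQualifier;
    private final int formatId;
    private final byte[] globalTransactionId;

    KeyXid(byte[] branchQualifier, int formatId, byte[] globalTransactionId)
    {
        this.branchQualifier = branchQualifier.clone();
        this.formatId = formatId;
        this.globalTransactionId = globalTransactionId.clone();
    }

    // Copy the components out of any Xid implementation.
    static KeyXid copyOf(Xid other)
    {
        return new KeyXid(other.getBranchQualifier(), other.getFormatId(), other.getGlobalTransactionId());
    }

    public int getFormatId() { return formatId; }
    public byte[] getGlobalTransactionId() { return globalTransactionId.clone(); }
    public byte[] getBranchQualifier() { return branchQualifier.clone(); }

    public boolean equals(Object o)
    {
        if (!(o instanceof KeyXid)) { return false; }
        KeyXid k = (KeyXid)o;
        return formatId == k.formatId
            && Arrays.equals(globalTransactionId, k.globalTransactionId)
            && Arrays.equals(branchQualifier, k.branchQualifier);
    }

    public int hashCode()
    {
        return 31 * (31 * formatId + Arrays.hashCode(globalTransactionId)) + Arrays.hashCode(branchQualifier);
    }
}

// Hypothetical foreign Xid: same data, but identity-based equals/hashCode inherited from Object.
final class ForeignXid implements Xid
{
    private final byte[] branchQualifier;
    private final int formatId;
    private final byte[] globalTransactionId;

    ForeignXid(byte[] branchQualifier, int formatId, byte[] globalTransactionId)
    {
        this.branchQualifier = branchQualifier;
        this.formatId = formatId;
        this.globalTransactionId = globalTransactionId;
    }

    public int getFormatId() { return formatId; }
    public byte[] getGlobalTransactionId() { return globalTransactionId; }
    public byte[] getBranchQualifier() { return branchQualifier; }
}

final class XidLookupDemo
{
    public static void main(String[] args)
    {
        Map<Xid, String> globalToLocal = new HashMap<Xid, String>();
        byte[] branch = {1, 2};
        byte[] global = {3, 4};
        globalToLocal.put(new KeyXid(branch, 7, global), "tx-1");

        Xid incoming = new ForeignXid(branch, 7, global);
        // Direct lookup misses: ForeignXid is never equal to a KeyXid.
        System.out.println(globalToLocal.get(incoming));
        // Lookup with a rebuilt key hits the stored entry.
        System.out.println(globalToLocal.get(KeyXid.copyOf(incoming)));
    }
}
```

If the incoming Xid class already had compatible equals() and hashCode(), the rebuild would be unnecessary; the copy is simply the safe choice when that is unknown.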
        

        JDBCPersistenceManager.java
        public void associateCallbackToPreparedTx(Transaction tx)
        {
            TransactionCallback callback = (TransactionCallback) tx.getKeyedCallback(this);

            if (callback == null)
            {
                callback = new TransactionCallback(tx);

                tx.addKeyedCallback(callback, this);
            }
        }
        
        public List retrievePreparedTransactions() throws Exception
        {
            Connection conn = null;
            Statement st = null;
            ResultSet rs = null;
            PreparedTxInfo txInfo = null;
            TransactionWrapper wrap = new TransactionWrapper();

            try
            {
                List transactions = new ArrayList();

                conn = ds.getConnection();

                st = conn.createStatement();
                rs = st.executeQuery(selectPreparedTransactions);

                while (rs.next())
                {
                    //get the existing tx id --MK START
                    long txId = rs.getLong(1);
                    byte[] branchQual = rs.getBytes(2);
                    int formatId = rs.getInt(3);
                    byte[] globalTxId = rs.getBytes(4);
                    Xid xid = new XidImpl(branchQual, formatId, globalTxId);

                    // create a tx info object with the result set details
                    txInfo = new PreparedTxInfo(txId, xid);
                    transactions.add(txInfo);
                }

                return transactions;

            }


        • 16. Re: XARecovery - MQ/Messaging merge

           

          "timfox" wrote:
          When you load the prepared transactions from the database, you need to "reconstitute" them in exactly the same state they were in after the prepare was originally executed.

          This means loading all the messages and acks into the InMemoryCallback. Otherwise when you call commit(), nothing will get delivered or acknowledged in memory since you haven't loaded the data.

          You also need to create a TxCallback (which I see you have already done) - the TxCallback doesn't need to have data populated in it, since its data was already persisted when prepare was completed.


          So that's the reason why the message isn't delivered! Would you be able to give me a hand in creating that callback?

          • 17. Re: XARecovery - MQ/Messaging merge
            timfox

             

            "mskonda" wrote:

            So that's the reason why the message isn't delivered! Would you be able to give me a hand in creating that callback?


            Take a look at ChannelSupport::InMemoryCallback.

            You basically want to do something very similar - you may even be able to use this exact class.
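
As a rough illustration of the idea in this reply, here is a sketch of a callback that holds a prepared transaction's pending sends and acknowledgements in memory and applies them on commit. It is not the ChannelSupport inner class: `RecoveredTxCallback`, `InMemoryQueue`, and the use of plain long message ids are all assumptions made for this example, and only the commit path is covered.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a channel's in-memory state.
final class InMemoryQueue
{
    // Ids available for delivery to consumers.
    final Set<Long> deliverable = new LinkedHashSet<Long>();
    // Ids delivered to a consumer but not yet acknowledged.
    final Set<Long> unacknowledged = new LinkedHashSet<Long>();
}

// Hypothetical callback that has to be repopulated after a restart.
final class RecoveredTxCallback
{
    private final InMemoryQueue queue;
    private final List<Long> messagesToAdd = new ArrayList<Long>();
    private final List<Long> acksToApply = new ArrayList<Long>();

    RecoveredTxCallback(InMemoryQueue queue)
    {
        this.queue = queue;
    }

    // Record a message that was sent inside the transaction.
    void addMessage(long id) { messagesToAdd.add(Long.valueOf(id)); }

    // Record an acknowledgement that was made inside the transaction.
    void addAck(long id) { acksToApply.add(Long.valueOf(id)); }

    // On commit, sent messages become deliverable and acknowledged ones are dropped.
    void afterCommit()
    {
        queue.deliverable.addAll(messagesToAdd);
        queue.unacknowledged.removeAll(acksToApply);
        messagesToAdd.clear();
        acksToApply.clear();
    }
}

final class RecoveredTxCallbackDemo
{
    public static void main(String[] args)
    {
        InMemoryQueue queue = new InMemoryQueue();
        queue.unacknowledged.add(Long.valueOf(10L));

        // A callback that was never repopulated: commit changes nothing in memory.
        new RecoveredTxCallback(queue).afterCommit();
        System.out.println(queue.deliverable + " " + queue.unacknowledged);

        // A callback reloaded with the transaction's sends and acks.
        RecoveredTxCallback callback = new RecoveredTxCallback(queue);
        callback.addMessage(1L);
        callback.addMessage(2L);
        callback.addAck(10L);
        callback.afterCommit();
        System.out.println(queue.deliverable + " " + queue.unacknowledged);
    }
}
```

The first commit in the demo corresponds to the symptom discussed in this thread: with an empty callback, commit has nothing to deliver or acknowledge.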

            • 18. Re: XARecovery - MQ/Messaging merge

              Yup, just looking at it!

              • 19. Re: XARecovery - MQ/Messaging merge

                 

                "timfox" wrote:

                This means loading all the messages and acks into the InMemoryCallback.

                How do you find the acks, Tim?

                • 20. Re: XARecovery - MQ/Messaging merge
                  timfox

                  messages:

                  select ... from jms_message_reference where transactionid=? and state='+'
                  


                  acks:

                  select ... from jms_message_reference where transactionid=? and state='-'
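
A minimal JDBC sketch of how these two queries could be run for one recovered transaction. The select list is elided above, so the `messageid` column used here is an assumption, and `PreparedRefLoader`, `selectSql`, and `loadIds` are hypothetical names; only the table name, the transactionid filter, and the two state values come from this reply.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

final class PreparedRefLoader
{
    static final String STATE_ADD = "+"; // rows for messages sent in the transaction
    static final String STATE_ACK = "-"; // rows for acks made in the transaction

    // Assumed select list; substitute the columns the real query needs.
    static String selectSql()
    {
        return "SELECT messageid FROM jms_message_reference WHERE transactionid = ? AND state = ?";
    }

    // Returns the ids of all rows for the given transaction and state.
    static List<Long> loadIds(Connection conn, long txId, String state) throws SQLException
    {
        List<Long> ids = new ArrayList<Long>();
        PreparedStatement ps = conn.prepareStatement(selectSql());
        try
        {
            ps.setLong(1, txId);
            ps.setString(2, state);
            ResultSet rs = ps.executeQuery();
            try
            {
                while (rs.next())
                {
                    ids.add(Long.valueOf(rs.getLong(1)));
                }
            }
            finally
            {
                rs.close();
            }
        }
        finally
        {
            ps.close();
        }
        return ids;
    }
}
```

Under these assumptions, `loadIds(conn, txId, PreparedRefLoader.STATE_ADD)` would return the transaction's sends and `loadIds(conn, txId, PreparedRefLoader.STATE_ACK)` its acks. Binding the state as a parameter lets one statement text serve both queries.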
                  


                  • 21. Re: XARecovery - MQ/Messaging merge

                     

                    "timfox" wrote:

                    This means loading all the messages and acks into the InMemoryCallback. Otherwise when you call commit(), nothing will get delivered or acknowledged in memory since you haven't loaded the data.

                    Can you explain the semantics of the InMemoryCallback class, specifically what happens when commit is called on a transaction that has an associated InMemoryCallback?

                    The current one in ChannelSupport depends on ChannelSupport for delivery and other functionality, so I factored the class out and recreated it as an inner class of TransactionRepository (so that the reconstituted tx is immediately associated with the InMemoryCallback).

                    Also, can you brief me on what the 'Loaded' column in the message reference table indicates and how it is manipulated? When do we have to update the column?

                    Thanks, Tim

