This content has been marked as final.
Show 21 replies
-
15. Re: XARecovery - MQ/Messaging merge
mskonda Sep 5, 2006 11:31 AM (in response to mskonda)TransactionRepositroy.java
public List getPreparedTransactions() { ArrayList prepared = new ArrayList(); log.info("Prepared Transaction in gloabalToLocalMap"+globalToLocalMap); Iterator iter = globalToLocalMap.values().iterator(); while (iter.hasNext()) { Transaction tx = (Transaction)iter.next(); if (tx.xid != null && tx.getState() == Transaction.STATE_PREPARED) { prepared.add(tx.getXid()); } } return prepared; } /* * Load any prepared transactions into the repository so they can be recovered */ public void loadPreparedTransactions() throws Exception { List prepared = null; prepared = persistenceManager.retrievePreparedTransactions(); if (prepared != null) { Iterator iter = prepared.iterator(); while (iter.hasNext()) { PreparedTxInfo txInfo = (PreparedTxInfo)iter.next(); Transaction tx = createTransaction(txInfo); tx.state = Transaction.STATE_PREPARED; // we have to associate a callback to the prepared transaction // --MK persistenceManager.associateCallbackToPreparedTx(tx); //Load the references for this transaction } } } /** * Creates a prepared transaction * @param txInfo * @return * @throws Exception */ private Transaction createTransaction(PreparedTxInfo txInfo) throws Exception { System.out.println("PreparedTxInfo:Xid "+txInfo.getTxId()); if (globalToLocalMap.containsKey(txInfo.getXid())) { throw new TransactionException("There is already a local tx for global tx " + txInfo.getXid()); } //Resurrected tx Transaction tx = new Transaction(txInfo.getTxId(), txInfo.getXid()); if (trace) { log.trace("created transaction " + tx); } globalToLocalMap.put(txInfo.getXid(), tx); return tx; } public Transaction getPreparedTx(Xid xid) throws Exception { // the incoming Xid from Arjuna's RM is funny formatted! // hence we have to do our way // I have to look into this carefully a bit later. Xid x = new XidImpl(xid.getBranchQualifier(),xid.getFormatId(), xid.getGlobalTransactionId()); Transaction tx = (Transaction)globalToLocalMap.get(x); if (tx == null) { throw new TransactionException("Cannot find local tx for xid:" + xid); } if (tx.getState() != Transaction.STATE_PREPARED) { throw new TransactionException("Transaction with xid " + xid + " is not in prepared state"); } return tx; }
JDBCPersistenceManager.javapublic void associateCallbackToPreparedTx(Transaction tx) { TransactionCallback callback = (TransactionCallback) tx.getKeyedCallback(this); if (callback == null) { callback = new TransactionCallback(tx); tx.addKeyedCallback(callback, this); } } public List retrievePreparedTransactions() throws Exception { Connection conn = null; Statement st = null; ResultSet rs = null; PreparedTxInfo txInfo = null; TransactionWrapper wrap = new TransactionWrapper(); try { List transactions = new ArrayList(); conn = ds.getConnection(); st = conn.createStatement(); rs = st.executeQuery(selectPreparedTransactions); while (rs.next()) { //get the exiting tx id --MK START long txId = rs.getLong(1); byte[] branchQual = rs.getBytes(2); int formatId = rs.getInt(3); byte[] globalTxId = rs.getBytes(4); Xid xid = new XidImpl(branchQual, formatId, globalTxId); // create a tx info object with the result set detailsdetails txInfo = new PreparedTxInfo(txId, xid); transactions.add(txInfo); } return transactions; }
-
16. Re: XARecovery - MQ/Messaging merge
mskonda Sep 5, 2006 11:34 AM (in response to mskonda)"timfox" wrote:
When you load the prepared transactions from the database, you need to "reconstitute" them in exactly the same state they were in after the prepare was originally executed.
This means loading all the messages and acks into the InMemoryCallback. Otherwise when you call commit(), nothing will get delivered or acknowledged in memory since you haven't loaded the data.
You also need to create a TxCallback (which I see you have already done) - the TXCallback doesn't need to have data populated in it, since it's data was already persisted when prepare was completed.
So, that's the reason why the message isn't delivered!!!! Would you be able to give me hand in creating that callback? -
17. Re: XARecovery - MQ/Messaging merge
timfox Sep 5, 2006 11:43 AM (in response to mskonda)"mskonda" wrote:
So, that's the reason why the message isn't delivered!!!! Would you be able to give me hand in creating that callback?
Take a look at ChannelSupport::InMemoyrCallback
You basically want to do something very similar - you may even be able to use this exact class -
18. Re: XARecovery - MQ/Messaging merge
mskonda Sep 5, 2006 11:47 AM (in response to mskonda)yup, just lookint at it!
-
19. Re: XARecovery - MQ/Messaging merge
mskonda Sep 5, 2006 12:20 PM (in response to mskonda)"timfox" wrote:
This means loading all the messages and acks into the InMemoryCallback.
How do you find the acks Tim? -
20. Re: XARecovery - MQ/Messaging merge
timfox Sep 5, 2006 12:26 PM (in response to mskonda)messages:
select ... from jms_message_reference where transactionid=? and state="+"
acks:select ... from jms_message_reference where transactionid=? and state="-"
-
21. Re: XARecovery - MQ/Messaging merge
mskonda Sep 6, 2006 3:43 AM (in response to mskonda)"timfox" wrote:
This means loading all the messages and acks into the InMemoryCallback. Otherwise when you call commit(), nothing will get delivered or acknowledged in memory since you haven't loaded the data.
Can you explain the semantics of InMemoryCallback class as to what happens when a commit is called on the transaction (provided the tx has associated InMemoryCallback)?
The current one in ChannelSupport depends on the ChannelSupport for delivery and other functionality. Hence I factored out the class and created it as an inner class in TransactionRepository (so the reconstitued tx will be immediately associated with the InMemoryCallaback).
Also can you brief me as to what the 'Loaded' column indicate in the Message Reference table and how it is mainupulated? When do we have to upated the column?
Thanks Tim