Clover coverage report -
Coverage timestamp: Thu Jul 5 2007 20:02:32 EDT
file stats: LOC: 1,195   Methods: 34
NCLOC: 855   Classes: 3
 
 Source file Conditionals Statements Methods TOTAL
TxInterceptor.java 67.2% 82% 97.1% 78.3%
coverage coverage
 1    /*
 2    * JBoss, Home of Professional Open Source
 3    *
 4    * Distributable under LGPL license.
 5    * See terms of license at gnu.org.
 6    */
 7    package org.jboss.cache.interceptors;
 8   
 9    import org.jboss.cache.CacheException;
 10    import org.jboss.cache.CacheSPI;
 11    import org.jboss.cache.InvocationContext;
 12    import org.jboss.cache.ReplicationException;
 13    import org.jboss.cache.config.Configuration;
 14    import org.jboss.cache.config.Option;
 15    import org.jboss.cache.marshall.MethodCall;
 16    import org.jboss.cache.marshall.MethodCallFactory;
 17    import org.jboss.cache.marshall.MethodDeclarations;
 18    import org.jboss.cache.optimistic.DataVersion;
 19    import org.jboss.cache.transaction.GlobalTransaction;
 20    import org.jboss.cache.transaction.OptimisticTransactionEntry;
 21    import org.jboss.cache.transaction.TransactionEntry;
 22   
 23    import javax.transaction.Status;
 24    import javax.transaction.Synchronization;
 25    import javax.transaction.SystemException;
 26    import javax.transaction.Transaction;
 27    import java.util.HashMap;
 28    import java.util.List;
 29    import java.util.Map;
 30    import java.util.concurrent.ConcurrentHashMap;
 31   
 32    /**
 33    * This interceptor is the new default at the head of all interceptor chains,
 34    * and makes transactional attributes available to all interceptors in the chain.
 35    * This interceptor is also responsible for registering for synchronisation on
 36    * transaction completion.
 37    *
 38    * @author <a href="mailto:manik@jboss.org">Manik Surtani (manik@jboss.org)</a>
 39    * @author <a href="mailto:stevew@jofti.com">Steve Woodcock (stevew@jofti.com)</a>
 40    */
 41    public class TxInterceptor extends BaseTransactionalContextInterceptor implements TxInterceptorMBean
 42    {
 43    private final static Object NULL = new Object();
 44   
 45    /**
 46    * Map used as a set of the Transactions we have already registered (keys are Transactions, every value is the NULL placeholder)
 47    */
 48    private Map transactions = new ConcurrentHashMap(16);
 49    private Map rollbackTransactions = new ConcurrentHashMap(16);
 50    private long m_prepares = 0;
 51    private long m_commits = 0;
 52    private long m_rollbacks = 0;
 53   
 54   
 55    /**
 56    * Set<GlobalTransaction> of GlobalTransactions that originated somewhere else (we didn't create them).
 57    * This is a result of a PREPARE phase. GlobalTransactions in this list should be ignored by this
 58    * interceptor when registering for TX completion
 59    */
 60    private Map remoteTransactions = new ConcurrentHashMap();
 61   
 62  2485388 public Object invoke(InvocationContext ctx) throws Throwable
 63    {
 64  2485388 MethodCall m = ctx.getMethodCall();
 65  2485388 if (log.isTraceEnabled())
 66    {
 67  0 log.trace("(" + cache.getLocalAddress() + ") call on method [" + m + "]");
 68    }
 69    // bypass for buddy group org metod calls.
 70  789 if (MethodDeclarations.isBuddyGroupOrganisationMethod(m.getMethodId())) return super.invoke(ctx);
 71   
 72  2484599 boolean scrubTxsOnExit = false;
 73  2484599 Option optionOverride = ctx.getOptionOverrides();
 74   
 75  2484599 Object result = null;
 76   
 77  2484599 try
 78    {
 79    // first of all deal with tx methods - these are only going to be
 80    // prepare/commit/rollback called by a remote cache, since calling
 81    // such methods on CacheImpl directly would fail.
 82   
 83  2484599 if (MethodDeclarations.isTransactionLifecycleMethod(m.getMethodId()))
 84    {
 85    // this is a prepare, commit, or rollback.
 86  0 if (log.isDebugEnabled()) log.debug("Got gtx from invocation context " + ctx.getGlobalTransaction());
 87   
 88  1473 if (ctx.getGlobalTransaction().isRemote()) remoteTransactions.put(ctx.getGlobalTransaction(), NULL);
 89   
 90  1473 switch (m.getMethodId())
 91    {
 92  179 case MethodDeclarations.optimisticPrepareMethod_id:
 93  570 case MethodDeclarations.prepareMethod_id:
 94  749 if (ctx.getGlobalTransaction().isRemote())
 95    {
 96  749 result = handleRemotePrepare(ctx, m);
 97  726 scrubTxsOnExit = true;
 98  726 if (configuration.getExposeManagementStatistics() && getStatisticsEnabled())
 99    {
 100  715 m_prepares++;
 101    }
 102    }
 103    else
 104    {
 105  0 if (log.isTraceEnabled()) log.trace("received my own message (discarding it)");
 106  0 result = null;
 107    }
 108  726 break;
 109  696 case MethodDeclarations.commitMethod_id:
 110  28 case MethodDeclarations.rollbackMethod_id:
 111  724 if (ctx.getGlobalTransaction().isRemote())
 112    {
 113  724 result = handleRemoteCommitRollback(ctx);
 114  721 scrubTxsOnExit = true;
 115    }
 116    else
 117    {
 118  0 if (log.isTraceEnabled()) log.trace("received my own message (discarding it)");
 119  0 result = null;
 120    }
 121  721 break;
 122    }
 123    }
 124    else
 125    {
 126    // non-transaction lifecycle method.
 127  2483126 result = handleNonTxMethod(ctx);
 128    }
 129    }
 130    catch (Exception e)
 131    {
 132  70 if (optionOverride == null || !optionOverride.isFailSilently()) throw e;
 133  11 log.trace("There was a problem handling this request, but " +
 134    "failSilently was set, so suppressing exception", e);
 135    }
 136    finally
 137    {
 138    // we should scrub txs after every call to prevent race conditions
 139    // basically any other call coming in on the same thread and hijacking any running tx's
 140    // was highlighted in JBCACHE-606
 141   
 142  2484598 if (scrubTxsOnExit)
 143    {
 144  1447 setTransactionalContext(null, null, ctx);
 145    }
 146    }
 147  2484528 return result;
 148    }
 149   
 150  6 public long getPrepares()
 151    {
 152  6 return m_prepares;
 153    }
 154   
 155  6 public long getCommits()
 156    {
 157  6 return m_commits;
 158    }
 159   
 160  6 public long getRollbacks()
 161    {
 162  6 return m_rollbacks;
 163    }
 164   
 165  2 public void resetStatistics()
 166    {
 167  2 m_prepares = 0;
 168  2 m_commits = 0;
 169  2 m_rollbacks = 0;
 170    }
 171   
 172  0 public Map<String, Object> dumpStatistics()
 173    {
 174  0 Map<String, Object> retval = new HashMap<String, Object>(3);
 175  0 retval.put("Prepares", m_prepares);
 176  0 retval.put("Commits", m_commits);
 177  0 retval.put("Rollbacks", m_rollbacks);
 178  0 return retval;
 179    }
 180   
 181  749 private Object handleRemotePrepare(InvocationContext ctx, MethodCall m) throws Throwable
 182    {
 183  749 GlobalTransaction gtx = ctx.getGlobalTransaction();
 184  749 List<MethodCall> modifications = (List<MethodCall>) m.getArgs()[1];
 185  749 boolean onePhase = (Boolean) m.getArgs()[configuration.isNodeLockingOptimistic() ? 4 : 3];
 186   
 187    // Is there a local transaction associated with GTX ?
 188  749 Transaction ltx = txTable.getLocalTransaction(gtx);
 189   
 190  749 Transaction currentTx = txManager.getTransaction();
 191  749 Object retval = null;
 192   
 193  749 try
 194    {
 195  749 if (ltx == null)
 196    {
 197  3 if (currentTx != null) txManager.suspend();
 198  749 ltx = createLocalTxForGlobalTx(gtx, ctx);// creates new LTX and associates it with a GTX
 199  749 if (log.isDebugEnabled())
 200    {
 201  0 log.debug("Started new local TX as result of remote PREPARE: local TX=" + ltx + " (Status=" + ltx.getStatus() + "), global TX=" + gtx);
 202    }
 203    }
 204    else
 205    {
 206    //this should be valid
 207  0 if (!isValid(ltx)) throw new CacheException("Transaction " + ltx + " not in correct state to be prepared");
 208   
 209    //associate this thread with this ltx if this ltx is NOT the current tx.
 210  0 if (currentTx == null || !ltx.equals(currentTx))
 211    {
 212  0 txManager.suspend();
 213  0 txManager.resume(ltx);
 214    }
 215    }
 216   
 217   
 218  749 if (log.isTraceEnabled())
 219    {
 220  0 log.trace("Resuming existing transaction " + ltx + ", global TX=" + gtx);
 221    }
 222   
 223    // at this point we have a non-null ltx
 224   
 225    // Asssociate the local TX with the global TX. Create new
 226    // entry for TX in txTable, the modifications
 227    // below will need this entry to add their modifications
 228    // under the GlobalTx key
 229  749 if (txTable.get(gtx) == null)
 230    {
 231    // create a new transaction entry
 232   
 233  749 TransactionEntry entry = configuration.isNodeLockingOptimistic() ? new OptimisticTransactionEntry() : new TransactionEntry();
 234  749 entry.setTransaction(ltx);
 235  749 log.debug("creating new tx entry");
 236  749 txTable.put(gtx, entry);
 237  0 if (log.isTraceEnabled()) log.trace("TxTable contents: " + txTable);
 238    }
 239   
 240  749 setTransactionalContext(ltx, gtx, ctx);
 241    // register a sync handler for this tx.
 242  749 registerHandler(ltx, new RemoteSynchronizationHandler(gtx, ltx, cache), ctx);
 243   
 244  749 if (configuration.isNodeLockingOptimistic())
 245    {
 246  179 retval = handleOptimisticPrepare(ctx, gtx, modifications, onePhase, ltx);
 247    }
 248    else
 249    {
 250  570 retval = handlePessimisticPrepare(ctx, m, gtx, modifications, onePhase, ltx);
 251    }
 252    }
 253    finally
 254    {
 255  749 txManager.suspend();// suspends ltx - could be null
 256    // resume whatever else we had going.
 257  3 if (currentTx != null) txManager.resume(currentTx);
 258  0 if (log.isDebugEnabled()) log.debug("Finished remote prepare " + gtx);
 259    }
 260   
 261  726 return retval;
 262    }
 263   
 264    // --------------------------------------------------------------
 265    // handler methods.
 266    // --------------------------------------------------------------
 267   
 268    /**
 269    * Tests if we already have a tx running. If so, register a sync handler for this method invocation.
 270    * if not, create a local tx if we're using opt locking.
 271    *
 272    * @return
 273    * @throws Throwable
 274    */
 275  2483126 private Object handleNonTxMethod(InvocationContext ctx) throws Throwable
 276    {
 277  2483126 MethodCall m = ctx.getMethodCall();
 278  2483126 Transaction tx = ctx.getTransaction();
 279  2483126 Object result;
 280    // if there is no current tx and we're using opt locking, we need to use an implicit tx.
 281  2483126 boolean implicitTransaction = configuration.isNodeLockingOptimistic() && tx == null;
 282  2483126 if (implicitTransaction)
 283    {
 284  3297 tx = createLocalTx();
 285    // we need to attach this tx to the InvocationContext.
 286  3297 ctx.setTransaction(tx);
 287    }
 288  1401906 if (tx != null) m = attachGlobalTransaction(ctx, tx, m);
 289   
 290  2483126 GlobalTransaction gtx = ctx.getGlobalTransaction();
 291   
 292  2483126 try
 293    {
 294  2483126 result = super.invoke(ctx);
 295  2483070 if (implicitTransaction)
 296    {
 297  3297 copyInvocationScopeOptionsToTxScope(ctx);
 298  3297 txManager.commit();
 299    }
 300    }
 301    catch (Throwable t)
 302    {
 303  65 if (implicitTransaction)
 304    {
 305  10 log.warn("Rolling back, exception encountered", t);
 306  10 result = t;
 307  10 try
 308    {
 309  10 setTransactionalContext(tx, gtx, ctx);
 310  10 txManager.rollback();
 311    }
 312    catch (Throwable th)
 313    {
 314  10 log.warn("Roll back failed encountered", th);
 315    }
 316    }
 317    else
 318    {
 319  55 throw t;
 320    }
 321    }
 322  2483070 return result;
 323    }
 324   
 325  1401906 private MethodCall attachGlobalTransaction(InvocationContext ctx, Transaction tx, MethodCall m) throws Exception
 326    {
 327  1401906 if (log.isDebugEnabled())
 328    {
 329  0 log.debug(" local transaction exists - registering global tx if not present for " + Thread.currentThread());
 330    }
 331  1401905 if (log.isTraceEnabled())
 332    {
 333  0 GlobalTransaction tempGtx = txTable.get(tx);
 334  0 log.trace("Associated gtx in txTable is " + tempGtx);
 335    }
 336   
 337    // register a sync handler for this tx - only if the gtx is not remotely initiated.
 338  1401906 GlobalTransaction gtx = registerTransaction(tx, ctx);
 339  1401906 if (gtx != null)
 340    {
 341  1121124 m = replaceGtx(m, gtx);
 342    }
 343    else
 344    {
 345    // get the current gtx from the txTable.
 346  280782 gtx = txTable.get(tx);
 347    }
 348   
 349    // make sure we attach this gtx to the invocation context.
 350  1401906 ctx.setGlobalTransaction(gtx);
 351   
 352  1401906 return m;
 353    }
 354   
 355    /**
 356    * This is called by invoke() if we are in a remote gtx's prepare() phase.
 357    * Finds the appropriate tx, suspends any existing txs, registers a sync handler
 358    * and passes up the chain.
 359    * <p/>
 360    * Resumes any existing txs before returning.
 361    *
 362    * @return
 363    * @throws Throwable
 364    */
 365  179 private Object handleOptimisticPrepare(InvocationContext ctx, GlobalTransaction gtx, List<MethodCall> modifications, boolean onePhase, Transaction ltx) throws Throwable
 366    {
 367  179 Object retval;
 368  0 if (log.isDebugEnabled()) log.debug("Handling optimistic remote prepare " + gtx);
 369  179 replayModifications(modifications, ctx, true);
 370  179 retval = super.invoke(ctx);
 371    // JBCACHE-361 Confirm that the transaction is ACTIVE
 372  178 if (!isActive(ltx))
 373    {
 374  0 throw new ReplicationException("prepare() failed -- " +
 375    "local transaction status is not STATUS_ACTIVE;" +
 376    " is " + ltx.getStatus());
 377    }
 378  178 return retval;
 379    }
 380   
 381  570 private Object handlePessimisticPrepare(InvocationContext ctx, MethodCall m, GlobalTransaction gtx, List<MethodCall> modifications, boolean commit, Transaction ltx) throws Exception
 382    {
 383  570 boolean success = true;
 384  570 Object retval;
 385  570 try
 386    {
 387    // now pass up the prepare method itself.
 388  570 try
 389    {
 390  570 replayModifications(modifications, ctx, false);
 391  548 if (isOnePhaseCommitPrepareMehod(m))
 392    {
 393  19 log.trace("Using one-phase prepare. Not propagating the prepare call up the stack until called to do so by the sync handler.");
 394    }
 395    else
 396    {
 397  529 super.invoke(ctx);
 398    }
 399   
 400    // JBCACHE-361 Confirm that the transaction is ACTIVE
 401  548 if (!isActive(ltx))
 402    {
 403  0 throw new ReplicationException("prepare() failed -- " +
 404    "local transaction status is not STATUS_ACTIVE;" +
 405    " is " + ltx.getStatus());
 406    }
 407    }
 408    catch (Throwable th)
 409    {
 410  22 log.error("prepare method invocation failed", th);
 411  22 retval = th;
 412  22 success = false;
 413  22 if (retval instanceof Exception)
 414    {
 415  22 throw (Exception) retval;
 416    }
 417    }
 418    }
 419    finally
 420    {
 421   
 422  570 if (log.isTraceEnabled())
 423    {
 424  0 log.trace("Are we running a 1-phase commit? " + commit);
 425    }
 426    // 4. If commit == true (one-phase-commit): commit (or rollback) the TX; this will cause
 427    // {before/after}Completion() to be called in all registered interceptors: the TransactionInterceptor
 428    // will then commit/rollback against the cache
 429   
 430  570 if (commit)
 431    {
 432  19 try
 433    {
 434    // invokeOnePhaseCommitMethod(gtx, modifications.size() > 0, success);
 435  19 if (success)
 436    {
 437  19 ltx.commit();
 438    }
 439    else
 440    {
 441  0 ltx.rollback();
 442    }
 443    }
 444    catch (Throwable t)
 445    {
 446  0 log.error("Commit/rollback failed.", t);
 447  0 if (success)
 448    {
 449    // try another rollback...
 450  0 try
 451    {
 452  0 log.info("Attempting anotehr rollback");
 453    //invokeOnePhaseCommitMethod(gtx, modifications.size() > 0, false);
 454  0 ltx.rollback();
 455    }
 456    catch (Throwable t2)
 457    {
 458  0 log.error("Unable to rollback", t2);
 459    }
 460    }
 461    }
 462    finally
 463    {
 464  19 transactions.remove(ltx);// JBAS-298
 465  19 remoteTransactions.remove(gtx);// JBAS-308
 466    }
 467    }
 468    }
 469  548 return null;
 470    }
 471   
 472  749 private Object replayModifications(List<MethodCall> modifications, InvocationContext ctx, boolean injectDataVersions)
 473    {
 474  749 Object retval = null;
 475  749 MethodCall originalMethodCall = ctx.getMethodCall();
 476  749 Option originalOption = ctx.getOptionOverrides();
 477   
 478  749 if (modifications != null)
 479    {
 480  749 for (MethodCall modification : modifications)
 481    {
 482  4107 try
 483    {
 484  4107 if (injectDataVersions && !MethodDeclarations.isDataGravitationMethod(modification.getMethodId()))
 485    {
 486  197 Object[] origArgs = modification.getArgs();
 487    // there may be instances (e.g., data gravitation calls) where a data version is not passed in or not even relevant.
 488    // make sure we are aware of this.
 489  197 Option o = null;
 490  197 if (origArgs[origArgs.length - 1] instanceof DataVersion)
 491    {
 492  196 o = new Option();
 493  196 o.setDataVersion((DataVersion) origArgs[origArgs.length - 1]);
 494    }
 495    // modify the call to the non-dataversioned counterpart since we've popped out the data version
 496  197 Object[] args = new Object[origArgs.length - 1];
 497  197 System.arraycopy(origArgs, 0, args, 0, args.length);
 498   
 499  197 ctx.setMethodCall(MethodCallFactory.create(MethodDeclarations.getUnversionedMethod(modification.getMethodId()), args));
 500  196 if (o != null) ctx.setOptionOverrides(o);
 501    }
 502    else
 503    {
 504  3910 ctx.setMethodCall(modification);
 505    }
 506   
 507  4107 retval = super.invoke(ctx);
 508   
 509  4085 if (!isActive(ctx.getTransaction()))
 510    {
 511  0 throw new ReplicationException("prepare() failed -- " + "local transaction status is not STATUS_ACTIVE; is " + ctx.getTransaction().getStatus());
 512    }
 513    }
 514    catch (Throwable t)
 515    {
 516  22 log.error("method invocation failed", t);
 517  22 retval = t;
 518    }
 519    finally
 520    {
 521    // reset any options
 522  204 if (injectDataVersions) ctx.setOptionOverrides(originalOption);
 523  4107 ctx.setMethodCall(originalMethodCall);
 524    }
 525  4107 if (retval != null && retval instanceof Exception)
 526    {
 527  22 if (retval instanceof RuntimeException)
 528  22 throw (RuntimeException) retval;
 529    else
 530  0 throw new RuntimeException((Exception) retval);
 531    }
 532    }
 533    }
 534    // need to pass up the prepare as well and return value from that
 535  727 return retval;
 536    }
 537   
 538    /**
 539    * Handles a commit or a rollback for a remote gtx. Called by invoke().
 540    *
 541    * @return
 542    * @throws Throwable
 543    */
 544  724 private Object handleRemoteCommitRollback(InvocationContext ctx) throws Throwable
 545    {
 546  724 Transaction ltx;
 547  724 GlobalTransaction gtx = ctx.getGlobalTransaction();
 548  724 MethodCall m = ctx.getMethodCall();
 549   
 550  724 try
 551    {
 552  724 ltx = getLocalTxForGlobalTx(gtx);
 553    }
 554    catch (IllegalStateException e)
 555    {
 556  4 if (m.getMethodId() == MethodDeclarations.rollbackMethod_id)
 557    {
 558  2 log.warn("No local transaction for this remotely originating rollback. Possibly rolling back before a prepare call was broadcast?");
 559  2 return null;
 560    }
 561    else
 562    {
 563  2 throw e;
 564    }
 565    }
 566   
 567    // disconnect if we have a current tx associated
 568  720 Transaction currentTx = txManager.getTransaction();
 569  720 boolean resumeCurrentTxOnCompletion = false;
 570  720 try
 571    {
 572  720 if (!ltx.equals(currentTx))
 573    {
 574  720 currentTx = txManager.suspend();
 575  720 resumeCurrentTxOnCompletion = true;
 576  720 txManager.resume(ltx);
 577    // make sure we set this in the ctx
 578  720 ctx.setTransaction(ltx);
 579    }
 580  0 if (log.isDebugEnabled()) log.debug(" executing " + m + "() with local TX " + ltx + " under global tx " + gtx);
 581   
 582    // pass commit up the chain
 583    // super.invoke(ctx);
 584    // commit or rollback the tx.
 585  720 if (m.getMethodId() == MethodDeclarations.commitMethod_id)
 586    {
 587  694 txManager.commit();
 588  693 if (configuration.getExposeManagementStatistics() && getStatisticsEnabled())
 589    {
 590  689 m_commits++;
 591    }
 592    }
 593    else
 594    {
 595  26 txManager.rollback();
 596  26 if (configuration.getExposeManagementStatistics() && getStatisticsEnabled())
 597    {
 598  22 m_rollbacks++;
 599    }
 600    }
 601    }
 602    finally
 603    {
 604    //resume the old transaction if we suspended it
 605  720 if (resumeCurrentTxOnCompletion)
 606    {
 607  0 if (log.isTraceEnabled()) log.trace("Resuming suspended transaction " + currentTx);
 608  720 txManager.suspend();
 609  720 if (currentTx != null)
 610    {
 611  2 txManager.resume(currentTx);
 612  2 ctx.setTransaction(currentTx);
 613    }
 614    }
 615   
 616    // remove from local lists.
 617  720 remoteTransactions.remove(gtx);
 618  720 transactions.remove(ltx);
 619   
 620    // this tx has completed. Clean up in the tx table.
 621  720 txTable.remove(gtx);
 622  720 txTable.remove(ltx);
 623    }
 624   
 625  0 if (log.isDebugEnabled()) log.debug("Finished remote commit/rollback method for " + gtx);
 626   
 627  719 return null;
 628    }
 629   
 630  724 private Transaction getLocalTxForGlobalTx(GlobalTransaction gtx) throws IllegalStateException
 631    {
 632  724 Transaction ltx = txTable.getLocalTransaction(gtx);
 633  724 if (ltx != null)
 634    {
 635  0 if (log.isDebugEnabled()) log.debug("Found local TX=" + ltx + ", global TX=" + gtx);
 636    }
 637    else
 638    {
 639  4 throw new IllegalStateException(" found no local TX for global TX " + gtx);
 640    }
 641  720 return ltx;
 642    }
 643   
 644    /**
 645    * Handles a commit or a rollback. Called by the synch handler. Simply tests that we are in the correct tx and
 646    * passes the meth call up the interceptor chain.
 647    *
 648    * @return
 649    * @throws Throwable
 650    */
 651  1121770 private Object handleCommitRollback(InvocationContext ctx) throws Throwable
 652    {
 653    //GlobalTransaction gtx = findGlobalTransaction(m.getArgs());
 654  1121770 GlobalTransaction gtx = ctx.getGlobalTransaction();
 655  1121770 Object result;
 656   
 657    // this must have a local transaction associated if a prepare has been
 658    // callled before
 659    //Transaction ltx = getLocalTxForGlobalTx(gtx);
 660   
 661    // Transaction currentTx = txManager.getTransaction();
 662   
 663    //if (!ltx.equals(currentTx)) throw new IllegalStateException(" local transaction " + ltx + " transaction does not match running tx " + currentTx);
 664   
 665  1121770 result = super.invoke(ctx);
 666   
 667  0 if (log.isDebugEnabled()) log.debug("Finished local commit/rollback method for " + gtx);
 668  1121759 return result;
 669    }
 670   
 671    // --------------------------------------------------------------
 672    // Transaction phase runners
 673    // --------------------------------------------------------------
 674   
 675    /**
 676    * creates a commit() MethodCall and feeds it to handleCommitRollback();
 677    *
 678    * @param gtx
 679    */
 680  1121442 protected void runCommitPhase(InvocationContext ctx, GlobalTransaction gtx, Transaction tx, List modifications, boolean onePhaseCommit)
 681    {
 682    // set the hasMods flag in the invocation ctx. This should not be replicated, just used locally by the interceptors.
 683  1121440 ctx.setTxHasMods(modifications != null && modifications.size() > 0);
 684  1121442 try
 685    {
 686  1121442 MethodCall commitMethod;
 687  1121442 if (onePhaseCommit)
 688    {
 689    // running a 1-phase commit.
 690  59 if (configuration.isNodeLockingOptimistic())
 691    {
 692  0 commitMethod = MethodCallFactory.create(MethodDeclarations.optimisticPrepareMethod,
 693    gtx, modifications, null, cache.getLocalAddress(), true);
 694    }
 695    else
 696    {
 697  59 commitMethod = MethodCallFactory.create(MethodDeclarations.prepareMethod,
 698    gtx, modifications, cache.getLocalAddress(),
 699    true);
 700    }
 701    }
 702    else
 703    {
 704  1121383 commitMethod = MethodCallFactory.create(MethodDeclarations.commitMethod, gtx);
 705    }
 706   
 707  1121442 if (log.isTraceEnabled())
 708    {
 709  0 log.trace(" running commit for " + gtx);
 710    }
 711  1121442 ctx.setMethodCall(commitMethod);
 712  1121442 handleCommitRollback(ctx);
 713    }
 714    catch (Throwable e)
 715    {
 716  6 log.warn("Commit failed. Clearing stale locks.");
 717  6 try
 718    {
 719  6 cleanupStaleLocks(gtx);
 720    }
 721    catch (RuntimeException re)
 722    {
 723  0 log.error("Unable to clear stale locks", re);
 724  0 throw re;
 725    }
 726    catch (Throwable e2)
 727    {
 728  0 log.error("Unable to clear stale locks", e2);
 729  0 throw new RuntimeException(e2);
 730    }
 731  6 if (e instanceof RuntimeException)
 732  4 throw (RuntimeException) e;
 733    else
 734  2 throw new RuntimeException("Commit failed.", e);
 735    }
 736    }
 737   
 738   
 739  6 private void cleanupStaleLocks(GlobalTransaction gtx) throws Throwable
 740    {
 741  6 TransactionEntry entry = txTable.get(gtx);
 742  6 if (entry != null)
 743    {
 744  6 entry.releaseAllLocksLIFO(gtx);
 745    }
 746    }
 747   
 748    /**
 749    * creates a rollback() MethodCall and feeds it to handleCommitRollback();
 750    *
 751    * @param gtx
 752    */
 753  328 protected void runRollbackPhase(InvocationContext ctx, GlobalTransaction gtx, Transaction tx, List modifications)
 754    {
 755    //Transaction ltx = null;
 756  328 try
 757    {
 758  328 ctx.setTxHasMods(modifications != null && modifications.size() > 0);
 759    // JBCACHE-457
 760    // MethodCall rollbackMethod = MethodCall(CacheImpl.rollbackMethod, new Object[]{gtx, hasMods ? true : false});
 761  328 MethodCall rollbackMethod = MethodCallFactory.create(MethodDeclarations.rollbackMethod, gtx);
 762  328 if (log.isTraceEnabled())
 763    {
 764  0 log.trace(" running rollback for " + gtx);
 765    }
 766   
 767    //JBCACHE-359 Store a lookup for the gtx so a listener
 768    // callback can find it
 769    //ltx = getLocalTxForGlobalTx(gtx);
 770  328 rollbackTransactions.put(tx, gtx);
 771   
 772  328 ctx.setMethodCall(rollbackMethod);
 773  328 handleCommitRollback(ctx);
 774    }
 775    catch (Throwable e)
 776    {
 777  5 log.warn("Rollback had a problem", e);
 778    }
 779    finally
 780    {
 781  328 if (tx != null) rollbackTransactions.remove(tx);
 782    }
 783    }
 784   
 785    /**
 786    * Handles a local prepare - invoked by the sync handler. Tests if the current tx matches the gtx passed in to the
 787    * method call and passes the prepare() call up the chain.
 788    *
 789    * @return
 790    * @throws Throwable
 791    */
 792  67441 protected Object runPreparePhase(InvocationContext ctx, GlobalTransaction gtx, List modifications) throws Throwable
 793    {
 794    // build the method call
 795  67441 MethodCall prepareMethod;
 796    // if (cache.getCacheModeInternal() != CacheImpl.REPL_ASYNC)
 797    // {
 798    // running a 2-phase commit.
 799  67441 if (configuration.isNodeLockingOptimistic())
 800    {
 801  980 prepareMethod = MethodCallFactory.create(MethodDeclarations.optimisticPrepareMethod, gtx, modifications, null, cache.getLocalAddress(), false);
 802    }
 803  66461 else if (configuration.getCacheMode() != Configuration.CacheMode.REPL_ASYNC)
 804    {
 805  66418 prepareMethod = MethodCallFactory.create(MethodDeclarations.prepareMethod,
 806    gtx, modifications, cache.getLocalAddress(),
 807    false);// don't commit or rollback - wait for call
 808    }
 809    //}
 810    else
 811    {
 812    // this is a REPL_ASYNC call - do 1-phase commit. break!
 813  43 log.trace("This is a REPL_ASYNC call (1 phase commit) - do nothing for beforeCompletion()");
 814  43 return null;
 815    }
 816   
 817    // passes a prepare call up the local interceptor chain. The replication interceptor
 818    // will do the broadcasting if needed. This is so all requests (local/remote) are
 819    // treated the same
 820  67398 Object result;
 821   
 822    // Is there a local transaction associated with GTX ?
 823  67398 Transaction ltx = ctx.getTransaction();
 824   
 825    //if ltx is not null and it is already running
 826  67398 if (txManager.getTransaction() != null && ltx != null && txManager.getTransaction().equals(ltx))
 827    {
 828  67398 ctx.setMethodCall(prepareMethod);
 829  67398 result = super.invoke(ctx);
 830    }
 831    else
 832    {
 833  0 log.warn("Local transaction does not exist or does not match expected transaction " + gtx);
 834  0 throw new CacheException(" local transaction " + ltx + " does not exist or does not match expected transaction " + gtx);
 835    }
 836  67336 return result;
 837    }
 838   
 839    // --------------------------------------------------------------
 840    // Private helper methods
 841    // --------------------------------------------------------------
 842   
 843   
 844    /**
 845    * Creates a gtx (if one doesnt exist), a sync handler, and registers the tx.
 846    *
 847    * @param tx
 848    * @return
 849    * @throws Exception
 850    */
 851  1401906 private GlobalTransaction registerTransaction(Transaction tx, InvocationContext ctx) throws Exception
 852    {
 853  1401906 GlobalTransaction gtx;
 854  1401906 if (isValid(tx) && transactions.put(tx, NULL) == null)
 855    {
 856  1121124 gtx = cache.getCurrentTransaction(tx, true);
 857  1121124 if (gtx.isRemote())
 858    {
 859    // should be no need to register a handler since this a remotely initiated gtx
 860  46 if (log.isTraceEnabled())
 861    {
 862  0 log.trace("is a remotely initiated gtx so no need to register a tx for it");
 863    }
 864    }
 865    else
 866    {
 867  1121078 if (log.isTraceEnabled())
 868    {
 869  0 log.trace("Registering sync handler for tx " + tx + ", gtx " + gtx);
 870    }
 871    // see the comment in the LocalSyncHandler for the last isOriginLocal param.
 872  1121078 LocalSynchronizationHandler myHandler = new LocalSynchronizationHandler(gtx, tx, cache, !ctx.isOriginLocal());
 873  1121078 registerHandler(tx, myHandler, ctx);
 874    }
 875    }
 876  ? else if ((gtx = (GlobalTransaction) rollbackTransactions.get(tx)) != null)
 877    {
 878  0 if (log.isDebugEnabled()) log.debug("Transaction " + tx + " is already registered and is rolling back.");
 879    }
 880    else
 881    {
 882  0 if (log.isDebugEnabled()) log.debug("Transaction " + tx + " is already registered.");
 883   
 884    }
 885  1401906 return gtx;
 886    }
 887   
 888    /**
 889    * Registers a sync hander against a tx.
 890    *
 891    * @param tx
 892    * @param handler
 893    * @throws Exception
 894    */
 895  1121827 private void registerHandler(Transaction tx, Synchronization handler, InvocationContext ctx) throws Exception
 896    {
 897  1121825 OrderedSynchronizationHandler orderedHandler = OrderedSynchronizationHandler.getInstance(tx);
 898   
 899  0 if (log.isTraceEnabled()) log.trace("registering for TX completion: SynchronizationHandler(" + handler + ")");
 900   
 901  1121827 orderedHandler.registerAtHead(handler);// needs to be invoked first on TX commit
 902   
 903  1121827 cache.getNotifier().notifyTransactionRegistered(tx, ctx);
 904    }
 905   
 906    /**
 907    * Replaces the global transaction in a method call with a new global transaction passed in.
 908    */
 909  1121124 private MethodCall replaceGtx(MethodCall m, GlobalTransaction gtx)
 910    {
 911  1121124 Class[] argClasses = m.getMethod().getParameterTypes();
 912  1121124 Object[] args = m.getArgs();
 913   
 914  1121123 for (int i = 0; i < argClasses.length; i++)
 915    {
 916  3224550 if (argClasses[i].equals(GlobalTransaction.class))
 917    {
 918  65411 if (!gtx.equals(args[i]))
 919    {
 920  732 args[i] = gtx;
 921  732 m.setArgs(args);
 922    }
 923  65411 break;
 924    }
 925    }
 926  1121124 return m;
 927    }
 928   
 929    /**
 930    * Creates and starts a local tx
 931    *
 932    * @return
 933    * @throws Exception
 934    */
 935  4046 private Transaction createLocalTx() throws Exception
 936    {
 937  4046 if (log.isTraceEnabled())
 938    {
 939  0 log.trace("Creating transaction for thread " + Thread.currentThread());
 940    }
 941  4046 Transaction localTx;
 942  0 if (txManager == null) throw new Exception("Failed to create local transaction; TransactionManager is null");
 943  4046 txManager.begin();
 944  4046 localTx = txManager.getTransaction();
 945  4046 return localTx;
 946    }
 947   
 948    /**
 949    * Creates a new local transaction for a given global transaction.
 950    *
 951    * @param gtx
 952    * @return
 953    * @throws Exception
 954    */
 955  749 private Transaction createLocalTxForGlobalTx(GlobalTransaction gtx, InvocationContext ctx) throws Exception
 956    {
 957  749 Transaction localTx = createLocalTx();
 958  749 txTable.put(localTx, gtx);
 959    // attach this to the context
 960  749 ctx.setTransaction(localTx);
 961  0 if (log.isTraceEnabled()) log.trace("Created new tx for gtx " + gtx);
 962  749 return localTx;
 963    }
 964   
 965    // ------------------------------------------------------------------------
 966    // Synchronization classes
 967    // ------------------------------------------------------------------------
 968   
 969    // this controls the whole transaction
 970   
 971    private class RemoteSynchronizationHandler implements Synchronization
 972    {
 973    Transaction tx = null;
 974    GlobalTransaction gtx = null;
 975    CacheSPI cache = null;
 976    List modifications = null;
 977    TransactionEntry entry = null;
 978    protected InvocationContext ctx; // the context for this call.
 979   
 980   
 981  1121827 RemoteSynchronizationHandler(GlobalTransaction gtx, Transaction tx, CacheSPI cache)
 982    {
 983  1121827 this.gtx = gtx;
 984  1121827 this.tx = tx;
 985  1121827 this.cache = cache;
 986    }
 987   
 988  1121518 public void beforeCompletion()
 989    {
 990  0 if (log.isTraceEnabled()) log.trace("Running beforeCompletion on gtx " + gtx);
 991  1121518 entry = txTable.get(gtx);
 992  1121518 if (entry == null)
 993    {
 994  0 log.error("Transaction has a null transaction entry - beforeCompletion() will fail.");
 995  0 log.error("TxTable contents: " + txTable);
 996  0 throw new IllegalStateException("cannot find transaction entry for " + gtx);
 997    }
 998   
 999  1121518 modifications = entry.getModifications();
 1000  1121518 ctx = cache.getInvocationContext();
 1001  1121518 ctx.setOriginLocal(false);
 1002    }
 1003   
 1004    // this should really not be done here -
 1005    // it is supposed to be post commit not actually run the commit
 1006  1121770 public void afterCompletion(int status)
 1007    {
 1008  1121770 try
 1009    {
 1010    // could happen if a rollback is called and beforeCompletion() doesn't get called.
 1011  27 if (ctx == null) ctx = cache.getInvocationContext();
 1012  1121770 setTransactionalContext(tx, gtx, ctx);
 1013   
 1014  1121770 try
 1015    {
 1016  0 if (txManager.getTransaction() != null && !txManager.getTransaction().equals(tx)) txManager.resume(tx);
 1017    }
 1018    catch (Exception e)
 1019    {
 1020  0 log.error("afterCompletion error: " + status, e);
 1021    }
 1022   
 1023   
 1024  0 if (log.isTraceEnabled()) log.trace("calling aftercompletion for " + gtx);
 1025    // set any transaction wide options as current for this thread.
 1026  ? if ((entry = txTable.get(gtx)) != null)
 1027    {
 1028  1121770 modifications = entry.getModifications();
 1029  1121770 ctx.setOptionOverrides(entry.getOption());
 1030    }
 1031  1121770 transactions.remove(tx);
 1032   
 1033  1121770 switch (status)
 1034    {
 1035  1121442 case Status.STATUS_COMMITTED:
 1036   
 1037    // if this is optimistic or sync repl
 1038  1121442 boolean onePhaseCommit = !configuration.isNodeLockingOptimistic() && configuration.getCacheMode() == Configuration.CacheMode.REPL_ASYNC;
 1039  0 if (log.isDebugEnabled()) log.debug("Running commit phase. One phase? " + onePhaseCommit);
 1040  1121442 runCommitPhase(ctx, gtx, tx, modifications, onePhaseCommit);
 1041  1121436 log.debug("Finished commit phase");
 1042  1121436 break;
 1043   
 1044  71 case Status.STATUS_MARKED_ROLLBACK:
 1045  257 case Status.STATUS_ROLLEDBACK:
 1046  328 log.debug("Running rollback phase");
 1047  328 runRollbackPhase(ctx, gtx, tx, modifications);
 1048  328 log.debug("Finished rollback phase");
 1049  328 break;
 1050   
 1051  0 default:
 1052  0 throw new IllegalStateException("illegal status: " + status);
 1053    }
 1054    }
 1055    finally
 1056    {
 1057    // clean up the tx table
 1058  1121770 txTable.remove(gtx);
 1059  1121770 txTable.remove(tx);
 1060  1121770 setTransactionalContext(null, null, ctx);
 1061  1121770 cleanupInternalState();
 1062    }
 1063    }
 1064   
 1065    /**
 1066    * Cleans out (nullifies) member variables held by the sync object for easier gc. Could be (falsely) seen as a mem
 1067    * leak if the TM implementation hangs on to the synchronizations for an unnecessarily long time even after the tx
 1068    * completes. See JBCACHE-1007.
 1069    */
 1070  1121770 private void cleanupInternalState()
 1071    {
 1072  1121770 this.tx = null;
 1073  1121769 this.gtx = null;
 1074  1121770 this.cache = null;
 1075  1121770 this.modifications = null;
 1076  1121770 this.entry = null;
 1077    }
 1078   
 1079  1 public String toString()
 1080    {
 1081  1 return "TxInterceptor.RemoteSynchronizationHandler(gtx=" + gtx + ", tx=" + getTxAsString() + ")";
 1082    }
 1083   
 1084  77 protected String getTxAsString()
 1085    {
 1086    // JBCACHE-1114 -- don't call toString() on tx or it can lead to stack overflow
 1087  77 if (tx == null)
 1088  6 return null;
 1089   
 1090  71 return tx.getClass().getName() + "@" + System.identityHashCode(tx);
 1091    }
 1092    }
 1093   
 1094    private class LocalSynchronizationHandler extends RemoteSynchronizationHandler
 1095    {
 1096    private boolean localRollbackOnly = true;
 1097    // a VERY strange situation where a tx has remote origins, but since certain buddy group org methods perform local
 1098    // cleanups even when remotely triggered, and optimistic locking is used, you end up with an implicit local tx.
 1099    // This is STILL remotely originating though and this needs to be made explicit here.
 1100    // this can be checked by inspecting the InvocationContext.isOriginLocal() at the time of registering the sync.
 1101    private boolean remoteLocal = false;
 1102   
 1103    /**
 1104    * A Synchronization for locally originating txs.
 1105    * <p/>
 1106    * a VERY strange situation where a tx has remote origins, but since certain buddy group org methods perform local
 1107    * cleanups even when remotely triggered, and optimistic locking is used, you end up with an implicit local tx.
 1108    * This is STILL remotely originating though and this needs to be made explicit here.
 1109    * this can be checked by inspecting the InvocationContext.isOriginLocal() at the time of registering the sync.
 1110    *
 1111    * @param gtx
 1112    * @param tx
 1113    * @param cache
 1114    * @param remoteLocal
 1115    */
 1116  1121078 LocalSynchronizationHandler(GlobalTransaction gtx, Transaction tx, CacheSPI cache, boolean remoteLocal)
 1117    {
 1118  1121078 super(gtx, tx, cache);
 1119  1121078 this.remoteLocal = remoteLocal;
 1120    }
 1121   
 1122  1120805 @Override
 1123    public void beforeCompletion()
 1124    {
 1125  1120805 super.beforeCompletion();
 1126  1120805 ctx.setOriginLocal(!remoteLocal); // this is the LOCAL sync handler after all!
 1127    // fetch the modifications before the transaction is committed
 1128    // (and thus removed from the txTable)
 1129  1120805 setTransactionalContext(tx, gtx, ctx);
 1130  1120805 if (modifications.size() == 0)
 1131    {
 1132  0 if (log.isTraceEnabled()) log.trace("No modifications in this tx. Skipping beforeCompletion()");
 1133  1053364 return;
 1134    }
 1135   
 1136    // set any transaction wide options as current for this thread.
 1137  67441 ctx.setOptionOverrides(entry.getOption());
 1138   
 1139  67441 try
 1140    {
 1141  67441 switch (tx.getStatus())
 1142    {
 1143    // if we are active or preparing then we can go ahead
 1144  0 case Status.STATUS_ACTIVE:
 1145  67441 case Status.STATUS_PREPARING:
 1146    // run a prepare call.
 1147  67441 Object result = runPreparePhase(ctx, gtx, modifications);
 1148   
 1149  67379 if (result instanceof Throwable)
 1150    {
 1151  0 tx.setRollbackOnly();
 1152  0 throw (Throwable) result;
 1153    }
 1154  67379 break;
 1155  0 default:
 1156  0 throw new CacheException("transaction " + tx + " in status " + tx.getStatus() + " unbale to start transaction");
 1157    }
 1158    }
 1159    catch (Throwable t)
 1160    {
 1161  62 try
 1162    {
 1163  62 tx.setRollbackOnly();
 1164    }
 1165    catch (SystemException se)
 1166    {
 1167  0 throw new RuntimeException("setting tx rollback failed ", se);
 1168    }
 1169  62 if (t instanceof RuntimeException)
 1170  61 throw (RuntimeException) t;
 1171    else
 1172  1 throw new RuntimeException("", t);
 1173    }
 1174    finally
 1175    {
 1176  67441 localRollbackOnly = false;
 1177  67441 setTransactionalContext(null, null, ctx);
 1178    }
 1179    }
 1180   
 1181  1121030 @Override
 1182    public void afterCompletion(int status)
 1183    {
 1184    // could happen if a rollback is called and beforeCompletion() doesn't get called.
 1185  236 if (ctx == null) ctx = cache.getInvocationContext();
 1186  1121030 ctx.setLocalRollbackOnly(localRollbackOnly);
 1187  1121030 super.afterCompletion(status);
 1188    }
 1189   
 1190  76 public String toString()
 1191    {
 1192  76 return "TxInterceptor.LocalSynchronizationHandler(gtx=" + gtx + ", tx=" + getTxAsString() + ")";
 1193    }
 1194    }
 1195    }