Clover coverage report -
Coverage timestamp: Thu Jul 5 2007 20:02:32 EDT
file stats: LOC: 1,417   Methods: 50
NCLOC: 969   Classes: 6
 
 Source file Conditionals Statements Methods TOTAL
SyncReplTxTest.java 45.7% 85.2% 84% 82.1%
coverage coverage
 1    /*
 2    *
 3    * JBoss, the OpenSource J2EE webOS
 4    *
 5    * Distributable under LGPL license.
 6    * See terms of license at gnu.org.
 7    */
 8    package org.jboss.cache.replicated;
 9   
 10    import junit.framework.Test;
 11    import junit.framework.TestCase;
 12    import junit.framework.TestSuite;
 13    import org.apache.commons.logging.Log;
 14    import org.apache.commons.logging.LogFactory;
 15    import org.jboss.cache.Cache;
 16    import org.jboss.cache.CacheException;
 17    import org.jboss.cache.CacheImpl;
 18    import org.jboss.cache.DefaultCacheFactory;
 19    import org.jboss.cache.Fqn;
 20    import org.jboss.cache.config.Configuration;
 21    import org.jboss.cache.lock.IsolationLevel;
 22    import org.jboss.cache.lock.TimeoutException;
 23    import org.jboss.cache.misc.TestingUtil;
 24    import org.jboss.cache.notifications.annotation.CacheListener;
 25    import org.jboss.cache.notifications.annotation.NodeModified;
 26    import org.jboss.cache.notifications.event.NodeEvent;
 27    import org.jboss.cache.transaction.DummyTransactionManager;
 28   
 29    import javax.naming.Context;
 30    import javax.transaction.NotSupportedException;
 31    import javax.transaction.RollbackException;
 32    import javax.transaction.Status;
 33    import javax.transaction.Synchronization;
 34    import javax.transaction.SystemException;
 35    import javax.transaction.Transaction;
 36    import javax.transaction.TransactionManager;
 37    import java.util.ArrayList;
 38    import java.util.List;
 39    import java.util.concurrent.Semaphore;
 40   
 41    /**
 42    * Replicated unit test for sync transactional CacheImpl
 43    * Note: we use DummyTransactionManager for Tx purpose instead of relying on
 44    * jta.
 45    *
 46    * @version $Revision: 1.23 $
 47    */
 48    public class SyncReplTxTest extends TestCase
 49    {
 50    private static Log log = LogFactory.getLog(SyncReplTxTest.class);
 51    private CacheImpl cache1;
 52    private CacheImpl cache2;
 53   
 54    private String old_factory = null;
 55    private final String FACTORY = "org.jboss.cache.transaction.DummyContextFactory";
 56    private Semaphore lock = new Semaphore(1);
 57    private Throwable t1_ex;
 58    private Throwable t2_ex;
 59   
 60   
 61  34 public SyncReplTxTest(String name)
 62    {
 63  34 super(name);
 64    }
 65   
 66  34 public void setUp() throws Exception
 67    {
 68  34 super.setUp();
 69  34 old_factory = System.getProperty(Context.INITIAL_CONTEXT_FACTORY);
 70  34 System.setProperty(Context.INITIAL_CONTEXT_FACTORY, FACTORY);
 71  34 t1_ex = t2_ex = null;
 72    }
 73   
 74  34 public void tearDown() throws Exception
 75    {
 76  34 super.tearDown();
 77  34 DummyTransactionManager.destroy();
 78  34 destroyCaches();
 79  34 if (old_factory != null)
 80    {
 81  32 System.setProperty(Context.INITIAL_CONTEXT_FACTORY, old_factory);
 82  32 old_factory = null;
 83    }
 84    }
 85   
 86  48 private Transaction beginTransaction() throws SystemException, NotSupportedException
 87    {
 88  48 DummyTransactionManager mgr = DummyTransactionManager.getInstance();
 89  48 mgr.begin();
 90  48 return mgr.getTransaction();
 91    }
 92   
 93  30 private void initCaches(Configuration.CacheMode caching_mode) throws Exception
 94    {
 95  30 cache1 = (CacheImpl) DefaultCacheFactory.getInstance().createCache(false);
 96  30 cache2 = (CacheImpl) DefaultCacheFactory.getInstance().createCache(false);
 97  30 cache1.getConfiguration().setCacheMode(caching_mode);
 98  30 cache2.getConfiguration().setCacheMode(caching_mode);
 99  30 cache1.getConfiguration().setIsolationLevel(IsolationLevel.SERIALIZABLE);
 100  30 cache2.getConfiguration().setIsolationLevel(IsolationLevel.SERIALIZABLE);
 101   
 102  30 cache1.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
 103  30 cache2.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
 104  30 cache1.getConfiguration().setLockAcquisitionTimeout(5000);
 105  30 cache2.getConfiguration().setLockAcquisitionTimeout(5000);
 106   
 107  30 configureMultiplexer(cache1);
 108  30 configureMultiplexer(cache2);
 109   
 110  30 cache1.start();
 111  30 cache2.start();
 112   
 113  30 validateMultiplexer(cache1);
 114  30 validateMultiplexer(cache2);
 115    }
 116   
 117    /**
 118    * Provides a hook for multiplexer integration. This default implementation
 119    * is a no-op; subclasses that test mux integration would override
 120    * to integrate the given cache with a multiplexer.
 121    * <p/>
 122    * param cache a cache that has been configured but not yet created.
 123    */
 124  60 protected void configureMultiplexer(Cache cache) throws Exception
 125    {
 126    // default does nothing
 127    }
 128   
 129    /**
 130    * Provides a hook to check that the cache's channel came from the
 131    * multiplexer, or not, as expected. This default impl asserts that
 132    * the channel did not come from the multiplexer.
 133    *
 134    * @param cache a cache that has already been started
 135    */
 136  60 protected void validateMultiplexer(Cache cache)
 137    {
 138  60 assertFalse("Cache is not using multiplexer", cache.getConfiguration().isUsingMultiplexer());
 139    }
 140   
 141  34 private void destroyCaches()
 142    {
 143  34 if (cache1 != null)
 144    {
 145  30 cache1.stop();
 146    }
 147  34 if (cache2 != null)
 148    {
 149  30 cache2.stop();
 150    }
 151  34 cache1 = null;
 152  34 cache2 = null;
 153    }
 154   
 155  2 public void testLockRemoval() throws Exception
 156    {
 157  2 initCaches(Configuration.CacheMode.REPL_SYNC);
 158  2 cache1.getConfiguration().setSyncCommitPhase(true);
 159  2 cache1.releaseAllLocks("/");
 160  2 Transaction tx = beginTransaction();
 161  2 cache1.put("/bela/ban", "name", "Bela Ban");
 162  2 assertEquals(3, cache1.getNumberOfLocksHeld());
 163  2 assertEquals(0, cache2.getNumberOfLocksHeld());
 164  2 tx.commit();
 165  2 assertEquals(0, cache1.getNumberOfLocksHeld());
 166  2 assertEquals(0, cache2.getNumberOfLocksHeld());
 167    }
 168   
 169   
 170  2 public void testSyncRepl() throws Exception
 171    {
 172  2 Integer age;
 173  2 Transaction tx;
 174   
 175  2 try
 176    {
 177  2 initCaches(Configuration.CacheMode.REPL_SYNC);
 178  2 cache1.getConfiguration().setSyncCommitPhase(true);
 179  2 cache2.getConfiguration().setSyncCommitPhase(true);
 180   
 181    // assertEquals(2, cache1.getMembers().size());
 182   
 183  2 tx = beginTransaction();
 184  2 cache1.put("/a/b/c", "age", 38);
 185  2 TransactionManager mgr = cache1.getTransactionManager();
 186  2 tx = mgr.suspend();
 187  2 assertNull("age on cache2 must be null as the TX has not yet been committed", cache2.get("/a/b/c", "age"));
 188  2 log.debug("cache1: locks held before commit: " + cache1.printLockInfo());
 189  2 log.debug("cache2: locks held before commit: " + cache2.printLockInfo());
 190  2 mgr.resume(tx);
 191  2 tx.commit();
 192  2 log.debug("cache1: locks held after commit: " + cache1.printLockInfo());
 193  2 log.debug("cache2: locks held after commit: " + cache2.printLockInfo());
 194   
 195    // value on cache2 must be 38
 196  2 age = (Integer) cache2.get("/a/b/c", "age");
 197  2 assertNotNull("\"age\" obtained from cache2 must be non-null ", age);
 198  2 assertTrue("\"age\" must be 38", age == 38);
 199    }
 200    catch (Exception e)
 201    {
 202  0 fail(e.toString());
 203    }
 204    }
 205   
 206    /**
 207    * @throws Exception
 208    */
 209  2 public void testSimplePut() throws Exception
 210    {
 211  2 initCaches(Configuration.CacheMode.REPL_SYNC);
 212   
 213  2 cache1.put("/JSESSION/localhost/192.168.1.10:32882/Courses/0", "Instructor", "Ben Wang");
 214   
 215  2 cache1.put("/JSESSION/localhost/192.168.1.10:32882/1", "Number", 10);
 216    }
 217   
 218   
 219  2 public void testSimpleTxPut() throws Exception
 220    {
 221  2 Transaction tx;
 222  2 final Fqn NODE1 = Fqn.fromString("/one/two/three");
 223  2 initCaches(Configuration.CacheMode.REPL_SYNC);
 224   
 225  2 tx = beginTransaction();
 226  2 cache1.put(NODE1, "age", 38);
 227  2 System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true));
 228  2 tx.commit();
 229   
 230    /*
 231    tx=beginTransaction();
 232    cache1.put(NODE1, "age", new Integer(38));
 233    cache1.put(NODE2, "name", "Ben of The Far East");
 234    cache1.put(NODE3, "key", "UnknowKey");
 235    System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true));
 236   
 237    tx.commit();
 238    */
 239   
 240    /*
 241    tx=beginTransaction();
 242    cache1.put(NODE1, "age", new Integer(38));
 243    cache1.put(NODE1, "AOPInstance", new AOPInstance());
 244    cache1.put(NODE2, "AOPInstance", new AOPInstance());
 245    cache1.put(NODE1, "AOPInstance", new AOPInstance());
 246    tx.commit();
 247    System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true));
 248    */
 249    }
 250   
 251  2 public void testSyncReplWithModficationsOnBothCaches() throws Exception
 252    {
 253  2 Integer age;
 254  2 Transaction tx;
 255  2 final Fqn NODE1 = Fqn.fromString("/one/two/three");
 256  2 final Fqn NODE2 = Fqn.fromString("/eins/zwei/drei");
 257   
 258  2 initCaches(Configuration.CacheMode.REPL_SYNC);
 259   
 260    // create roots first
 261  2 cache1.put("/one/two", null);
 262  2 cache2.put("/eins/zwei", null);
 263   
 264  2 cache1.getConfiguration().setSyncCommitPhase(true);
 265  2 cache2.getConfiguration().setSyncCommitPhase(true);
 266   
 267  2 tx = beginTransaction();
 268  2 cache1.put(NODE1, "age", 38);
 269  2 System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true));
 270   
 271  2 cache2.put(NODE2, "age", 39);
 272  2 System.out.println("TransactionTable for cache2 after cache2.put():\n" + cache2.getTransactionTable().toString(true));
 273   
 274  2 System.out.println("cache1 before commit:\n" + cache1.printLockInfo());
 275  2 System.out.println("cache2 before commit:\n" + cache2.printLockInfo());
 276   
 277  2 try
 278    {
 279  2 tx.commit();
 280  0 fail("Should not succeed with SERIALIZABLE semantics");
 281    }
 282    catch (Exception e)
 283    {
 284    //should be a classic deadlock here.
 285    }
 286   
 287  2 System.out.println("cache1 after commit:\n" + cache1.printLockInfo());
 288  2 System.out.println("cache2 after commit:\n" + cache2.printLockInfo());
 289   
 290    /*
 291    assertTrue(cache1.exists(NODE1));
 292    assertTrue(cache1.exists(NODE2));
 293    assertTrue(cache1.exists(NODE1));
 294    assertTrue(cache2.exists(NODE2));
 295   
 296    age = (Integer) cache1.get(NODE1, "age");
 297    assertNotNull("\"age\" obtained from cache1 for " + NODE1 + " must be non-null ", age);
 298    assertTrue("\"age\" must be 38", age == 38);
 299   
 300    age = (Integer) cache2.get(NODE1, "age");
 301    assertNotNull("\"age\" obtained from cache2 for " + NODE1 + " must be non-null ", age);
 302    assertTrue("\"age\" must be 38", age == 38);
 303   
 304    age = (Integer) cache1.get(NODE2, "age");
 305    assertNotNull("\"age\" obtained from cache1 for " + NODE2 + " must be non-null ", age);
 306    assertTrue("\"age\" must be 39", age == 39);
 307   
 308    age = (Integer) cache2.get(NODE2, "age");
 309    assertNotNull("\"age\" obtained from cache2 for " + NODE2 + " must be non-null ", age);
 310    assertTrue("\"age\" must be 39", age == 39);
 311    */
 312   
 313  2 assertEquals(0, cache1.getNumberOfLocksHeld());
 314  2 assertEquals(0, cache2.getNumberOfLocksHeld());
 315  2 System.out.println("TransactionTable for cache1:\n" + cache1.getTransactionTable().toString(true));
 316  2 System.out.println("TransactionTable for cache2:\n" + cache2.getTransactionTable().toString(true));
 317    }
 318   
 319  2 public void testSyncReplWithModficationsOnBothCachesSameData() throws Exception
 320    {
 321  2 Transaction tx;
 322  2 final Fqn NODE = Fqn.fromString("/one/two/three");
 323  2 initCaches(Configuration.CacheMode.REPL_SYNC);
 324  2 tx = beginTransaction();
 325  2 cache1.put(NODE, "age", 38);
 326  2 System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true));
 327   
 328  2 cache2.put(NODE, "age", 39);
 329  2 System.out.println("TransactionTable for cache2 after cache2.put():\n" + cache2.getTransactionTable().toString(true));
 330   
 331  2 System.out.println("cache1 before commit:\n" + cache1.printLockInfo());
 332  2 System.out.println("cache2 before commit:\n" + cache2.printLockInfo());
 333   
 334  2 try
 335    {
 336  2 tx.commit();
 337  0 fail("commit should throw a RollbackException, we should not get here");
 338    }
 339    catch (RollbackException rollback)
 340    {
 341  2 System.out.println("Transaction was rolled back, this is correct");
 342    }
 343   
 344  2 System.out.println("cache1 after commit:\n" + cache1.printLockInfo());
 345  2 System.out.println("cache2 after commit:\n" + cache2.printLockInfo());
 346   
 347  2 assertEquals(0, cache1.getNumberOfLocksHeld());
 348  2 assertEquals(0, cache2.getNumberOfLocksHeld());
 349   
 350  2 assertEquals(0, cache1.getNumberOfNodes());
 351  2 assertEquals(0, cache2.getNumberOfNodes());
 352    }
 353   
 354   
 355  2 public void testSyncReplWithModficationsOnBothCachesWithRollback() throws Exception
 356    {
 357  2 Transaction tx;
 358  2 final Fqn NODE1 = Fqn.fromString("/one/two/three");
 359  2 final Fqn NODE2 = Fqn.fromString("/eins/zwei/drei");
 360   
 361  2 initCaches(Configuration.CacheMode.REPL_SYNC);
 362   
 363  2 cache1.getConfiguration().setSyncRollbackPhase(true);
 364  2 cache2.getConfiguration().setSyncRollbackPhase(true);
 365   
 366  2 tx = beginTransaction();
 367  2 cache1.put(NODE1, "age", 38);
 368  2 cache2.put(NODE2, "age", 39);
 369   
 370  2 System.out.println("cache1 (before commit):\n" + cache1.printLockInfo());
 371  2 System.out.println("cache2 (before commit):\n" + cache2.printLockInfo());
 372   
 373    // this will rollback the transaction
 374  2 tx.registerSynchronization(new TransactionAborter(tx));
 375   
 376  2 try
 377    {
 378  2 tx.commit();
 379  0 fail("commit should throw a RollbackException, we should not get here");
 380    }
 381    catch (RollbackException rollback)
 382    {
 383  2 System.out.println("Transaction was rolled back, this is correct");
 384    }
 385   
 386  2 System.out.println("cache1 (after rollback):\n" + cache1.printLockInfo());
 387  2 System.out.println("cache2 (after rollback):\n" + cache2.printLockInfo());
 388   
 389  2 assertEquals(0, cache1.getNumberOfLocksHeld());
 390  2 assertEquals(0, cache2.getNumberOfLocksHeld());
 391   
 392  2 assertEquals(0, cache1.getNumberOfNodes());
 393  2 assertEquals(0, cache2.getNumberOfNodes());
 394    }
 395   
 396    /**
 397    * Test for JBCACHE-359 -- does a callback into cache from a listener
 398    * interfere with transaction rollback.
 399    *
 400    * @throws Exception
 401    */
 402   
 403    // Is this test still valid after JBCACHE-1022 ?
 404   
 405    // public void testSyncReplWithRollbackAndListener() throws Exception
 406    // {
 407    // Transaction tx;
 408    // final Fqn NODE1 = Fqn.fromString("/one/two/three");
 409    //
 410    // initCaches(Configuration.CacheMode.REPL_SYNC);
 411    //
 412    // cache1.getConfiguration().setSyncRollbackPhase(true);
 413    // cache2.getConfiguration().setSyncRollbackPhase(true);
 414    //
 415    // // Test with a rollback on the sending side
 416    //
 417    // CallbackListener cbl1 = new CallbackListener(cache1, "age");
 418    // CallbackListener cbl2 = new CallbackListener(cache2, "age");
 419    //
 420    // tx = beginTransaction();
 421    // cache1.put(NODE1, "age", 38);
 422    //
 423    // System.out.println("cache1 (before commit):\n" + cache1.printLockInfo());
 424    // System.out.println("cache2 (before commit):\n" + cache2.printLockInfo());
 425    //
 426    // // this will rollback the transaction
 427    // tx.registerSynchronization(new TransactionAborter(tx));
 428    //
 429    // try
 430    // {
 431    // tx.commit();
 432    // fail("commit should throw a RollbackException, we should not get here");
 433    // }
 434    // catch (RollbackException rollback)
 435    // {
 436    // rollback.printStackTrace();
 437    // System.out.println("Transaction was rolled back, this is correct");
 438    // }
 439    //
 440    // // Sleep, as the rollback call to cache2 is async
 441    // TestingUtil.sleepThread(1000);
 442    //
 443    // System.out.println("cache1 (after rollback):\n" + cache1.printLockInfo());
 444    // System.out.println("cache2 (after rollback):\n" + cache2.printLockInfo());
 445    //
 446    // assertNull(cbl1.getCallbackException());
 447    // assertNull(cbl2.getCallbackException());
 448    //
 449    // assertEquals(0, cache1.getNumberOfLocksHeld());
 450    // assertEquals(0, cache2.getNumberOfLocksHeld());
 451    //
 452    // assertEquals(0, cache1.getNumberOfNodes());
 453    // assertEquals(0, cache2.getNumberOfNodes());
 454    //
 455    // // Test with a rollback on the receiving side
 456    //
 457    // cache2.getNotifier().removeCacheListener(cbl2);
 458    // // listener aborts any active tx
 459    // cbl2 = new TransactionAborterCallbackListener(cache2, "age");
 460    //
 461    // tx = beginTransaction();
 462    // cache1.put(NODE1, "age", 38);
 463    //
 464    // System.out.println("cache1 (before commit):\n" + cache1.printLockInfo());
 465    // System.out.println("cache2 (before commit):\n" + cache2.printLockInfo());
 466    //
 467    // tx.commit();
 468    //
 469    // // Sleep, as the commit call to cache2 is async
 470    // TestingUtil.sleepThread(1000);
 471    //
 472    // System.out.println("cache1 (after rollback):\n" + cache1.printLockInfo());
 473    // System.out.println("cache2 (after rollback):\n" + cache2.printLockInfo());
 474    //
 475    // assertNull(cbl1.getCallbackException());
 476    // assertNull(cbl2.getCallbackException());
 477    //
 478    // assertEquals(0, cache1.getNumberOfLocksHeld());
 479    // assertEquals(0, cache2.getNumberOfLocksHeld());
 480    //
 481    // // cache1 didn't fail, so should have 3 nodes
 482    // assertEquals(3, cache1.getNumberOfNodes());
 483    // assertEquals(0, cache2.getNumberOfNodes());
 484    //
 485    // }
 486   
 487   
 488    /**
 489    * Test for JBCACHE-361 -- does marking a tx on the remote side
 490    * rollback-only cause a rollback on the originating side?
 491    *
 492    * @throws Exception
 493    */
 494  2 public void testSyncReplWithRemoteRollback() throws Exception
 495    {
 496  2 Transaction tx;
 497  2 final Fqn NODE1 = Fqn.fromString("/one/two/three");
 498   
 499  2 initCaches(Configuration.CacheMode.REPL_SYNC);
 500   
 501  2 cache1.getConfiguration().setSyncRollbackPhase(true);
 502  2 cache2.getConfiguration().setSyncRollbackPhase(true);
 503   
 504    // Test with a rollback on the remote side
 505   
 506    // listener aborts any active tx
 507    //TransactionAborterListener tal = new TransactionAborterListener(cache2);
 508   
 509  2 tx = beginTransaction();
 510  2 cache1.put(NODE1, "age", 38);
 511   
 512  2 System.out.println("cache1 (before commit):\n" + cache1.printLockInfo());
 513  2 System.out.println("cache2 (before commit):\n" + cache2.printLockInfo());
 514   
 515    // instead of a listener lets just get a WL on ROOT on cache2. And hold on to it.
 516  2 tx = cache1.getTransactionManager().suspend();
 517   
 518  2 cache2.getTransactionManager().begin();
 519  2 cache2.getRoot().put("x", "y");
 520  2 Transaction tx2 = cache2.getTransactionManager().suspend();
 521   
 522  2 System.out.println("cache2 (before commit):\n" + cache2.printLockInfo());
 523  2 cache1.getTransactionManager().resume(tx);
 524   
 525  2 try
 526    {
 527  2 tx.commit();
 528  0 fail("commit should throw a RollbackException, we should not get here");
 529    }
 530    catch (RollbackException rollback)
 531    {
 532  2 System.out.println("Transaction was rolled back, this is correct");
 533    }
 534    finally
 535    {
 536  2 cache2.getTransactionManager().resume(tx2);
 537  2 tx2.rollback();
 538    }
 539   
 540    // Sleep, as the commit call to cache2 is async
 541  2 TestingUtil.sleepThread(1000);
 542   
 543  2 System.out.println("cache1 (after rollback):\n" + cache1.printLockInfo());
 544  2 System.out.println("cache2 (after rollback):\n" + cache2.printLockInfo());
 545   
 546    //assertNull(tal.getCallbackException());
 547   
 548  2 assertEquals(0, cache1.getNumberOfLocksHeld());
 549  2 assertEquals(0, cache2.getNumberOfLocksHeld());
 550   
 551  2 assertEquals(0, cache1.getNumberOfNodes());
 552  2 assertEquals(0, cache2.getNumberOfNodes());
 553   
 554    }
 555   
 556   
 557  2 public void testASyncRepl() throws Exception
 558    {
 559  2 Integer age;
 560  2 Transaction tx;
 561   
 562  2 initCaches(Configuration.CacheMode.REPL_ASYNC);
 563   
 564  2 tx = beginTransaction();
 565  2 cache1.put("/a/b/c", "age", 38);
 566  2 Thread.sleep(1000);
 567  2 assertNull("age on cache2 must be null as the TX has not yet been committed", cache2.get("/a/b/c", "age"));
 568  2 tx.commit();
 569  2 Thread.sleep(1000);
 570   
 571    // value on cache2 must be 38
 572  2 age = (Integer) cache2.get("/a/b/c", "age");
 573  2 assertNotNull("\"age\" obtained from cache2 is null ", age);
 574  2 assertTrue("\"age\" must be 38", age == 38);
 575   
 576    }
 577   
 578    /**
 579    * Tests concurrent modifications: thread1 succeeds and thread2 is blocked until thread1 is done, and then succeeds
 580    * too. However, this is flawed with the introduction of interceptors, here's why.<br/>
 581    * <ul>
 582    * <li>Thread1 acquires the lock for /bela/ban on cache1
 583    * <li>Thread2 blocks on Thread1 to release the lock
 584    * <li>Thread1 commits: this means the TransactionInterceptor and the ReplicationInterceptor are called in
 585    * the sequence in which they registered. Unfortunately, the TransactionInterceptor registered first. In the
 586    * PREPARE phase, the ReplicationInterceptor calls prepare() in cache2 synchronously. The TxInterceptor
 587    * does nothing. The the COMMIT phase, the TxInterceptor commits the data by releasing the locks locally and
 588    * then the ReplicationInterceptor sends an asynchronous COMMIT to cache2.
 589    * <li>Because the TxInterceptor for Thread1 releases the locks locally <em>before</em> sending the async COMMIT,
 590    * Thread2 is able to acquire the lock for /bela/ban in cache1 and then starts the PREPARE phase by sending a
 591    * synchronous PREPARE to cache2. If this PREPARE arrives at cache2 <em>before</em> the COMMIT from Thread1,
 592    * the PREPARE will block because it attempts to acquire a lock on /bela/ban on cache2 still held by Thread1
 593    * (which would be released by Thread1's COMMIT). This results in deadlock, which is resolved by Thread2 running
 594    * into a timeout with subsequent rollback and Thread1 succeeding.<br/>
 595    * </ul>
 596    * There are 3 solutions to this:
 597    * <ol>
 598    * <li>Do nothing. This is standard behavior for concurrent access to the same data. Same thing if the 2 threads
 599    * operated on the same data in <em>separate</em> caches, e.g. Thread1 on /bela/ban in cache1 and Thread2 on
 600    * /bela/ban in cache2. The semantics of Tx commit as handled by the interceptors is: after tx1.commit() returns
 601    * the locks held by tx1 are release and a COMMIT message is on the way (if sent asynchronously).
 602    * <li>Force an order over TxInterceptor and ReplicationInterceptor. This would require ReplicationInterceptor
 603    * to always be fired first on TX commit. Downside: the interceptors have an implicit dependency, which is not
 604    * nice.
 605    * <li>Priority-order requests at the receiver; e.g. a COMMIT could release a blocked PREPARE. This is bad because
 606    * it violates JGroups' FIFO ordering guarantees.
 607    * </ol>
 608    * I'm currently investigating solution #2, ie. creating an OrderedSynchronizationHandler, which allows other
 609    * SynchronizationHandlers to register (atHead, atTail), and the OrderedSynchronizationHandler would call the
 610    * SynchronizationHandler in the order in which they are defined.
 611    *
 612    * @throws Exception
 613    */
 614  2 public void testConcurrentPuts() throws Exception
 615    {
 616  2 initCaches(Configuration.CacheMode.REPL_SYNC);
 617  2 cache1.getConfiguration().setSyncCommitPhase(true);
 618   
 619  2 Thread t1 = new Thread("Thread1")
 620    {
 621    Transaction tx;
 622   
 623  2 public void run()
 624    {
 625  2 try
 626    {
 627  2 tx = beginTransaction();
 628  2 cache1.put("/bela/ban", "name", "Bela Ban");
 629  2 TestingUtil.sleepThread(2000);// Thread2 will be blocked until we commit
 630  2 tx.commit();
 631  2 System.out.println("[Thread1] ** LOCK INFO cache1: " + cache1.printLockInfo());
 632  2 System.out.println("[Thread1] ** LOCK INFO cache2: " + cache2.printLockInfo());
 633    }
 634    catch (Throwable ex)
 635    {
 636  0 ex.printStackTrace();
 637  0 t1_ex = ex;
 638    }
 639    }
 640    };
 641   
 642  2 Thread t2 = new Thread("Thread2")
 643    {
 644    Transaction tx;
 645   
 646  2 public void run()
 647    {
 648  2 try
 649    {
 650  2 TestingUtil.sleepThread(1000);// give Thread1 time to acquire the lock
 651  2 tx = beginTransaction();
 652  2 System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo());
 653  2 System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo());
 654  2 cache1.put("/bela/ban", "name", "Michelle Ban");
 655  2 System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo());
 656  2 System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo());
 657  2 tx.commit();
 658  2 System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo());
 659  2 System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo());
 660    }
 661    catch (Throwable ex)
 662    {
 663  0 ex.printStackTrace();
 664  0 t2_ex = ex;
 665    }
 666    }
 667    };
 668   
 669    // Let the game start
 670  2 t1.start();
 671  2 t2.start();
 672   
 673    // Wait for threads to die
 674  2 t1.join();
 675  2 t2.join();
 676   
 677  2 if (t1_ex != null)
 678    {
 679  0 fail("Thread1 failed: " + t1_ex);
 680    }
 681  2 if (t2_ex != null)
 682    {
 683  0 fail("Thread2 failed: " + t2_ex);
 684    }
 685   
 686  2 assertEquals("Michelle Ban", cache1.get("/bela/ban", "name"));
 687    }
 688   
 689   
 690    /**
 691    * Should reproduce JBCACHE-32 problem (http://jira.jboss.com/jira/browse/JBCACHE-32)
 692    */
 693  2 public void testConcurrentCommitsWith1Thread() throws Exception
 694    {
 695  2 _testConcurrentCommits(1);
 696    }
 697   
 698    /**
 699    * Should reproduce JBCACHE-32 problem (http://jira.jboss.com/jira/browse/JBCACHE-32)
 700    */
 701  2 public void testConcurrentCommitsWith5Threads() throws Exception
 702    {
 703  2 _testConcurrentCommits(5);
 704    }
 705   
 706    /**
 707    * Should reproduce JBCACHE-32 problem (http://jira.jboss.com/jira/browse/JBCACHE-32)
 708    */
 709  4 private void _testConcurrentCommits(int num_threads) throws Exception
 710    {
 711  4 Object myMutex = new Object();
 712   
 713  4 final CacheImpl c1 = (CacheImpl) DefaultCacheFactory.getInstance().createCache(false);
 714  4 final CacheImpl c2 = (CacheImpl) DefaultCacheFactory.getInstance().createCache(false);
 715  4 c1.getConfiguration().setClusterName("TempCluster");
 716  4 c2.getConfiguration().setClusterName("TempCluster");
 717  4 c1.getConfiguration().setCacheMode(Configuration.CacheMode.REPL_SYNC);
 718  4 c2.getConfiguration().setCacheMode(Configuration.CacheMode.REPL_SYNC);
 719  4 c1.getConfiguration().setSyncCommitPhase(true);
 720  4 c2.getConfiguration().setSyncCommitPhase(true);
 721  4 c1.getConfiguration().setSyncRollbackPhase(true);
 722  4 c2.getConfiguration().setSyncRollbackPhase(true);
 723  4 c1.getConfiguration().setIsolationLevel(IsolationLevel.REPEATABLE_READ);
 724  4 c2.getConfiguration().setIsolationLevel(IsolationLevel.REPEATABLE_READ);
 725  4 c1.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
 726  4 c2.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
 727  4 c1.getConfiguration().setLockAcquisitionTimeout(5000);
 728  4 c2.getConfiguration().setLockAcquisitionTimeout(5000);
 729  4 c1.start();
 730  4 c2.start();
 731  4 final List<Exception> exceptions = new ArrayList<Exception>();
 732   
 733    class MyThread extends Thread
 734    {
 735    Object mutex;
 736   
 737  12 public MyThread(String name, Object mutex)
 738    {
 739  12 super(name);
 740  12 this.mutex = mutex;
 741    }
 742   
 743  12 public void run()
 744    {
 745  12 Transaction tx = null;
 746   
 747  12 try
 748    {
 749  12 tx = beginTransaction();
 750  12 c1.put("/thread/" + getName(), null);
 751  12 System.out.println("Thread " + getName() + " after put(): " + c1.toString());
 752  12 System.out.println("Thread " + getName() + " waiting on mutex");
 753  12 synchronized (mutex)
 754    {
 755  12 mutex.wait();
 756    }
 757  12 System.out.println("Thread " + getName() + " committing");
 758  12 tx.commit();
 759  12 System.out.println("Thread " + getName() + " committed successfully");
 760    }
 761    catch (Exception e)
 762    {
 763  0 exceptions.add(e);
 764    }
 765    finally
 766    {
 767  12 try
 768    {
 769  12 if (tx != null) tx.rollback();
 770    }
 771    catch (Exception e)
 772    {
 773    }
 774    }
 775    }
 776    }
 777   
 778  4 MyThread[] threads = new MyThread[num_threads];
 779  4 for (int i = 0; i < threads.length; i++)
 780    {
 781  12 threads[i] = new MyThread("#" + i, myMutex);
 782    }
 783  4 for (int i = 0; i < threads.length; i++)
 784    {
 785  12 MyThread thread = threads[i];
 786  12 System.out.println("starting thread #" + i);
 787  12 thread.start();
 788    }
 789   
 790  4 TestingUtil.sleepThread(6000);
 791  4 synchronized (myMutex)
 792    {
 793  4 System.out.println("cache is " + c1.printLockInfo());
 794  4 System.out.println("******************* SIGNALLING THREADS ********************");
 795  4 myMutex.notifyAll();
 796    }
 797   
 798  4 for (MyThread thread : threads)
 799    {
 800  12 try
 801    {
 802  12 thread.join();
 803  12 System.out.println("Joined thread " + thread.getName());
 804    }
 805    catch (InterruptedException e)
 806    {
 807  0 e.printStackTrace();
 808    }
 809    }
 810   
 811  4 System.out.println("FINAL c1:\n" + c1.printDetails() + "\nlocks:\n" + c1.printLockInfo());
 812   
 813  4 assertEquals(0, c1.getNumberOfLocksHeld());
 814  4 assertEquals(0, c2.getNumberOfLocksHeld());
 815   
 816  4 c1.stop();
 817  4 c2.stop();
 818   
 819    // if(ex != null)
 820    // {
 821    // ex.printStackTrace();
 822    // fail("Thread failed: " + ex);
 823    // }
 824   
 825    // we can only expect 1 thread to succeed. The others will fail. So, threads.length -1 exceptions.
 826    // this is a timing issue - 2 threads still may succeed on a multi cpu system
 827    // assertEquals(threads.length - 1, exceptions.size());
 828   
 829  0 for (Exception exception : exceptions) assertEquals(TimeoutException.class, exception.getClass());
 830    }
 831   
 832   
 833    /**
 834    * Conncurrent put on 2 different instances.
 835    */
 836  2 public void testConcurrentPutsOnTwoInstances() throws Exception
 837    {
 838  2 initCaches(Configuration.CacheMode.REPL_SYNC);
 839  2 final CacheImpl c1 = this.cache1;
 840  2 final CacheImpl c2 = this.cache2;
 841   
 842  2 Thread t1 = new Thread()
 843    {
 844    Transaction tx;
 845   
 846  2 public void run()
 847    {
 848  2 try
 849    {
 850  2 tx = beginTransaction();
 851  2 c1.put("/ben/wang", "name", "Ben Wang");
 852  2 TestingUtil.sleepThread(8000);
 853  2 tx.commit();// This should go thru
 854    }
 855    catch (Throwable ex)
 856    {
 857  0 ex.printStackTrace();
 858  0 t1_ex = ex;
 859    }
 860    }
 861    };
 862   
 863  2 Thread t2 = new Thread()
 864    {
 865    Transaction tx;
 866   
 867  2 public void run()
 868    {
 869  2 try
 870    {
 871  2 TestingUtil.sleepThread(1000);// give Thread1 time to acquire the lock
 872  2 tx = beginTransaction();
 873  2 c2.put("/ben/wang", "name", "Ben Jr.");
 874  2 tx.commit();// This will time out and rollback first because Thread1 has a tx going as well.
 875    }
 876    catch (RollbackException rollback_ex)
 877    {
 878  2 System.out.println("received rollback exception as expected");
 879    }
 880    catch (Throwable ex)
 881    {
 882  0 ex.printStackTrace();
 883  0 t2_ex = ex;
 884    }
 885    }
 886    };
 887   
 888    // Let the game start
 889  2 t1.start();
 890  2 t2.start();
 891   
 892    // Wait for thread to die but put an insurance of 5 seconds on it.
 893  2 t1.join();
 894  2 t2.join();
 895   
 896  2 if (t1_ex != null)
 897    {
 898  0 fail("Thread1 failed: " + t1_ex);
 899    }
 900  2 if (t2_ex != null)
 901    {
 902  0 fail("Thread2 failed: " + t2_ex);
 903    }
 904  2 assertEquals("Ben Wang", c1.get("/ben/wang", "name"));
 905    }
 906   
 907   
 908  2 public void testPut() throws Exception
 909    {
 910  2 initCaches(Configuration.CacheMode.REPL_SYNC);
 911  2 final CacheImpl c1 = this.cache1;
 912   
 913   
 914  2 Thread t1 = new Thread()
 915    {
 916  2 public void run()
 917    {
 918  2 try
 919    {
 920  2 lock.acquire();
 921  2 System.out.println("-- t1 has lock");
 922  2 c1.put("/a/b/c", "age", 38);
 923  2 System.out.println("[Thread1] set value to 38");
 924   
 925  2 System.out.println("-- t1 releases lock");
 926  2 lock.release();
 927  2 TestingUtil.sleepThread(300);
 928  2 Thread.yield();
 929   
 930  2 lock.acquire();
 931  2 System.out.println("-- t1 has lock");
 932  2 c1.put("/a/b/c", "age", 39);
 933  2 System.out.println("[Thread1] set value to 39");
 934   
 935  2 System.out.println("-- t1 releases lock");
 936  2 lock.release();
 937  2 assertEquals(39, c1.get("/a/b/c", "age"));
 938    }
 939    catch (Throwable ex)
 940    {
 941  0 ex.printStackTrace();
 942  0 t1_ex = ex;
 943    }
 944    finally
 945    {
 946  2 lock.release();
 947    }
 948    }
 949    };
 950   
 951  2 Thread t2 = new Thread()
 952    {
 953  2 public void run()
 954    {
 955  2 try
 956    {
 957  2 TestingUtil.sleepThread(100);
 958  2 Thread.yield();
 959  2 lock.acquire();
 960  2 System.out.println("-- t2 has lock");
 961    // Should replicate the value right away.
 962  2 Integer val = (Integer) cache2.get("/a/b/c", "age");
 963  2 System.out.println("[Thread2] value is " + val);
 964  2 assertEquals(new Integer(38), val);
 965  2 System.out.println("-- t2 releases lock");
 966  2 lock.release();
 967  2 TestingUtil.sleepThread(300);
 968  2 Thread.yield();
 969  2 TestingUtil.sleepThread(500);
 970  2 lock.acquire();
 971  2 System.out.println("-- t2 has lock");
 972  2 val = (Integer) cache2.get("/a/b/c", "age");
 973  2 System.out.println("-- t2 releases lock");
 974  2 lock.release();
 975  2 assertEquals(new Integer(39), val);
 976    }
 977    catch (Throwable ex)
 978    {
 979  0 ex.printStackTrace();
 980  0 t2_ex = ex;
 981    }
 982    finally
 983    {
 984  2 lock.release();
 985    }
 986    }
 987    };
 988   
 989    // Let the game start
 990  2 t1.start();
 991  2 t2.start();
 992   
 993    // Wait for thread to die but put an insurance of 5 seconds on it.
 994  2 t1.join();
 995  2 t2.join();
 996  2 if (t1_ex != null)
 997    {
 998  0 fail("Thread1 failed: " + t1_ex);
 999    }
 1000  2 if (t2_ex != null)
 1001    {
 1002  0 fail("Thread2 failed: " + t2_ex);
 1003    }
 1004    }
 1005   
 1006    /**
 1007    * Test replicated cache with transaction. Idea is to have two threads running
 1008    * a local cache each that is replicating. Depending on whether cache1 commit/rollback or not,
 1009    * the cache2.get will get different values.
 1010    * Note that we have used sleep to interpose thread execution sequence.
 1011    * Although it's not fool proof, it is rather simple and intuitive.
 1012    *
 1013    * @throws Exception
 1014    */
 1015  2 public void testPutTx() throws Exception
 1016    {
 1017  2 Transaction tx = null;
 1018   
 1019  2 try
 1020    {
 1021  2 initCaches(Configuration.CacheMode.REPL_SYNC);
 1022  2 cache1.getConfiguration().setSyncCommitPhase(true);
 1023  2 cache2.getConfiguration().setSyncCommitPhase(true);
 1024  2 tx = beginTransaction();
 1025  2 cache1.put("/a/b/c", "age", 38);
 1026  2 cache1.put("/a/b/c", "age", 39);
 1027  2 Object val = cache2.get("/a/b/c", "age");// must be null as not yet committed
 1028  2 assertNull(val);
 1029  2 tx.commit();
 1030   
 1031  0 tx = beginTransaction();
 1032  0 assertEquals(39, cache2.get("/a/b/c", "age"));// must not be null
 1033  0 tx.commit();
 1034    }
 1035    catch (Throwable t)
 1036    {
 1037  2 t.printStackTrace();
 1038  2 t1_ex = t;
 1039    }
 1040    finally
 1041    {
 1042  2 lock.release();
 1043    }
 1044    }
 1045   
 1046   
 1047    /**
 1048    * Have both cache1 and cache2 do add and commit. cache1 commit should time out
 1049    * since it can't obtain the lock when trying to replicate cache2. On the other hand,
 1050    * cache2 commit will succeed since now that cache1 is rollbacked and lock is
 1051    * released.
 1052    */
 1053  2 public void testPutTx1() throws Exception
 1054    {
 1055  2 initCaches(Configuration.CacheMode.REPL_SYNC);
 1056  2 final CacheImpl c1 = this.cache1;
 1057  2 Thread t1 = new Thread()
 1058    {
 1059  2 public void run()
 1060    {
 1061  2 Transaction tx = null;
 1062   
 1063  2 try
 1064    {
 1065  2 lock.acquire();
 1066  2 tx = beginTransaction();
 1067  2 c1.put("/a/b/c", "age", 38);
 1068  2 c1.put("/a/b/c", "age", 39);
 1069  2 lock.release();
 1070   
 1071  2 TestingUtil.sleepThread(300);
 1072  2 lock.acquire();
 1073  2 try
 1074    {
 1075  2 tx.commit();
 1076    }
 1077    catch (RollbackException ex)
 1078    {
 1079  2 System.out.println("[Thread1] received RollbackException, as expected. Rolling back changes");
 1080    }
 1081    finally
 1082    {
 1083  2 lock.release();
 1084    }
 1085    }
 1086    catch (Throwable ex)
 1087    {
 1088  0 ex.printStackTrace();
 1089  0 t1_ex = ex;
 1090    }
 1091    finally
 1092    {
 1093  2 lock.release();
 1094    }
 1095    }
 1096    };
 1097   
 1098  2 Thread t2 = new Thread()
 1099    {
 1100  2 public void run()
 1101    {
 1102  2 Transaction tx = null;
 1103   
 1104  2 try
 1105    {
 1106  2 sleep(200);
 1107  2 Thread.yield();
 1108  2 lock.acquire();
 1109  2 tx = beginTransaction();
 1110  2 assertNull(cache2.get("/a/b/c", "age"));// must be null as not yet committed
 1111  2 cache2.put("/a/b/c", "age", 40);
 1112  2 lock.release();
 1113   
 1114  2 TestingUtil.sleepThread(300);
 1115  2 lock.acquire();
 1116  2 assertEquals(40, cache2.get("/a/b/c", "age"));// must not be null
 1117  2 tx.commit();
 1118  2 lock.release();
 1119   
 1120  2 TestingUtil.sleepThread(1000);
 1121  2 tx = beginTransaction();
 1122  2 assertEquals("After cache2 commit", 40, cache2.get("/a/b/c", "age"));
 1123  2 tx.commit();
 1124    }
 1125    catch (Throwable ex)
 1126    {
 1127  0 ex.printStackTrace();
 1128  0 t2_ex = ex;
 1129    }
 1130    finally
 1131    {
 1132  2 lock.release();
 1133    }
 1134    }
 1135    };
 1136   
 1137    // Let the game start
 1138  2 t1.start();
 1139  2 t2.start();
 1140   
 1141  2 t1.join();
 1142  2 t2.join();
 1143   
 1144  2 if (t1_ex != null)
 1145    {
 1146  0 fail("Thread1 failed: " + t1_ex);
 1147    }
 1148  2 if (t2_ex != null)
 1149    {
 1150  0 fail("Thread2 failed: " + t2_ex);
 1151    }
 1152    }
 1153   
 1154   
 1155  2 public void testPutTxWithRollback() throws Exception
 1156    {
 1157  2 initCaches(Configuration.CacheMode.REPL_SYNC);
 1158  2 final CacheImpl c2 = this.cache1;
 1159  2 Thread t1 = new Thread()
 1160    {
 1161  2 public void run()
 1162    {
 1163  2 Transaction tx = null;
 1164   
 1165  2 try
 1166    {
 1167  2 lock.acquire();
 1168  2 tx = beginTransaction();
 1169  2 c2.put("/a/b/c", "age", 38);
 1170  2 c2.put("/a/b/c", "age", 39);
 1171  2 lock.release();
 1172   
 1173  2 TestingUtil.sleepThread(100);
 1174  2 lock.acquire();
 1175  2 tx.rollback();
 1176  2 lock.release();
 1177    }
 1178    catch (Throwable ex)
 1179    {
 1180  0 ex.printStackTrace();
 1181  0 t1_ex = ex;
 1182    }
 1183    finally
 1184    {
 1185  2 lock.release();
 1186    }
 1187    }
 1188    };
 1189   
 1190  2 Thread t2 = new Thread()
 1191    {
 1192  2 public void run()
 1193    {
 1194  2 Transaction tx = null;
 1195   
 1196  2 try
 1197    {
 1198  2 sleep(200);
 1199  2 Thread.yield();
 1200  2 lock.acquire();
 1201  2 tx = beginTransaction();
 1202  2 assertNull(cache2.get("/a/b/c", "age"));// must be null as not yet committed
 1203  2 lock.release();
 1204   
 1205  2 TestingUtil.sleepThread(100);
 1206  2 lock.acquire();
 1207  2 assertNull(cache2.get("/a/b/c", "age"));// must be null as rolledback
 1208  2 tx.commit();
 1209  2 lock.release();
 1210    }
 1211    catch (Throwable ex)
 1212    {
 1213  0 ex.printStackTrace();
 1214  0 t2_ex = ex;
 1215    }
 1216    finally
 1217    {
 1218  2 lock.release();
 1219    }
 1220    }
 1221    };
 1222   
 1223    // Let the game start
 1224  2 t1.start();
 1225  2 t2.start();
 1226   
 1227    // Wait for thread to die but put an insurance of 5 seconds on it.
 1228  2 t1.join();
 1229  2 t2.join();
 1230  2 if (t1_ex != null)
 1231    {
 1232  0 fail("Thread1 failed: " + t1_ex);
 1233    }
 1234  2 if (t2_ex != null)
 1235    {
 1236  0 fail("Thread2 failed: " + t2_ex);
 1237    }
 1238    }
 1239   
 1240   
 1241    static class TransactionAborter implements Synchronization
 1242    {
 1243    Transaction ltx = null;
 1244   
 1245  2 public TransactionAborter(Transaction ltx)
 1246    {
 1247  2 this.ltx = ltx;
 1248    }
 1249   
 1250  2 public void beforeCompletion()
 1251    {
 1252  2 try
 1253    {
 1254  2 ltx.setRollbackOnly();
 1255    }
 1256    catch (SystemException e)
 1257    {
 1258    // who cares
 1259    }
 1260    }
 1261   
 1262  2 public void afterCompletion(int status)
 1263    {
 1264    }
 1265    }
 1266   
 1267    @CacheListener
 1268    static class CallbackListener
 1269    {
 1270   
 1271    CacheImpl callbackCache;
 1272    Object callbackKey;
 1273    Exception ex;
 1274    Object mutex = new Object();
 1275   
 1276  0 CallbackListener(CacheImpl cache, Object callbackKey)
 1277    {
 1278  0 this.callbackCache = cache;
 1279  0 this.callbackKey = callbackKey;
 1280  0 cache.getNotifier().addCacheListener(this);
 1281    }
 1282   
 1283  0 @NodeModified
 1284    public void nodeModified(NodeEvent e)
 1285    {
 1286  0 if (!e.isPre())
 1287    {
 1288    // Lock on a mutex so test can't check for an exception
 1289    // until the get call completes
 1290  0 synchronized (mutex)
 1291    {
 1292  0 try
 1293    {
 1294  0 callbackCache.get(e.getFqn(), callbackKey);
 1295    }
 1296    catch (CacheException exc)
 1297    {
 1298  0 exc.printStackTrace();
 1299  0 ex = exc;
 1300    }
 1301    }
 1302    }
 1303    }
 1304   
 1305  0 Exception getCallbackException()
 1306    {
 1307  0 synchronized (mutex)
 1308    {
 1309  0 return ex;
 1310    }
 1311    }
 1312   
 1313    }
 1314   
 1315    static class TransactionAborterCallbackListener extends CallbackListener
 1316    {
 1317   
 1318    TransactionManager callbackTM;
 1319   
 1320  0 TransactionAborterCallbackListener(CacheImpl cache, Object callbackKey)
 1321    {
 1322  0 super(cache, callbackKey);
 1323  0 callbackTM = callbackCache.getTransactionManager();
 1324    }
 1325   
 1326  0 @NodeModified
 1327    public void nodeModified(NodeEvent ne)
 1328    {
 1329  0 if (!ne.isPre())
 1330    {
 1331  0 try
 1332    {
 1333  0 Transaction tx = callbackTM.getTransaction();
 1334  0 if (tx != null && tx.getStatus() == Status.STATUS_ACTIVE)
 1335    {
 1336    // this will rollback the transaction
 1337  0 tx.registerSynchronization(new TransactionAborter(tx));
 1338    }
 1339    else
 1340    {
 1341  0 super.nodeModified(ne);
 1342    }
 1343   
 1344    }
 1345    catch (Exception e)
 1346    {
 1347  0 e.printStackTrace();
 1348  0 if (ex == null)
 1349    {
 1350  0 ex = e;
 1351    }
 1352    }
 1353    }
 1354    }
 1355   
 1356    }
 1357   
 1358    @CacheListener
 1359    static class TransactionAborterListener
 1360    {
 1361   
 1362    TransactionManager callbackTM;
 1363    Object mutex = new Object();
 1364    Exception ex;
 1365   
 1366  0 TransactionAborterListener(CacheImpl cache)
 1367    {
 1368  0 callbackTM = cache.getTransactionManager();
 1369  0 cache.getNotifier().addCacheListener(this);
 1370    }
 1371   
 1372  0 @NodeModified
 1373    public void nodeModified(NodeEvent ne)
 1374    {
 1375  0 if (!ne.isPre())
 1376    {
 1377  0 synchronized (mutex)
 1378    {
 1379  0 try
 1380    {
 1381  0 Transaction tx = callbackTM.getTransaction();
 1382  0 if (tx != null && tx.getStatus() == Status.STATUS_ACTIVE)
 1383    {
 1384    // this will rollback the transaction
 1385  0 tx.setRollbackOnly();
 1386    }
 1387    }
 1388    catch (Exception e)
 1389    {
 1390  0 e.printStackTrace();
 1391  0 if (ex == null)
 1392    {
 1393  0 ex = e;
 1394    }
 1395    }
 1396    }
 1397    }
 1398    }
 1399   
 1400  0 Exception getCallbackException()
 1401    {
 1402  0 synchronized (mutex)
 1403    {
 1404  0 return ex;
 1405    }
 1406    }
 1407   
 1408    }
 1409   
 1410  2 public static Test suite()
 1411    {
 1412    // return getDeploySetup(SyncTxUnitTestCase.class, "cachetest.jar");
 1413  2 return new TestSuite(SyncReplTxTest.class);
 1414    }
 1415   
 1416   
 1417    }