Clover coverage report -
Coverage timestamp: Wed Jan 31 2007 15:38:53 EST
file stats: LOC: 1,394   Methods: 51
NCLOC: 1,004   Classes: 6
 
 Source file Conditionals Statements Methods TOTAL
SyncReplTxTest.java 67.4% 90.9% 100% 89.9%
coverage coverage
 1    /*
 2    *
 3    * JBoss, the OpenSource J2EE webOS
 4    *
 5    * Distributable under LGPL license.
 6    * See terms of license at gnu.org.
 7    */
 8    package org.jboss.cache.replicated;
 9   
 10    import junit.framework.Test;
 11    import junit.framework.TestCase;
 12    import junit.framework.TestSuite;
 13    import org.apache.commons.logging.Log;
 14    import org.apache.commons.logging.LogFactory;
 15    import org.jboss.cache.AbstractCacheListener;
 16    import org.jboss.cache.Cache;
 17    import org.jboss.cache.CacheException;
 18    import org.jboss.cache.CacheImpl;
 19    import org.jboss.cache.DefaultCacheFactory;
 20    import org.jboss.cache.Fqn;
 21    import org.jboss.cache.config.Configuration;
 22    import org.jboss.cache.lock.IsolationLevel;
 23    import org.jboss.cache.lock.TimeoutException;
 24    import org.jboss.cache.misc.TestingUtil;
 25    import org.jboss.cache.transaction.DummyTransactionManager;
 26   
 27    import javax.naming.Context;
 28    import javax.transaction.NotSupportedException;
 29    import javax.transaction.RollbackException;
 30    import javax.transaction.Status;
 31    import javax.transaction.Synchronization;
 32    import javax.transaction.SystemException;
 33    import javax.transaction.Transaction;
 34    import javax.transaction.TransactionManager;
 35    import java.util.ArrayList;
 36    import java.util.List;
 37    import java.util.Map;
 38    import java.util.concurrent.Semaphore;
 39   
 40    /**
 41    * Replicated unit test for sync transactional CacheImpl
 42    * Note: we use DummyTransactionManager for Tx purpose instead of relying on
 43    * jta.
 44    *
 45    * @version $Revision: 1.20 $
 46    */
 47    public class SyncReplTxTest extends TestCase
 48    {
 49    private static Log log = LogFactory.getLog(SyncReplTxTest.class);
 50    private CacheImpl cache1;
 51    private CacheImpl cache2;
 52   
 53    private String old_factory = null;
 54    private final String FACTORY = "org.jboss.cache.transaction.DummyContextFactory";
 55    private Semaphore lock = new Semaphore(1);
 56    private Throwable t1_ex;
 57    private Throwable t2_ex;
 58   
 59   
 60  36 public SyncReplTxTest(String name)
 61    {
 62  36 super(name);
 63    }
 64   
 65  36 public void setUp() throws Exception
 66    {
 67  36 super.setUp();
 68  36 old_factory = System.getProperty(Context.INITIAL_CONTEXT_FACTORY);
 69  36 System.setProperty(Context.INITIAL_CONTEXT_FACTORY, FACTORY);
 70  36 t1_ex = t2_ex = null;
 71    }
 72   
 73  36 public void tearDown() throws Exception
 74    {
 75  36 super.tearDown();
 76  36 DummyTransactionManager.destroy();
 77  36 destroyCaches();
 78  36 if (old_factory != null)
 79    {
 80  34 System.setProperty(Context.INITIAL_CONTEXT_FACTORY, old_factory);
 81  34 old_factory = null;
 82    }
 83    }
 84   
 85  52 private Transaction beginTransaction() throws SystemException, NotSupportedException
 86    {
 87  52 DummyTransactionManager mgr = DummyTransactionManager.getInstance();
 88  52 mgr.begin();
 89  52 return mgr.getTransaction();
 90    }
 91   
 92  32 private void initCaches(Configuration.CacheMode caching_mode) throws Exception
 93    {
 94  32 cache1 = (CacheImpl) DefaultCacheFactory.getInstance().createCache(false);
 95  32 cache2 = (CacheImpl) DefaultCacheFactory.getInstance().createCache(false);
 96  32 cache1.getConfiguration().setCacheMode(caching_mode);
 97  32 cache2.getConfiguration().setCacheMode(caching_mode);
 98  32 cache1.getConfiguration().setIsolationLevel(IsolationLevel.SERIALIZABLE);
 99  32 cache2.getConfiguration().setIsolationLevel(IsolationLevel.SERIALIZABLE);
 100   
 101  32 cache1.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup");
 102  32 cache2.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup");
 103  32 cache1.getConfiguration().setLockAcquisitionTimeout(5000);
 104  32 cache2.getConfiguration().setLockAcquisitionTimeout(5000);
 105   
 106  32 configureMultiplexer(cache1);
 107  32 configureMultiplexer(cache2);
 108   
 109  32 cache1.start();
 110  32 cache2.start();
 111   
 112  32 validateMultiplexer(cache1);
 113  32 validateMultiplexer(cache2);
 114    }
 115   
 116    /**
 117    * Provides a hook for multiplexer integration. This default implementation
 118    * is a no-op; subclasses that test mux integration would override
 119    * to integrate the given cache with a multiplexer.
 120    * <p/>
 121    * param cache a cache that has been configured but not yet created.
 122    */
 123  64 protected void configureMultiplexer(Cache cache) throws Exception
 124    {
 125    // default does nothing
 126    }
 127   
 128    /**
 129    * Provides a hook to check that the cache's channel came from the
 130    * multiplexer, or not, as expected. This default impl asserts that
 131    * the channel did not come from the multiplexer.
 132    *
 133    * @param cache a cache that has already been started
 134    */
 135  64 protected void validateMultiplexer(Cache cache)
 136    {
 137  64 assertFalse("Cache is not using multiplexer", cache.getConfiguration().isUsingMultiplexer());
 138    }
 139   
 140  36 private void destroyCaches()
 141    {
 142  36 if (cache1 != null)
 143    {
 144  32 cache1.stop();
 145    }
 146  36 if (cache2 != null)
 147    {
 148  32 cache2.stop();
 149    }
 150  36 cache1 = null;
 151  36 cache2 = null;
 152    }
 153   
 154  2 public void testLockRemoval() throws Exception
 155    {
 156  2 initCaches(Configuration.CacheMode.REPL_SYNC);
 157  2 cache1.getConfiguration().setSyncCommitPhase(true);
 158  2 cache1.releaseAllLocks("/");
 159  2 Transaction tx = beginTransaction();
 160  2 cache1.put("/bela/ban", "name", "Bela Ban");
 161  2 assertEquals(3, cache1.getNumberOfLocksHeld());
 162  2 assertEquals(0, cache2.getNumberOfLocksHeld());
 163  2 tx.commit();
 164  2 assertEquals(0, cache1.getNumberOfLocksHeld());
 165  2 assertEquals(0, cache2.getNumberOfLocksHeld());
 166    }
 167   
 168   
 169  2 public void testSyncRepl() throws Exception
 170    {
 171  2 Integer age;
 172  2 Transaction tx;
 173   
 174  2 try
 175    {
 176  2 initCaches(Configuration.CacheMode.REPL_SYNC);
 177  2 cache1.getConfiguration().setSyncCommitPhase(true);
 178  2 cache2.getConfiguration().setSyncCommitPhase(true);
 179   
 180    // assertEquals(2, cache1.getMembers().size());
 181   
 182  2 tx = beginTransaction();
 183  2 cache1.put("/a/b/c", "age", 38);
 184  2 TransactionManager mgr = cache1.getTransactionManager();
 185  2 tx = mgr.suspend();
 186  2 assertNull("age on cache2 must be null as the TX has not yet been committed", cache2.get("/a/b/c", "age"));
 187  2 log.debug("cache1: locks held before commit: " + cache1.printLockInfo());
 188  2 log.debug("cache2: locks held before commit: " + cache2.printLockInfo());
 189  2 mgr.resume(tx);
 190  2 tx.commit();
 191  2 log.debug("cache1: locks held after commit: " + cache1.printLockInfo());
 192  2 log.debug("cache2: locks held after commit: " + cache2.printLockInfo());
 193   
 194    // value on cache2 must be 38
 195  2 age = (Integer) cache2.get("/a/b/c", "age");
 196  2 assertNotNull("\"age\" obtained from cache2 must be non-null ", age);
 197  2 assertTrue("\"age\" must be 38", age == 38);
 198    }
 199    catch (Exception e)
 200    {
 201  0 fail(e.toString());
 202    }
 203    }
 204   
 205    /**
 206    * @throws Exception
 207    */
 208  2 public void testSimplePut() throws Exception
 209    {
 210  2 initCaches(Configuration.CacheMode.REPL_SYNC);
 211   
 212  2 cache1.put("/JSESSION/localhost/192.168.1.10:32882/Courses/0", "Instructor", "Ben Wang");
 213   
 214  2 cache1.put("/JSESSION/localhost/192.168.1.10:32882/1", "Number", 10);
 215    }
 216   
 217   
 218  2 public void testSimpleTxPut() throws Exception
 219    {
 220  2 Transaction tx;
 221  2 final Fqn NODE1 = Fqn.fromString("/one/two/three");
 222  2 initCaches(Configuration.CacheMode.REPL_SYNC);
 223   
 224  2 tx = beginTransaction();
 225  2 cache1.put(NODE1, "age", 38);
 226  2 System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true));
 227  2 tx.commit();
 228   
 229    /*
 230    tx=beginTransaction();
 231    cache1.put(NODE1, "age", new Integer(38));
 232    cache1.put(NODE2, "name", "Ben of The Far East");
 233    cache1.put(NODE3, "key", "UnknowKey");
 234    System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true));
 235   
 236    tx.commit();
 237    */
 238   
 239    /*
 240    tx=beginTransaction();
 241    cache1.put(NODE1, "age", new Integer(38));
 242    cache1.put(NODE1, "AOPInstance", new AOPInstance());
 243    cache1.put(NODE2, "AOPInstance", new AOPInstance());
 244    cache1.put(NODE1, "AOPInstance", new AOPInstance());
 245    tx.commit();
 246    System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true));
 247    */
 248    }
 249   
 250  2 public void testSyncReplWithModficationsOnBothCaches() throws Exception
 251    {
 252  2 Integer age;
 253  2 Transaction tx;
 254  2 final Fqn NODE1 = Fqn.fromString("/one/two/three");
 255  2 final Fqn NODE2 = Fqn.fromString("/eins/zwei/drei");
 256   
 257  2 initCaches(Configuration.CacheMode.REPL_SYNC);
 258   
 259    // create roots first
 260  2 cache1.put("/one/two", null);
 261  2 cache2.put("/eins/zwei", null);
 262   
 263  2 cache1.getConfiguration().setSyncCommitPhase(true);
 264  2 cache2.getConfiguration().setSyncCommitPhase(true);
 265   
 266  2 tx = beginTransaction();
 267  2 cache1.put(NODE1, "age", 38);
 268  2 System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true));
 269   
 270  2 cache2.put(NODE2, "age", 39);
 271  2 System.out.println("TransactionTable for cache2 after cache2.put():\n" + cache2.getTransactionTable().toString(true));
 272   
 273  2 System.out.println("cache1 before commit:\n" + cache1.printLockInfo());
 274  2 System.out.println("cache2 before commit:\n" + cache2.printLockInfo());
 275   
 276  2 try
 277    {
 278  2 tx.commit();
 279  0 fail("Should not succeed with SERIALIZABLE semantics");
 280    }
 281    catch (Exception e)
 282    {
 283    //should be a classic deadlock here.
 284    }
 285   
 286  2 System.out.println("cache1 after commit:\n" + cache1.printLockInfo());
 287  2 System.out.println("cache2 after commit:\n" + cache2.printLockInfo());
 288   
 289    /*
 290    assertTrue(cache1.exists(NODE1));
 291    assertTrue(cache1.exists(NODE2));
 292    assertTrue(cache1.exists(NODE1));
 293    assertTrue(cache2.exists(NODE2));
 294   
 295    age = (Integer) cache1.get(NODE1, "age");
 296    assertNotNull("\"age\" obtained from cache1 for " + NODE1 + " must be non-null ", age);
 297    assertTrue("\"age\" must be 38", age == 38);
 298   
 299    age = (Integer) cache2.get(NODE1, "age");
 300    assertNotNull("\"age\" obtained from cache2 for " + NODE1 + " must be non-null ", age);
 301    assertTrue("\"age\" must be 38", age == 38);
 302   
 303    age = (Integer) cache1.get(NODE2, "age");
 304    assertNotNull("\"age\" obtained from cache1 for " + NODE2 + " must be non-null ", age);
 305    assertTrue("\"age\" must be 39", age == 39);
 306   
 307    age = (Integer) cache2.get(NODE2, "age");
 308    assertNotNull("\"age\" obtained from cache2 for " + NODE2 + " must be non-null ", age);
 309    assertTrue("\"age\" must be 39", age == 39);
 310    */
 311   
 312  2 assertEquals(0, cache1.getNumberOfLocksHeld());
 313  2 assertEquals(0, cache2.getNumberOfLocksHeld());
 314  2 System.out.println("TransactionTable for cache1:\n" + cache1.getTransactionTable().toString(true));
 315  2 System.out.println("TransactionTable for cache2:\n" + cache2.getTransactionTable().toString(true));
 316    }
 317   
 318  2 public void testSyncReplWithModficationsOnBothCachesSameData() throws Exception
 319    {
 320  2 Transaction tx;
 321  2 final Fqn NODE = Fqn.fromString("/one/two/three");
 322  2 initCaches(Configuration.CacheMode.REPL_SYNC);
 323  2 tx = beginTransaction();
 324  2 cache1.put(NODE, "age", 38);
 325  2 System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true));
 326   
 327  2 cache2.put(NODE, "age", 39);
 328  2 System.out.println("TransactionTable for cache2 after cache2.put():\n" + cache2.getTransactionTable().toString(true));
 329   
 330  2 System.out.println("cache1 before commit:\n" + cache1.printLockInfo());
 331  2 System.out.println("cache2 before commit:\n" + cache2.printLockInfo());
 332   
 333  2 try
 334    {
 335  2 tx.commit();
 336  0 fail("commit should throw a RollbackException, we should not get here");
 337    }
 338    catch (RollbackException rollback)
 339    {
 340  2 System.out.println("Transaction was rolled back, this is correct");
 341    }
 342   
 343  2 System.out.println("cache1 after commit:\n" + cache1.printLockInfo());
 344  2 System.out.println("cache2 after commit:\n" + cache2.printLockInfo());
 345   
 346  2 assertEquals(0, cache1.getNumberOfLocksHeld());
 347  2 assertEquals(0, cache2.getNumberOfLocksHeld());
 348   
 349  2 assertEquals(0, cache1.getNumberOfNodes());
 350  2 assertEquals(0, cache2.getNumberOfNodes());
 351    }
 352   
 353   
 354  2 public void testSyncReplWithModficationsOnBothCachesWithRollback() throws Exception
 355    {
 356  2 Transaction tx;
 357  2 final Fqn NODE1 = Fqn.fromString("/one/two/three");
 358  2 final Fqn NODE2 = Fqn.fromString("/eins/zwei/drei");
 359   
 360  2 initCaches(Configuration.CacheMode.REPL_SYNC);
 361   
 362  2 cache1.getConfiguration().setSyncRollbackPhase(true);
 363  2 cache2.getConfiguration().setSyncRollbackPhase(true);
 364   
 365  2 tx = beginTransaction();
 366  2 cache1.put(NODE1, "age", 38);
 367  2 cache2.put(NODE2, "age", 39);
 368   
 369  2 System.out.println("cache1 (before commit):\n" + cache1.printLockInfo());
 370  2 System.out.println("cache2 (before commit):\n" + cache2.printLockInfo());
 371   
 372    // this will rollback the transaction
 373  2 tx.registerSynchronization(new TransactionAborter(tx));
 374   
 375  2 try
 376    {
 377  2 tx.commit();
 378  0 fail("commit should throw a RollbackException, we should not get here");
 379    }
 380    catch (RollbackException rollback)
 381    {
 382  2 System.out.println("Transaction was rolled back, this is correct");
 383    }
 384   
 385  2 System.out.println("cache1 (after rollback):\n" + cache1.printLockInfo());
 386  2 System.out.println("cache2 (after rollback):\n" + cache2.printLockInfo());
 387   
 388  2 assertEquals(0, cache1.getNumberOfLocksHeld());
 389  2 assertEquals(0, cache2.getNumberOfLocksHeld());
 390   
 391  2 assertEquals(0, cache1.getNumberOfNodes());
 392  2 assertEquals(0, cache2.getNumberOfNodes());
 393    }
 394   
 395   
 396    /**
 397    * Test for JBCACHE-359 -- does a callback into cache from a listener
 398    * interfere with transaction rollback.
 399    *
 400    * @throws Exception
 401    */
 402  2 public void testSyncReplWithRollbackAndListener() throws Exception
 403    {
 404  2 Transaction tx;
 405  2 final Fqn NODE1 = Fqn.fromString("/one/two/three");
 406   
 407  2 initCaches(Configuration.CacheMode.REPL_SYNC);
 408   
 409  2 cache1.getConfiguration().setSyncRollbackPhase(true);
 410  2 cache2.getConfiguration().setSyncRollbackPhase(true);
 411   
 412    // Test with a rollback on the sending side
 413   
 414  2 CallbackListener cbl1 = new CallbackListener(cache1, "age");
 415  2 CallbackListener cbl2 = new CallbackListener(cache2, "age");
 416   
 417  2 tx = beginTransaction();
 418  2 cache1.put(NODE1, "age", 38);
 419   
 420  2 System.out.println("cache1 (before commit):\n" + cache1.printLockInfo());
 421  2 System.out.println("cache2 (before commit):\n" + cache2.printLockInfo());
 422   
 423    // this will rollback the transaction
 424  2 tx.registerSynchronization(new TransactionAborter(tx));
 425   
 426  2 try
 427    {
 428  2 tx.commit();
 429  0 fail("commit should throw a RollbackException, we should not get here");
 430    }
 431    catch (RollbackException rollback)
 432    {
 433  2 rollback.printStackTrace();
 434  2 System.out.println("Transaction was rolled back, this is correct");
 435    }
 436   
 437    // Sleep, as the rollback call to cache2 is async
 438  2 TestingUtil.sleepThread(1000);
 439   
 440  2 System.out.println("cache1 (after rollback):\n" + cache1.printLockInfo());
 441  2 System.out.println("cache2 (after rollback):\n" + cache2.printLockInfo());
 442   
 443  2 assertNull(cbl1.getCallbackException());
 444  2 assertNull(cbl2.getCallbackException());
 445   
 446  2 assertEquals(0, cache1.getNumberOfLocksHeld());
 447  2 assertEquals(0, cache2.getNumberOfLocksHeld());
 448   
 449  2 assertEquals(0, cache1.getNumberOfNodes());
 450  2 assertEquals(0, cache2.getNumberOfNodes());
 451   
 452    // Test with a rollback on the receiving side
 453   
 454  2 cache2.getNotifier().removeCacheListener(cbl2);
 455    // listener aborts any active tx
 456  2 cbl2 = new TransactionAborterCallbackListener(cache2, "age");
 457   
 458  2 tx = beginTransaction();
 459  2 cache1.put(NODE1, "age", 38);
 460   
 461  2 System.out.println("cache1 (before commit):\n" + cache1.printLockInfo());
 462  2 System.out.println("cache2 (before commit):\n" + cache2.printLockInfo());
 463   
 464  2 tx.commit();
 465   
 466    // Sleep, as the commit call to cache2 is async
 467  2 TestingUtil.sleepThread(1000);
 468   
 469  2 System.out.println("cache1 (after rollback):\n" + cache1.printLockInfo());
 470  2 System.out.println("cache2 (after rollback):\n" + cache2.printLockInfo());
 471   
 472  2 assertNull(cbl1.getCallbackException());
 473  2 assertNull(cbl2.getCallbackException());
 474   
 475  2 assertEquals(0, cache1.getNumberOfLocksHeld());
 476  2 assertEquals(0, cache2.getNumberOfLocksHeld());
 477   
 478    // cache1 didn't fail, so should have 3 nodes
 479  2 assertEquals(3, cache1.getNumberOfNodes());
 480  2 assertEquals(0, cache2.getNumberOfNodes());
 481   
 482    }
 483   
 484   
 485    /**
 486    * Test for JBCACHE-361 -- does marking a tx on the remote side
 487    * rollback-only cause a rollback on the originating side?
 488    *
 489    * @throws Exception
 490    */
 491  2 public void testSyncReplWithRemoteRollback() throws Exception
 492    {
 493  2 Transaction tx;
 494  2 final Fqn NODE1 = Fqn.fromString("/one/two/three");
 495   
 496  2 initCaches(Configuration.CacheMode.REPL_SYNC);
 497   
 498  2 cache1.getConfiguration().setSyncRollbackPhase(true);
 499  2 cache2.getConfiguration().setSyncRollbackPhase(true);
 500   
 501    // Test with a rollback on the remote side
 502   
 503    // listener aborts any active tx
 504  2 TransactionAborterListener tal = new TransactionAborterListener(cache2);
 505   
 506  2 tx = beginTransaction();
 507  2 cache1.put(NODE1, "age", 38);
 508   
 509  2 System.out.println("cache1 (before commit):\n" + cache1.printLockInfo());
 510  2 System.out.println("cache2 (before commit):\n" + cache2.printLockInfo());
 511   
 512  2 try
 513    {
 514  2 tx.commit();
 515  0 fail("commit should throw a RollbackException, we should not get here");
 516    }
 517    catch (RollbackException rollback)
 518    {
 519  2 System.out.println("Transaction was rolled back, this is correct");
 520    }
 521   
 522    // Sleep, as the commit call to cache2 is async
 523  2 TestingUtil.sleepThread(1000);
 524   
 525  2 System.out.println("cache1 (after rollback):\n" + cache1.printLockInfo());
 526  2 System.out.println("cache2 (after rollback):\n" + cache2.printLockInfo());
 527   
 528  2 assertNull(tal.getCallbackException());
 529   
 530  2 assertEquals(0, cache1.getNumberOfLocksHeld());
 531  2 assertEquals(0, cache2.getNumberOfLocksHeld());
 532   
 533  2 assertEquals(0, cache1.getNumberOfNodes());
 534  2 assertEquals(0, cache2.getNumberOfNodes());
 535   
 536    }
 537   
 538   
 539  2 public void testASyncRepl() throws Exception
 540    {
 541  2 Integer age;
 542  2 Transaction tx;
 543   
 544  2 initCaches(Configuration.CacheMode.REPL_ASYNC);
 545   
 546  2 tx = beginTransaction();
 547  2 cache1.put("/a/b/c", "age", 38);
 548  2 Thread.sleep(1000);
 549  2 assertNull("age on cache2 must be null as the TX has not yet been committed", cache2.get("/a/b/c", "age"));
 550  2 tx.commit();
 551  2 Thread.sleep(1000);
 552   
 553    // value on cache2 must be 38
 554  2 age = (Integer) cache2.get("/a/b/c", "age");
 555  2 assertNotNull("\"age\" obtained from cache2 is null ", age);
 556  2 assertTrue("\"age\" must be 38", age == 38);
 557   
 558    }
 559   
 560    /**
 561    * Tests concurrent modifications: thread1 succeeds and thread2 is blocked until thread1 is done, and then succeeds
 562    * too. However, this is flawed with the introduction of interceptors, here's why.<br/>
 563    * <ul>
 564    * <li>Thread1 acquires the lock for /bela/ban on cache1
 565    * <li>Thread2 blocks on Thread1 to release the lock
 566    * <li>Thread1 commits: this means the TransactionInterceptor and the ReplicationInterceptor are called in
 567    * the sequence in which they registered. Unfortunately, the TransactionInterceptor registered first. In the
 568    * PREPARE phase, the ReplicationInterceptor calls prepare() in cache2 synchronously. The TxInterceptor
 569    * does nothing. The the COMMIT phase, the TxInterceptor commits the data by releasing the locks locally and
 570    * then the ReplicationInterceptor sends an asynchronous COMMIT to cache2.
 571    * <li>Because the TxInterceptor for Thread1 releases the locks locally <em>before</em> sending the async COMMIT,
 572    * Thread2 is able to acquire the lock for /bela/ban in cache1 and then starts the PREPARE phase by sending a
 573    * synchronous PREPARE to cache2. If this PREPARE arrives at cache2 <em>before</em> the COMMIT from Thread1,
 574    * the PREPARE will block because it attempts to acquire a lock on /bela/ban on cache2 still held by Thread1
 575    * (which would be released by Thread1's COMMIT). This results in deadlock, which is resolved by Thread2 running
 576    * into a timeout with subsequent rollback and Thread1 succeeding.<br/>
 577    * </ul>
 578    * There are 3 solutions to this:
 579    * <ol>
 580    * <li>Do nothing. This is standard behavior for concurrent access to the same data. Same thing if the 2 threads
 581    * operated on the same data in <em>separate</em> caches, e.g. Thread1 on /bela/ban in cache1 and Thread2 on
 582    * /bela/ban in cache2. The semantics of Tx commit as handled by the interceptors is: after tx1.commit() returns
 583    * the locks held by tx1 are release and a COMMIT message is on the way (if sent asynchronously).
 584    * <li>Force an order over TxInterceptor and ReplicationInterceptor. This would require ReplicationInterceptor
 585    * to always be fired first on TX commit. Downside: the interceptors have an implicit dependency, which is not
 586    * nice.
 587    * <li>Priority-order requests at the receiver; e.g. a COMMIT could release a blocked PREPARE. This is bad because
 588    * it violates JGroups' FIFO ordering guarantees.
 589    * </ol>
 590    * I'm currently investigating solution #2, ie. creating an OrderedSynchronizationHandler, which allows other
 591    * SynchronizationHandlers to register (atHead, atTail), and the OrderedSynchronizationHandler would call the
 592    * SynchronizationHandler in the order in which they are defined.
 593    *
 594    * @throws Exception
 595    */
 596  2 public void testConcurrentPuts() throws Exception
 597    {
 598  2 initCaches(Configuration.CacheMode.REPL_SYNC);
 599  2 cache1.getConfiguration().setSyncCommitPhase(true);
 600   
 601  2 Thread t1 = new Thread("Thread1")
 602    {
 603    Transaction tx;
 604   
 605  2 public void run()
 606    {
 607  2 try
 608    {
 609  2 tx = beginTransaction();
 610  2 cache1.put("/bela/ban", "name", "Bela Ban");
 611  2 TestingUtil.sleepThread(2000);// Thread2 will be blocked until we commit
 612  2 tx.commit();
 613  2 System.out.println("[Thread1] ** LOCK INFO cache1: " + cache1.printLockInfo());
 614  2 System.out.println("[Thread1] ** LOCK INFO cache2: " + cache2.printLockInfo());
 615    }
 616    catch (Throwable ex)
 617    {
 618  0 ex.printStackTrace();
 619  0 t1_ex = ex;
 620    }
 621    }
 622    };
 623   
 624  2 Thread t2 = new Thread("Thread2")
 625    {
 626    Transaction tx;
 627   
 628  2 public void run()
 629    {
 630  2 try
 631    {
 632  2 TestingUtil.sleepThread(1000);// give Thread1 time to acquire the lock
 633  2 tx = beginTransaction();
 634  2 System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo());
 635  2 System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo());
 636  2 cache1.put("/bela/ban", "name", "Michelle Ban");
 637  2 System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo());
 638  2 System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo());
 639  2 tx.commit();
 640  2 System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo());
 641  2 System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo());
 642    }
 643    catch (Throwable ex)
 644    {
 645  0 ex.printStackTrace();
 646  0 t2_ex = ex;
 647    }
 648    }
 649    };
 650   
 651    // Let the game start
 652  2 t1.start();
 653  2 t2.start();
 654   
 655    // Wait for threads to die
 656  2 t1.join();
 657  2 t2.join();
 658   
 659  2 if (t1_ex != null)
 660    {
 661  0 fail("Thread1 failed: " + t1_ex);
 662    }
 663  2 if (t2_ex != null)
 664    {
 665  0 fail("Thread2 failed: " + t2_ex);
 666    }
 667   
 668  2 assertEquals("Michelle Ban", cache1.get("/bela/ban", "name"));
 669    }
 670   
 671   
 672    /**
 673    * Should reproduce JBCACHE-32 problem (http://jira.jboss.com/jira/browse/JBCACHE-32)
 674    */
 675  2 public void testConcurrentCommitsWith1Thread() throws Exception
 676    {
 677  2 _testConcurrentCommits(1);
 678    }
 679   
 680    /**
 681    * Should reproduce JBCACHE-32 problem (http://jira.jboss.com/jira/browse/JBCACHE-32)
 682    */
 683  2 public void testConcurrentCommitsWith5Threads() throws Exception
 684    {
 685  2 _testConcurrentCommits(5);
 686    }
 687   
 688    /**
 689    * Should reproduce JBCACHE-32 problem (http://jira.jboss.com/jira/browse/JBCACHE-32)
 690    */
 691  4 private void _testConcurrentCommits(int num_threads) throws Exception
 692    {
 693  4 Object myMutex = new Object();
 694   
 695  4 final CacheImpl c1 = (CacheImpl) DefaultCacheFactory.getInstance().createCache(false);
 696  4 final CacheImpl c2 = (CacheImpl) DefaultCacheFactory.getInstance().createCache(false);
 697  4 c1.getConfiguration().setClusterName("TempCluster");
 698  4 c2.getConfiguration().setClusterName("TempCluster");
 699  4 c1.getConfiguration().setCacheMode(Configuration.CacheMode.REPL_SYNC);
 700  4 c2.getConfiguration().setCacheMode(Configuration.CacheMode.REPL_SYNC);
 701  4 c1.getConfiguration().setSyncCommitPhase(true);
 702  4 c2.getConfiguration().setSyncCommitPhase(true);
 703  4 c1.getConfiguration().setSyncRollbackPhase(true);
 704  4 c2.getConfiguration().setSyncRollbackPhase(true);
 705  4 c1.getConfiguration().setIsolationLevel(IsolationLevel.REPEATABLE_READ);
 706  4 c2.getConfiguration().setIsolationLevel(IsolationLevel.REPEATABLE_READ);
 707  4 c1.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup");
 708  4 c2.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup");
 709  4 c1.getConfiguration().setLockAcquisitionTimeout(5000);
 710  4 c2.getConfiguration().setLockAcquisitionTimeout(5000);
 711  4 c1.start();
 712  4 c2.start();
 713  4 final List<Exception> exceptions = new ArrayList<Exception>();
 714   
 715    class MyThread extends Thread
 716    {
 717    Object mutex;
 718   
 719  12 public MyThread(String name, Object mutex)
 720    {
 721  12 super(name);
 722  12 this.mutex = mutex;
 723    }
 724   
 725  12 public void run()
 726    {
 727  12 Transaction tx = null;
 728   
 729  12 try
 730    {
 731  12 tx = beginTransaction();
 732  12 c1.put("/thread/" + getName(), null);
 733  12 System.out.println("Thread " + getName() + " after put(): " + c1.toString());
 734  12 System.out.println("Thread " + getName() + " waiting on mutex");
 735  12 synchronized (mutex)
 736    {
 737  12 mutex.wait();
 738    }
 739  12 System.out.println("Thread " + getName() + " committing");
 740  12 tx.commit();
 741  12 System.out.println("Thread " + getName() + " committed successfully");
 742    }
 743    catch (Exception e)
 744    {
 745  0 exceptions.add(e);
 746    }
 747    finally
 748    {
 749  12 try
 750    {
 751  12 if (tx != null) tx.rollback();
 752    }
 753    catch (Exception e)
 754    {
 755    }
 756    }
 757    }
 758    }
 759   
 760  4 MyThread[] threads = new MyThread[num_threads];
 761  4 for (int i = 0; i < threads.length; i++)
 762    {
 763  12 threads[i] = new MyThread("#" + i, myMutex);
 764    }
 765  4 for (int i = 0; i < threads.length; i++)
 766    {
 767  12 MyThread thread = threads[i];
 768  12 System.out.println("starting thread #" + i);
 769  12 thread.start();
 770    }
 771   
 772  4 TestingUtil.sleepThread(6000);
 773  4 synchronized (myMutex)
 774    {
 775  4 System.out.println("cache is " + c1.printLockInfo());
 776  4 System.out.println("******************* SIGNALLING THREADS ********************");
 777  4 myMutex.notifyAll();
 778    }
 779   
 780  4 for (MyThread thread : threads)
 781    {
 782  12 try
 783    {
 784  12 thread.join();
 785  12 System.out.println("Joined thread " + thread.getName());
 786    }
 787    catch (InterruptedException e)
 788    {
 789  0 e.printStackTrace();
 790    }
 791    }
 792   
 793  4 System.out.println("FINAL c1:\n" + c1.printDetails() + "\nlocks:\n" + c1.printLockInfo());
 794   
 795  4 assertEquals(0, c1.getNumberOfLocksHeld());
 796  4 assertEquals(0, c2.getNumberOfLocksHeld());
 797   
 798  4 c1.stop();
 799  4 c2.stop();
 800   
 801    // if(ex != null)
 802    // {
 803    // ex.printStackTrace();
 804    // fail("Thread failed: " + ex);
 805    // }
 806   
 807    // we can only expect 1 thread to succeed. The others will fail. So, threads.length -1 exceptions.
 808    // this is a timing issue - 2 threads still may succeed on a multi cpu system
 809    // assertEquals(threads.length - 1, exceptions.size());
 810   
 811  0 for (Exception exception : exceptions) assertEquals(TimeoutException.class, exception.getClass());
 812    }
 813   
 814   
 815    /**
 816    * Conncurrent put on 2 different instances.
 817    */
 818  2 public void testConcurrentPutsOnTwoInstances() throws Exception
 819    {
 820  2 initCaches(Configuration.CacheMode.REPL_SYNC);
 821  2 final CacheImpl c1 = this.cache1;
 822  2 final CacheImpl c2 = this.cache2;
 823   
 824  2 Thread t1 = new Thread()
 825    {
 826    Transaction tx;
 827   
 828  2 public void run()
 829    {
 830  2 try
 831    {
 832  2 tx = beginTransaction();
 833  2 c1.put("/ben/wang", "name", "Ben Wang");
 834  2 TestingUtil.sleepThread(8000);
 835  2 tx.commit();// This should go thru
 836    }
 837    catch (Throwable ex)
 838    {
 839  0 ex.printStackTrace();
 840  0 t1_ex = ex;
 841    }
 842    }
 843    };
 844   
 845  2 Thread t2 = new Thread()
 846    {
 847    Transaction tx;
 848   
 849  2 public void run()
 850    {
 851  2 try
 852    {
 853  2 TestingUtil.sleepThread(1000);// give Thread1 time to acquire the lock
 854  2 tx = beginTransaction();
 855  2 c2.put("/ben/wang", "name", "Ben Jr.");
 856  2 tx.commit();// This will time out and rollback first because Thread1 has a tx going as well.
 857    }
 858    catch (RollbackException rollback_ex)
 859    {
 860  2 System.out.println("received rollback exception as expected");
 861    }
 862    catch (Throwable ex)
 863    {
 864  0 ex.printStackTrace();
 865  0 t2_ex = ex;
 866    }
 867    }
 868    };
 869   
 870    // Let the game start
 871  2 t1.start();
 872  2 t2.start();
 873   
 874    // Wait for thread to die but put an insurance of 5 seconds on it.
 875  2 t1.join();
 876  2 t2.join();
 877   
 878  2 if (t1_ex != null)
 879    {
 880  0 fail("Thread1 failed: " + t1_ex);
 881    }
 882  2 if (t2_ex != null)
 883    {
 884  0 fail("Thread2 failed: " + t2_ex);
 885    }
 886  2 assertEquals("Ben Wang", c1.get("/ben/wang", "name"));
 887    }
 888   
 889   
 890  2 public void testPut() throws Exception
 891    {
 892  2 initCaches(Configuration.CacheMode.REPL_SYNC);
 893  2 final CacheImpl c1 = this.cache1;
 894   
 895   
 896  2 Thread t1 = new Thread()
 897    {
 898  2 public void run()
 899    {
 900  2 try
 901    {
 902  2 lock.acquire();
 903  2 System.out.println("-- t1 has lock");
 904  2 c1.put("/a/b/c", "age", 38);
 905  2 System.out.println("[Thread1] set value to 38");
 906   
 907  2 System.out.println("-- t1 releases lock");
 908  2 lock.release();
 909  2 TestingUtil.sleepThread(300);
 910  2 Thread.yield();
 911   
 912  2 lock.acquire();
 913  2 System.out.println("-- t1 has lock");
 914  2 c1.put("/a/b/c", "age", 39);
 915  2 System.out.println("[Thread1] set value to 39");
 916   
 917  2 System.out.println("-- t1 releases lock");
 918  2 lock.release();
 919  2 assertEquals(39, c1.get("/a/b/c", "age"));
 920    }
 921    catch (Throwable ex)
 922    {
 923  0 ex.printStackTrace();
 924  0 t1_ex = ex;
 925    }
 926    finally
 927    {
 928  2 lock.release();
 929    }
 930    }
 931    };
 932   
 933  2 Thread t2 = new Thread()
 934    {
 935  2 public void run()
 936    {
 937  2 try
 938    {
 939  2 TestingUtil.sleepThread(100);
 940  2 Thread.yield();
 941  2 lock.acquire();
 942  2 System.out.println("-- t2 has lock");
 943    // Should replicate the value right away.
 944  2 Integer val = (Integer) cache2.get("/a/b/c", "age");
 945  2 System.out.println("[Thread2] value is " + val);
 946  2 assertEquals(new Integer(38), val);
 947  2 System.out.println("-- t2 releases lock");
 948  2 lock.release();
 949  2 TestingUtil.sleepThread(300);
 950  2 Thread.yield();
 951  2 TestingUtil.sleepThread(500);
 952  2 lock.acquire();
 953  2 System.out.println("-- t2 has lock");
 954  2 val = (Integer) cache2.get("/a/b/c", "age");
 955  2 System.out.println("-- t2 releases lock");
 956  2 lock.release();
 957  2 assertEquals(new Integer(39), val);
 958    }
 959    catch (Throwable ex)
 960    {
 961  0 ex.printStackTrace();
 962  0 t2_ex = ex;
 963    }
 964    finally
 965    {
 966  2 lock.release();
 967    }
 968    }
 969    };
 970   
 971    // Let the game start
 972  2 t1.start();
 973  2 t2.start();
 974   
 975    // Wait for thread to die but put an insurance of 5 seconds on it.
 976  2 t1.join();
 977  2 t2.join();
 978  2 if (t1_ex != null)
 979    {
 980  0 fail("Thread1 failed: " + t1_ex);
 981    }
 982  2 if (t2_ex != null)
 983    {
 984  0 fail("Thread2 failed: " + t2_ex);
 985    }
 986    }
 987   
 988    /**
 989    * Test replicated cache with transaction. Idea is to have two threads running
 990    * a local cache each that is replicating. Depending on whether cache1 commit/rollback or not,
 991    * the cache2.get will get different values.
 992    * Note that we have used sleep to interpose thread execution sequence.
 993    * Although it's not fool proof, it is rather simple and intuitive.
 994    *
 995    * @throws Exception
 996    */
 997  2 public void testPutTx() throws Exception
 998    {
 999  2 Transaction tx = null;
 1000   
 1001  2 try
 1002    {
 1003  2 initCaches(Configuration.CacheMode.REPL_SYNC);
 1004  2 cache1.getConfiguration().setSyncCommitPhase(true);
 1005  2 cache2.getConfiguration().setSyncCommitPhase(true);
 1006  2 tx = beginTransaction();
 1007  2 cache1.put("/a/b/c", "age", 38);
 1008  2 cache1.put("/a/b/c", "age", 39);
 1009  2 Object val = cache2.get("/a/b/c", "age");// must be null as not yet committed
 1010  2 assertNull(val);
 1011  2 tx.commit();
 1012   
 1013  0 tx = beginTransaction();
 1014  0 assertEquals(39, cache2.get("/a/b/c", "age"));// must not be null
 1015  0 tx.commit();
 1016    }
 1017    catch (Throwable t)
 1018    {
 1019  2 t.printStackTrace();
 1020  2 t1_ex = t;
 1021    }
 1022    finally
 1023    {
 1024  2 lock.release();
 1025    }
 1026    }
 1027   
 1028   
 1029    /**
 1030    * Have both cache1 and cache2 do add and commit. cache1 commit should time out
 1031    * since it can't obtain the lock when trying to replicate cache2. On the other hand,
 1032    * cache2 commit will succeed since now that cache1 is rollbacked and lock is
 1033    * released.
 1034    */
 1035  2 public void testPutTx1() throws Exception
 1036    {
 1037  2 initCaches(Configuration.CacheMode.REPL_SYNC);
 1038  2 final CacheImpl c1 = this.cache1;
 1039  2 Thread t1 = new Thread()
 1040    {
 1041  2 public void run()
 1042    {
 1043  2 Transaction tx = null;
 1044   
 1045  2 try
 1046    {
 1047  2 lock.acquire();
 1048  2 tx = beginTransaction();
 1049  2 c1.put("/a/b/c", "age", 38);
 1050  2 c1.put("/a/b/c", "age", 39);
 1051  2 lock.release();
 1052   
 1053  2 TestingUtil.sleepThread(300);
 1054  2 lock.acquire();
 1055  2 try
 1056    {
 1057  2 tx.commit();
 1058    }
 1059    catch (RollbackException ex)
 1060    {
 1061  2 System.out.println("[Thread1] received RollbackException, as expected. Rolling back changes");
 1062    }
 1063    finally
 1064    {
 1065  2 lock.release();
 1066    }
 1067    }
 1068    catch (Throwable ex)
 1069    {
 1070  0 ex.printStackTrace();
 1071  0 t1_ex = ex;
 1072    }
 1073    finally
 1074    {
 1075  2 lock.release();
 1076    }
 1077    }
 1078    };
 1079   
 1080  2 Thread t2 = new Thread()
 1081    {
 1082  2 public void run()
 1083    {
 1084  2 Transaction tx = null;
 1085   
 1086  2 try
 1087    {
 1088  2 sleep(200);
 1089  2 Thread.yield();
 1090  2 lock.acquire();
 1091  2 tx = beginTransaction();
 1092  2 assertNull(cache2.get("/a/b/c", "age"));// must be null as not yet committed
 1093  2 cache2.put("/a/b/c", "age", 40);
 1094  2 lock.release();
 1095   
 1096  2 TestingUtil.sleepThread(300);
 1097  2 lock.acquire();
 1098  2 assertEquals(40, cache2.get("/a/b/c", "age"));// must not be null
 1099  2 tx.commit();
 1100  2 lock.release();
 1101   
 1102  2 TestingUtil.sleepThread(1000);
 1103  2 tx = beginTransaction();
 1104  2 assertEquals("After cache2 commit", 40, cache2.get("/a/b/c", "age"));
 1105  2 tx.commit();
 1106    }
 1107    catch (Throwable ex)
 1108    {
 1109  0 ex.printStackTrace();
 1110  0 t2_ex = ex;
 1111    }
 1112    finally
 1113    {
 1114  2 lock.release();
 1115    }
 1116    }
 1117    };
 1118   
 1119    // Let the game start
 1120  2 t1.start();
 1121  2 t2.start();
 1122   
 1123  2 t1.join();
 1124  2 t2.join();
 1125   
 1126  2 if (t1_ex != null)
 1127    {
 1128  0 fail("Thread1 failed: " + t1_ex);
 1129    }
 1130  2 if (t2_ex != null)
 1131    {
 1132  0 fail("Thread2 failed: " + t2_ex);
 1133    }
 1134    }
 1135   
 1136   
 1137  2 public void testPutTxWithRollback() throws Exception
 1138    {
 1139  2 initCaches(Configuration.CacheMode.REPL_SYNC);
 1140  2 final CacheImpl c2 = this.cache1;
 1141  2 Thread t1 = new Thread()
 1142    {
 1143  2 public void run()
 1144    {
 1145  2 Transaction tx = null;
 1146   
 1147  2 try
 1148    {
 1149  2 lock.acquire();
 1150  2 tx = beginTransaction();
 1151  2 c2.put("/a/b/c", "age", 38);
 1152  2 c2.put("/a/b/c", "age", 39);
 1153  2 lock.release();
 1154   
 1155  2 TestingUtil.sleepThread(100);
 1156  2 lock.acquire();
 1157  2 tx.rollback();
 1158  2 lock.release();
 1159    }
 1160    catch (Throwable ex)
 1161    {
 1162  0 ex.printStackTrace();
 1163  0 t1_ex = ex;
 1164    }
 1165    finally
 1166    {
 1167  2 lock.release();
 1168    }
 1169    }
 1170    };
 1171   
 1172  2 Thread t2 = new Thread()
 1173    {
 1174  2 public void run()
 1175    {
 1176  2 Transaction tx = null;
 1177   
 1178  2 try
 1179    {
 1180  2 sleep(200);
 1181  2 Thread.yield();
 1182  2 lock.acquire();
 1183  2 tx = beginTransaction();
 1184  2 assertNull(cache2.get("/a/b/c", "age"));// must be null as not yet committed
 1185  2 lock.release();
 1186   
 1187  2 TestingUtil.sleepThread(100);
 1188  2 lock.acquire();
 1189  2 assertNull(cache2.get("/a/b/c", "age"));// must be null as rolledback
 1190  2 tx.commit();
 1191  2 lock.release();
 1192    }
 1193    catch (Throwable ex)
 1194    {
 1195  0 ex.printStackTrace();
 1196  0 t2_ex = ex;
 1197    }
 1198    finally
 1199    {
 1200  2 lock.release();
 1201    }
 1202    }
 1203    };
 1204   
 1205    // Let the game start
 1206  2 t1.start();
 1207  2 t2.start();
 1208   
 1209    // Wait for thread to die but put an insurance of 5 seconds on it.
 1210  2 t1.join();
 1211  2 t2.join();
 1212  2 if (t1_ex != null)
 1213    {
 1214  0 fail("Thread1 failed: " + t1_ex);
 1215    }
 1216  2 if (t2_ex != null)
 1217    {
 1218  0 fail("Thread2 failed: " + t2_ex);
 1219    }
 1220    }
 1221   
 1222   
 1223    static class TransactionAborter implements Synchronization
 1224    {
 1225    Transaction ltx = null;
 1226   
 1227  6 public TransactionAborter(Transaction ltx)
 1228    {
 1229  6 this.ltx = ltx;
 1230    }
 1231   
 1232  6 public void beforeCompletion()
 1233    {
 1234  6 try
 1235    {
 1236  6 ltx.setRollbackOnly();
 1237    }
 1238    catch (SystemException e)
 1239    {
 1240    // who cares
 1241    }
 1242    }
 1243   
 1244  6 public void afterCompletion(int status)
 1245    {
 1246    }
 1247    }
 1248   
 1249    static class CallbackListener extends AbstractCacheListener
 1250    {
 1251   
 1252    CacheImpl callbackCache;
 1253    Object callbackKey;
 1254    Exception ex;
 1255    Object mutex = new Object();
 1256   
 1257  6 CallbackListener(CacheImpl cache, Object callbackKey)
 1258    {
 1259  6 this.callbackCache = cache;
 1260  6 this.callbackKey = callbackKey;
 1261  6 cache.getNotifier().addCacheListener(this);
 1262    }
 1263   
 1264  22 public void nodeModified(Fqn fqn, boolean pre, boolean isLocal, ModificationType modType, Map data)
 1265    {
 1266  22 if (!pre)
 1267    {
 1268    // Lock on a mutex so test can't check for an exception
 1269    // until the get call completes
 1270  12 synchronized (mutex)
 1271    {
 1272  12 try
 1273    {
 1274  12 callbackCache.get(fqn, callbackKey);
 1275    }
 1276    catch (CacheException e)
 1277    {
 1278  0 e.printStackTrace();
 1279  0 ex = e;
 1280    }
 1281    }
 1282    }
 1283    }
 1284   
 1285  8 Exception getCallbackException()
 1286    {
 1287  8 synchronized (mutex)
 1288    {
 1289  8 return ex;
 1290    }
 1291    }
 1292   
 1293    }
 1294   
 1295    static class TransactionAborterCallbackListener extends CallbackListener
 1296    {
 1297   
 1298    TransactionManager callbackTM;
 1299   
 1300  2 TransactionAborterCallbackListener(CacheImpl cache, Object callbackKey)
 1301    {
 1302  2 super(cache, callbackKey);
 1303  2 callbackTM = callbackCache.getTransactionManager();
 1304    }
 1305   
 1306  8 public void nodeModified(Fqn fqn, boolean pre, boolean isLocal, ModificationType modType, Map data)
 1307    {
 1308  8 if (!pre)
 1309    {
 1310  4 try
 1311    {
 1312  4 Transaction tx = callbackTM.getTransaction();
 1313  4 if (tx != null && tx.getStatus() == Status.STATUS_ACTIVE)
 1314    {
 1315    // this will rollback the transaction
 1316  2 tx.registerSynchronization(new TransactionAborter(tx));
 1317    }
 1318    else
 1319    {
 1320  2 super.nodeModified(fqn, pre, isLocal, modType, data);
 1321    }
 1322   
 1323    }
 1324    catch (Exception e)
 1325    {
 1326  0 e.printStackTrace();
 1327  0 if (ex == null)
 1328    {
 1329  0 ex = e;
 1330    }
 1331    }
 1332    }
 1333    }
 1334   
 1335    }
 1336   
 1337    static class TransactionAborterListener extends AbstractCacheListener
 1338    {
 1339   
 1340    TransactionManager callbackTM;
 1341    Object mutex = new Object();
 1342    Exception ex;
 1343   
 1344  2 TransactionAborterListener(CacheImpl cache)
 1345    {
 1346  2 callbackTM = cache.getTransactionManager();
 1347  2 cache.getNotifier().addCacheListener(this);
 1348    }
 1349   
 1350  8 public void nodeModified(Fqn fqn, boolean pre, boolean isLocal, ModificationType modType, Map data)
 1351    {
 1352  8 if (!pre)
 1353    {
 1354  4 synchronized (mutex)
 1355    {
 1356  4 try
 1357    {
 1358  4 Transaction tx = callbackTM.getTransaction();
 1359  4 if (tx != null && tx.getStatus() == Status.STATUS_ACTIVE)
 1360    {
 1361    // this will rollback the transaction
 1362  2 tx.setRollbackOnly();
 1363    }
 1364    }
 1365    catch (Exception e)
 1366    {
 1367  0 e.printStackTrace();
 1368  0 if (ex == null)
 1369    {
 1370  0 ex = e;
 1371    }
 1372    }
 1373    }
 1374    }
 1375    }
 1376   
 1377  2 Exception getCallbackException()
 1378    {
 1379  2 synchronized (mutex)
 1380    {
 1381  2 return ex;
 1382    }
 1383    }
 1384   
 1385    }
 1386   
 1387  2 public static Test suite()
 1388    {
 1389    // return getDeploySetup(SyncTxUnitTestCase.class, "cachetest.jar");
 1390  2 return new TestSuite(SyncReplTxTest.class);
 1391    }
 1392   
 1393   
 1394    }