Clover coverage report -
Coverage timestamp: Wed Jan 31 2007 15:38:53 EST
file stats: LOC: 602   Methods: 34
NCLOC: 425   Classes: 9
 
 Source file Conditionals Statements Methods TOTAL
DeadlockTest.java 65% 88.9% 85.3% 86.5%
coverage coverage
 1    /*
 2    *
 3    * JBoss, the OpenSource J2EE webOS
 4    *
 5    * Distributable under LGPL license.
 6    * See terms of license at gnu.org.
 7    */
 8   
 9    package org.jboss.cache.transaction;
 10   
 11    import junit.framework.Test;
 12    import junit.framework.TestCase;
 13    import junit.framework.TestSuite;
 14    import org.apache.commons.logging.Log;
 15    import org.apache.commons.logging.LogFactory;
 16    import org.jboss.cache.CacheException;
 17    import org.jboss.cache.CacheImpl;
 18    import org.jboss.cache.DefaultCacheFactory;
 19    import org.jboss.cache.Fqn;
 20    import org.jboss.cache.config.Configuration;
 21    import org.jboss.cache.lock.IsolationLevel;
 22    import org.jboss.cache.lock.TimeoutException;
 23    import org.jboss.cache.lock.UpgradeException;
 24    import org.jboss.cache.misc.TestingUtil;
 25   
 26    import javax.transaction.NotSupportedException;
 27    import javax.transaction.SystemException;
 28    import javax.transaction.Transaction;
 29   
 30    /**
 31    * Tests transactional access to a local CacheImpl, with concurrent (deadlock-prone) access.
 32    * Note: we use DummpyTranasctionManager to replace jta
 33    *
 34    * @version $Id: DeadlockTest.java,v 1.9 2007/01/11 13:49:06 msurtani Exp $
 35    */
 36    public class DeadlockTest extends TestCase
 37    {
 38    CacheImpl cache = null;
 39    Exception thread_ex;
 40   
 41    final Fqn NODE = Fqn.fromString("/a/b/c");
 42    final Fqn PARENT_NODE = Fqn.fromString("/a/b");
 43    final Fqn FQN1 = NODE;
 44    final Fqn FQN2 = Fqn.fromString("/1/2/3");
 45    final Log log = LogFactory.getLog(DeadlockTest.class);
 46   
 47   
 48  6 public DeadlockTest(String name)
 49    {
 50  6 super(name);
 51    }
 52   
 53  6 public void setUp() throws Exception
 54    {
 55  6 super.setUp();
 56  6 DummyTransactionManager.getInstance();
 57  6 cache = (CacheImpl) DefaultCacheFactory.getInstance().createCache(false);
 58  6 cache.getConfiguration().setInitialStateRetrievalTimeout(10000);
 59  6 cache.getConfiguration().setClusterName("test");
 60  6 cache.getConfiguration().setCacheMode(Configuration.CacheMode.LOCAL);
 61  6 cache.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup");
 62  6 cache.getConfiguration().setIsolationLevel(IsolationLevel.REPEATABLE_READ);
 63  6 cache.getConfiguration().setLockAcquisitionTimeout(3000);
 64  6 cache.create();
 65  6 cache.start();
 66  6 thread_ex = null;
 67    }
 68   
 69   
 70  6 public void tearDown() throws Exception
 71    {
 72  6 super.tearDown();
 73  6 if (cache != null)
 74    {
 75  6 cache.stop();
 76    }
 77  6 if (thread_ex != null)
 78    {
 79  0 throw thread_ex;
 80    }
 81    }
 82   
 83   
 84  1 public void testConcurrentUpgrade() throws CacheException, InterruptedException
 85    {
 86  1 MyThread t1 = new MyThreadTimeout("MyThread#1", NODE);
 87  1 MyThread t2 = new MyThread("MyThread#2", NODE);
 88   
 89  1 cache.put(NODE, null);
 90   
 91  1 t1.start();
 92  1 t2.start();
 93   
 94  1 TestingUtil.sleepThread((long) 5000);
 95   
 96  1 synchronized (t1)
 97    {
 98  1 t1.notify();// t1 will now try to upgrade RL to WL, but fails b/c t2 still has a RL
 99    }
 100   
 101  1 TestingUtil.sleepThread((long) 5000);
 102   
 103  1 synchronized (t2)
 104    {
 105  1 t2.notify();// t1 should now be able to upgrade because t1 was rolled back (RL was removed)
 106    }
 107   
 108  1 t1.join();
 109  1 t2.join();
 110    }
 111   
 112   
 113    /**
 114    * Typical deadlock: t1 acquires WL on /a/b/c, t2 WL on /1/2/3, then t1 attempts to get WL on /1/2/3 (locked by t2),
 115    * and t2 tries to acquire WL on /a/b/c. One (or both) of the 2 transactions is going to timeout and roll back.
 116    */
 117  1 public void testPutDeadlock() throws CacheException, InterruptedException
 118    {
 119  1 MyPutter t1 = new MyPutterTimeout("MyPutter#1", FQN1, FQN2);
 120  1 MyPutter t2 = new MyPutter("MyPutter#2", FQN2, FQN1);
 121   
 122  1 cache.put(FQN1, null);
 123  1 cache.put(FQN2, null);
 124   
 125  1 t1.start();
 126  1 t2.start();
 127   
 128  1 TestingUtil.sleepThread((long) 1000);
 129   
 130  1 synchronized (t1)
 131    {
 132  1 t1.notify();// t1 will now try to acquire WL on /1/2/3 (held by t2) - this will time out
 133    }
 134   
 135  1 TestingUtil.sleepThread((long) 1000);
 136   
 137  1 synchronized (t2)
 138    {
 139  1 t2.notify();// t2 tries to acquire WL on /a/b/c (held by t1)
 140    }
 141   
 142  1 t1.join();
 143  1 t2.join();
 144    }
 145   
 146    /*
 147   
 148    Commented out since JBCACHE-875 onwards, this will end up in a classic deadlock since we lock parent when removing a child.
 149   
 150    public void testCreateIfNotExistsLogic() throws CacheException, InterruptedException
 151    {
 152    cache.put(NODE, null);
 153   
 154    class T0 extends GenericThread
 155    {
 156    public T0(String name)
 157    {
 158    super(name);
 159    }
 160   
 161    protected void _run() throws Exception
 162    {
 163    Transaction myTx = startTransaction();
 164    log("put(" + NODE + ")");
 165    cache.put(NODE, null);
 166    log("put(" + NODE + "): OK");
 167   
 168    synchronized (this) {wait();}
 169   
 170    log("remove(" + NODE + ")");
 171    cache.remove(NODE);
 172    log("remove(" + NODE + "): OK");
 173   
 174    log("committing TX");
 175    myTx.commit();
 176    }
 177    }
 178   
 179    class T1 extends GenericThread
 180    {
 181    public T1(String name)
 182    {
 183    super(name);
 184    }
 185   
 186    protected void _run() throws Exception
 187    {
 188    Transaction myTx = startTransaction();
 189    log("put(" + NODE + ")");
 190    cache.put(NODE, null);
 191    log("put(" + NODE + "): OK");
 192   
 193    log("committing TX");
 194    myTx.commit();
 195    }
 196   
 197    }
 198   
 199    T0 t0 = new T0("T0");
 200    t0.start();
 201    TestingUtil.sleepThread((long) 500);
 202    T1 t1 = new T1("T1");
 203    t1.start();
 204    TestingUtil.sleepThread((long) 500);
 205    synchronized (t0)
 206    {
 207    t0.notify();
 208    }
 209    t0.join();
 210    t1.join();
 211    }
 212   
 213    */
 214   
 215  1 public void testMoreThanOneUpgrader() throws Exception
 216    {
 217  1 final int NUM = 2;
 218  1 final Object lock = new Object();
 219   
 220  1 cache.put(NODE, "bla", "blo");
 221   
 222  1 MyUpgrader[] upgraders = new MyUpgrader[NUM];
 223  1 for (int i = 0; i < upgraders.length; i++)
 224    {
 225  2 upgraders[i] = new MyUpgrader("Upgrader#" + i, NODE, lock);
 226  2 upgraders[i].start();
 227    }
 228   
 229  1 TestingUtil.sleepThread((long) 1000);
 230  1 log("locks: " + cache.printLockInfo());
 231   
 232  1 synchronized (lock)
 233    {
 234  1 lock.notifyAll();
 235    }
 236   
 237    // all threads now try to upgrade the RL to a WL
 238  1 for (int i = 0; i < upgraders.length; i++)
 239    {
 240  2 MyThread upgrader = upgraders[i];
 241  2 upgrader.join();
 242    }
 243    }
 244   
 245   
 246  1 public void testPutsAndRemovesOnParentAndChildNodes() throws InterruptedException
 247    {
 248  1 ContinuousPutter putter = new ContinuousPutter("Putter", NODE);
 249  1 ContinuousRemover remover = new ContinuousRemover("Remover", PARENT_NODE);
 250  1 putter.start();
 251  1 remover.start();
 252  1 TestingUtil.sleepThread((long) 5000);
 253  1 log("stopping Putter");
 254  1 putter.looping = false;
 255  1 log("stopping Remover");
 256  1 remover.looping = false;
 257  1 putter.join();
 258  1 remover.join();
 259    }
 260   
 261  1 public void testPutsAndRemovesOnParentAndChildNodesReversed() throws InterruptedException
 262    {
 263  1 ContinuousPutter putter = new ContinuousPutter("Putter", PARENT_NODE);
 264  1 ContinuousRemover remover = new ContinuousRemover("Remover", NODE);
 265  1 putter.start();
 266  1 remover.start();
 267  1 TestingUtil.sleepThread((long) 5000);
 268  1 log("stopping Putter");
 269  1 putter.looping = false;
 270  1 log("stopping Remover");
 271  1 remover.looping = false;
 272  1 putter.join();
 273  1 remover.join();
 274    }
 275   
 276  1 public void testPutsAndRemovesOnSameNode() throws InterruptedException
 277    {
 278  1 ContinuousPutter putter = new ContinuousPutter("Putter", NODE);
 279  1 ContinuousRemover remover = new ContinuousRemover("Remover", NODE);
 280  1 putter.start();
 281  1 remover.start();
 282  1 TestingUtil.sleepThread((long) 5000);
 283  1 log("stopping Putter");
 284  1 putter.looping = false;
 285  1 log("stopping Remover");
 286  1 remover.looping = false;
 287  1 putter.join();
 288  1 remover.join();
 289    }
 290   
 291   
 292    class GenericThread extends Thread
 293    {
 294    protected Transaction tx;
 295    protected boolean looping = true;
 296   
 297  0 public GenericThread()
 298    {
 299   
 300    }
 301   
 302  12 public GenericThread(String name)
 303    {
 304  12 super(name);
 305    }
 306   
 307  0 public void setLooping(boolean looping)
 308    {
 309  0 this.looping = looping;
 310    }
 311   
 312  12 public void run()
 313    {
 314  12 try
 315    {
 316  12 _run();
 317    }
 318    catch (Exception t)
 319    {
 320  0 System.out.println(getName() + ": " + t);
 321  0 if (thread_ex == null)
 322    {
 323  0 thread_ex = t;
 324    }
 325    }
 326  12 if (log.isTraceEnabled())
 327    {
 328  0 log.trace("Thread " + getName() + " terminated");
 329    }
 330    }
 331   
 332  0 protected void _run() throws Exception
 333    {
 334  0 throw new UnsupportedOperationException();
 335    }
 336    }
 337   
 338   
 339    class ContinuousRemover extends GenericThread
 340    {
 341    Fqn fqn;
 342   
 343  3 public ContinuousRemover(String name, Fqn fqn)
 344    {
 345  3 super(name);
 346  3 this.fqn = fqn;
 347    }
 348   
 349   
 350  3 protected void _run() throws Exception
 351    {
 352  3 while (thread_ex == null && looping)
 353    {
 354  285 try
 355    {
 356  285 if (interrupted())
 357    {
 358  0 break;
 359    }
 360  285 tx = startTransaction();
 361  285 log("remove(" + fqn + ")");
 362  285 cache.remove(fqn);
 363  285 sleep(random(20));
 364  285 tx.commit();
 365    }
 366    catch (InterruptedException interrupted)
 367    {
 368  0 tx.rollback();
 369  0 break;
 370    }
 371    catch (Exception ex)
 372    {
 373  0 tx.rollback();
 374  0 throw ex;
 375    }
 376    }
 377    }
 378    }
 379   
 380    class ContinuousPutter extends GenericThread
 381    {
 382    Fqn fqn;
 383   
 384  3 public ContinuousPutter(String name, Fqn fqn)
 385    {
 386  3 super(name);
 387  3 this.fqn = fqn;
 388    }
 389   
 390   
 391  3 protected void _run() throws Exception
 392    {
 393  3 while (thread_ex == null && looping)
 394    {
 395  787 try
 396    {
 397  787 if (interrupted())
 398    {
 399  0 break;
 400    }
 401  787 tx = startTransaction();
 402  787 log("put(" + fqn + ")");
 403  787 cache.put(fqn, "foo", "bar");
 404  787 sleep(random(20));
 405  787 tx.commit();
 406    }
 407    catch (InterruptedException interrupted)
 408    {
 409  0 tx.rollback();
 410  0 break;
 411    }
 412    catch (Exception ex)
 413    {
 414  0 tx.rollback();
 415  0 throw ex;
 416    }
 417    }
 418    }
 419    }
 420   
 421  1072 public static long random(long range)
 422    {
 423  1072 return (long) ((Math.random() * 100000) % range) + 1;
 424    }
 425   
 426   
 427    class MyThread extends GenericThread
 428    {
 429    Fqn fqn;
 430   
 431   
 432  4 public MyThread(String name, Fqn fqn)
 433    {
 434  4 super(name);
 435  4 this.fqn = fqn;
 436    }
 437   
 438  2 protected void _run() throws Exception
 439    {
 440  2 tx = startTransaction();
 441  2 log("get(" + fqn + ")");
 442  2 cache.get(fqn, "bla");// acquires RL
 443  2 log("done, locks: " + cache.printLockInfo());
 444   
 445  2 synchronized (this) {wait();}
 446   
 447  2 log("put(" + fqn + ")");
 448  2 cache.put(fqn, "key", "val");// need to upgrade RL to WL
 449  1 log("done, locks: " + cache.printLockInfo());
 450  1 tx.commit();
 451  1 log("committed TX, locks: " + cache.printLockInfo());
 452    }
 453    }
 454   
 455   
 456    class MyUpgrader extends MyThread
 457    {
 458    Object lock;
 459   
 460  0 public MyUpgrader(String name, Fqn fqn)
 461    {
 462  0 super(name, fqn);
 463    }
 464   
 465  2 public MyUpgrader(String name, Fqn fqn, Object lock)
 466    {
 467  2 super(name, fqn);
 468  2 this.lock = lock;
 469    }
 470   
 471  2 protected void _run() throws Exception
 472    {
 473  2 tx = startTransaction();
 474  2 try
 475    {
 476  2 log("get(" + fqn + ")");
 477   
 478  2 cache.get(fqn, "bla");// acquires RL
 479   
 480  2 synchronized (lock) {lock.wait();}
 481   
 482  2 log("put(" + fqn + ")");
 483  2 cache.put(fqn, "key", "val");// need to upgrade RL to WL
 484  1 log("done, locks: " + cache.printLockInfo());
 485  1 tx.commit();
 486  1 log("committed TX, locks: " + cache.printLockInfo());
 487    }
 488    catch (UpgradeException upge)
 489    {
 490  1 log("Exception upgrading lock");
 491  1 tx.rollback();
 492    }
 493    }
 494    }
 495   
 496    class MyThreadTimeout extends MyThread
 497    {
 498   
 499  1 public MyThreadTimeout(String name, Fqn fqn)
 500    {
 501  1 super(name, fqn);
 502    }
 503   
 504  1 protected void _run() throws Exception
 505    {
 506  1 try
 507    {
 508  1 super._run();
 509    }
 510    catch (UpgradeException upgradeEx)
 511    {
 512  1 log("received UpgradeException as expected");
 513  1 tx.rollback();
 514  1 log("rolled back TX, locks: " + cache.printLockInfo());
 515    }
 516    catch (TimeoutException timeoutEx)
 517    {
 518  0 log("received TimeoutException as expected");
 519  0 tx.rollback();
 520  0 log("rolled back TX, locks: " + cache.printLockInfo());
 521    }
 522    }
 523    }
 524   
 525   
 526    class MyPutter extends GenericThread
 527    {
 528    Fqn fqn1, fqn2;
 529   
 530  2 public MyPutter(String name, Fqn fqn1, Fqn fqn2)
 531    {
 532  2 super(name);
 533  2 this.fqn1 = fqn1;
 534  2 this.fqn2 = fqn2;
 535    }
 536   
 537  2 protected void _run() throws Exception
 538    {
 539  2 tx = startTransaction();
 540  2 log("put(" + fqn1 + ")");
 541  2 cache.put(fqn1, "key", "val");// need to upgrade RL to WL
 542  2 log("done, locks: " + cache.printLockInfo());
 543  2 synchronized (this) {wait();}
 544  2 log("put(" + fqn2 + ")");
 545  2 cache.put(fqn2, "key", "val");// need to upgrade RL to WL
 546  1 log("done, locks: " + cache.printLockInfo());
 547  1 tx.commit();
 548  1 log("committed TX, locks: " + cache.printLockInfo());
 549    }
 550    }
 551   
 552    class MyPutterTimeout extends MyPutter
 553    {
 554   
 555  1 public MyPutterTimeout(String name, Fqn fqn1, Fqn fqn2)
 556    {
 557  1 super(name, fqn1, fqn2);
 558    }
 559   
 560  1 protected void _run() throws Exception
 561    {
 562  1 try
 563    {
 564  1 super._run();
 565    }
 566    catch (TimeoutException timeoutEx)
 567    {
 568  1 log("received TimeoutException as expected");
 569  1 tx.rollback();
 570  1 log("rolled back TX, locks: " + cache.printLockInfo());
 571    }
 572    }
 573    }
 574   
 575   
 576  1106 private static void log(String msg)
 577    {
 578  1106 System.out.println(Thread.currentThread().getName() + ": " + msg);
 579    }
 580   
 581   
 582  1078 Transaction startTransaction() throws SystemException, NotSupportedException
 583    {
 584  1078 DummyTransactionManager mgr = DummyTransactionManager.getInstance();
 585  1078 mgr.begin();
 586  1078 Transaction tx = mgr.getTransaction();
 587  1078 return tx;
 588    }
 589   
 590   
 591  1 public static Test suite() throws Exception
 592    {
 593  1 return new TestSuite(DeadlockTest.class);
 594    }
 595   
 596  0 public static void main(String[] args) throws Exception
 597    {
 598  0 junit.textui.TestRunner.run(suite());
 599    }
 600   
 601   
 602    }