Clover coverage report -
Coverage timestamp: Thu Jul 5 2007 20:02:32 EDT
file stats: LOC: 611   Methods: 34
NCLOC: 434   Classes: 9
 
 Source file Conditionals Statements Methods TOTAL
DeadlockTest.java 61.1% 88.9% 85.3% 86.4%
coverage coverage
 1    /*
 2    *
 3    * JBoss, the OpenSource J2EE webOS
 4    *
 5    * Distributable under LGPL license.
 6    * See terms of license at gnu.org.
 7    */
 8   
 9    package org.jboss.cache.transaction;
 10   
 11    import junit.framework.Test;
 12    import junit.framework.TestCase;
 13    import junit.framework.TestSuite;
 14    import org.apache.commons.logging.Log;
 15    import org.apache.commons.logging.LogFactory;
 16    import org.jboss.cache.CacheException;
 17    import org.jboss.cache.CacheImpl;
 18    import org.jboss.cache.DefaultCacheFactory;
 19    import org.jboss.cache.Fqn;
 20    import org.jboss.cache.config.Configuration;
 21    import org.jboss.cache.lock.IsolationLevel;
 22    import org.jboss.cache.lock.TimeoutException;
 23    import org.jboss.cache.lock.UpgradeException;
 24    import org.jboss.cache.misc.TestingUtil;
 25   
 26    import javax.transaction.NotSupportedException;
 27    import javax.transaction.SystemException;
 28    import javax.transaction.Transaction;
 29   
 30    /**
 31    * Tests transactional access to a local CacheImpl, with concurrent (deadlock-prone) access.
 32    * Note: we use DummpyTranasctionManager to replace jta
 33    *
 34    * @version $Id: DeadlockTest.java,v 1.13 2007/06/14 15:30:17 msurtani Exp $
 35    */
 36    public class DeadlockTest extends TestCase
 37    {
 38    CacheImpl cache = null;
 39    Exception thread_ex;
 40   
 41    final Fqn NODE = Fqn.fromString("/a/b/c");
 42    final Fqn PARENT_NODE = Fqn.fromString("/a/b");
 43    final Fqn FQN1 = NODE;
 44    final Fqn FQN2 = Fqn.fromString("/1/2/3");
 45    final Log log = LogFactory.getLog(DeadlockTest.class);
 46   
 47   
 48  6 public DeadlockTest(String name)
 49    {
 50  6 super(name);
 51    }
 52   
 53  6 public void setUp() throws Exception
 54    {
 55  6 super.setUp();
 56  6 DummyTransactionManager.getInstance();
 57  6 cache = (CacheImpl) DefaultCacheFactory.getInstance().createCache(false);
 58  6 cache.getConfiguration().setStateRetrievalTimeout(10000);
 59  6 cache.getConfiguration().setClusterName("test");
 60  6 cache.getConfiguration().setCacheMode(Configuration.CacheMode.LOCAL);
 61  6 cache.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
 62  6 cache.getConfiguration().setIsolationLevel(IsolationLevel.REPEATABLE_READ);
 63  6 cache.getConfiguration().setLockParentForChildInsertRemove(true);
 64  6 cache.getConfiguration().setLockAcquisitionTimeout(3000);
 65  6 cache.create();
 66  6 cache.start();
 67  6 thread_ex = null;
 68    }
 69   
 70   
 71  6 public void tearDown() throws Exception
 72    {
 73  6 super.tearDown();
 74  6 if (cache != null)
 75    {
 76  6 cache.stop();
 77    }
 78  6 if (thread_ex != null)
 79    {
 80  0 throw thread_ex;
 81    }
 82    }
 83   
 84   
 85  1 public void testConcurrentUpgrade() throws CacheException, InterruptedException
 86    {
 87  1 MyThread t1 = new MyThreadTimeout("MyThread#1", NODE);
 88  1 MyThread t2 = new MyThread("MyThread#2", NODE);
 89   
 90  1 cache.put(NODE, null);
 91   
 92  1 t1.start();
 93  1 t2.start();
 94   
 95  1 TestingUtil.sleepThread((long) 5000);
 96   
 97  1 synchronized (t1)
 98    {
 99  1 t1.notify();// t1 will now try to upgrade RL to WL, but fails b/c t2 still has a RL
 100    }
 101   
 102  1 TestingUtil.sleepThread((long) 5000);
 103   
 104  1 synchronized (t2)
 105    {
 106  1 t2.notify();// t1 should now be able to upgrade because t1 was rolled back (RL was removed)
 107    }
 108   
 109  1 t1.join();
 110  1 t2.join();
 111    }
 112   
 113   
 114    /**
 115    * Typical deadlock: t1 acquires WL on /a/b/c, t2 WL on /1/2/3, then t1 attempts to get WL on /1/2/3 (locked by t2),
 116    * and t2 tries to acquire WL on /a/b/c. One (or both) of the 2 transactions is going to timeout and roll back.
 117    */
 118  1 public void testPutDeadlock() throws CacheException, InterruptedException
 119    {
 120  1 MyPutter t1 = new MyPutterTimeout("MyPutter#1", FQN1, FQN2);
 121  1 MyPutter t2 = new MyPutter("MyPutter#2", FQN2, FQN1);
 122   
 123  1 cache.put(FQN1, null);
 124  1 cache.put(FQN2, null);
 125   
 126  1 t1.start();
 127  1 t2.start();
 128   
 129  1 TestingUtil.sleepThread((long) 1000);
 130   
 131  1 synchronized (t1)
 132    {
 133  1 t1.notify();// t1 will now try to acquire WL on /1/2/3 (held by t2) - this will time out
 134    }
 135   
 136  1 TestingUtil.sleepThread((long) 1000);
 137   
 138  1 synchronized (t2)
 139    {
 140  1 t2.notify();// t2 tries to acquire WL on /a/b/c (held by t1)
 141    }
 142   
 143  1 t1.join();
 144  1 t2.join();
 145    }
 146   
 147    /*
 148   
 149    Commented out since JBCACHE-875 onwards, this will end up in a classic deadlock since we lock parent when removing a child.
 150   
 151    public void testCreateIfNotExistsLogic() throws CacheException, InterruptedException
 152    {
 153    cache.put(NODE, null);
 154   
 155    class T0 extends GenericThread
 156    {
 157    public T0(String name)
 158    {
 159    super(name);
 160    }
 161   
 162    protected void _run() throws Exception
 163    {
 164    Transaction myTx = startTransaction();
 165    log("put(" + NODE + ")");
 166    cache.put(NODE, null);
 167    log("put(" + NODE + "): OK");
 168   
 169    synchronized (this) {wait();}
 170   
 171    log("remove(" + NODE + ")");
 172    cache.remove(NODE);
 173    log("remove(" + NODE + "): OK");
 174   
 175    log("committing TX");
 176    myTx.commit();
 177    }
 178    }
 179   
 180    class T1 extends GenericThread
 181    {
 182    public T1(String name)
 183    {
 184    super(name);
 185    }
 186   
 187    protected void _run() throws Exception
 188    {
 189    Transaction myTx = startTransaction();
 190    log("put(" + NODE + ")");
 191    cache.put(NODE, null);
 192    log("put(" + NODE + "): OK");
 193   
 194    log("committing TX");
 195    myTx.commit();
 196    }
 197   
 198    }
 199   
 200    T0 t0 = new T0("T0");
 201    t0.start();
 202    TestingUtil.sleepThread((long) 500);
 203    T1 t1 = new T1("T1");
 204    t1.start();
 205    TestingUtil.sleepThread((long) 500);
 206    synchronized (t0)
 207    {
 208    t0.notify();
 209    }
 210    t0.join();
 211    t1.join();
 212    }
 213   
 214    */
 215   
 216  1 public void testMoreThanOneUpgrader() throws Exception
 217    {
 218  1 final int NUM = 2;
 219  1 final Object lock = new Object();
 220   
 221  1 cache.put(NODE, "bla", "blo");
 222   
 223  1 MyUpgrader[] upgraders = new MyUpgrader[NUM];
 224  1 for (int i = 0; i < upgraders.length; i++)
 225    {
 226  2 upgraders[i] = new MyUpgrader("Upgrader#" + i, NODE, lock);
 227  2 upgraders[i].start();
 228    }
 229   
 230  1 TestingUtil.sleepThread((long) 1000);
 231  1 log("locks: " + cache.printLockInfo());
 232   
 233  1 synchronized (lock)
 234    {
 235  1 lock.notifyAll();
 236    }
 237   
 238    // all threads now try to upgrade the RL to a WL
 239  1 for (MyUpgrader upgrader : upgraders)
 240    {
 241  2 upgrader.join();
 242    }
 243    }
 244   
 245   
 246  1 public void testPutsAndRemovesOnParentAndChildNodes() throws InterruptedException
 247    {
 248  1 ContinuousPutter putter = new ContinuousPutter("Putter", NODE);
 249  1 ContinuousRemover remover = new ContinuousRemover("Remover", PARENT_NODE);
 250  1 putter.start();
 251  1 remover.start();
 252  1 TestingUtil.sleepThread((long) 5000);
 253  1 log("stopping Putter");
 254  1 putter.looping = false;
 255  1 log("stopping Remover");
 256  1 remover.looping = false;
 257  1 putter.join();
 258  1 remover.join();
 259    }
 260   
 261  1 public void testPutsAndRemovesOnParentAndChildNodesReversed() throws InterruptedException
 262    {
 263  1 ContinuousPutter putter = new ContinuousPutter("Putter", PARENT_NODE);
 264  1 ContinuousRemover remover = new ContinuousRemover("Remover", NODE);
 265  1 putter.start();
 266  1 remover.start();
 267  1 TestingUtil.sleepThread((long) 5000);
 268  1 log("stopping Putter");
 269  1 putter.looping = false;
 270  1 log("stopping Remover");
 271  1 remover.looping = false;
 272  1 putter.join();
 273  1 remover.join();
 274    }
 275   
 276  1 public void testPutsAndRemovesOnSameNode() throws InterruptedException
 277    {
 278  1 ContinuousPutter putter = new ContinuousPutter("Putter", NODE);
 279  1 ContinuousRemover remover = new ContinuousRemover("Remover", NODE);
 280  1 putter.start();
 281  1 remover.start();
 282  1 TestingUtil.sleepThread((long) 5000);
 283  1 log("stopping Putter");
 284  1 putter.looping = false;
 285  1 log("stopping Remover");
 286  1 remover.looping = false;
 287  1 putter.join();
 288  1 remover.join();
 289    }
 290   
 291   
 292    class GenericThread extends Thread
 293    {
 294    protected Transaction tx;
 295    protected boolean looping = true;
 296   
 297  0 public GenericThread()
 298    {
 299   
 300    }
 301   
 302  12 public GenericThread(String name)
 303    {
 304  12 super(name);
 305    }
 306   
 307  0 public void setLooping(boolean looping)
 308    {
 309  0 this.looping = looping;
 310    }
 311   
 312  12 public void run()
 313    {
 314  12 try
 315    {
 316  12 _run();
 317    }
 318    catch (Exception t)
 319    {
 320  0 System.out.println(getName() + ": " + t);
 321  0 if (thread_ex == null)
 322    {
 323  0 thread_ex = t;
 324    }
 325    }
 326  12 if (log.isTraceEnabled())
 327    {
 328  0 log.trace("Thread " + getName() + " terminated");
 329    }
 330    }
 331   
 332  0 protected void _run() throws Exception
 333    {
 334  0 throw new UnsupportedOperationException();
 335    }
 336    }
 337   
 338   
 339    class ContinuousRemover extends GenericThread
 340    {
 341    Fqn fqn;
 342   
 343  3 public ContinuousRemover(String name, Fqn fqn)
 344    {
 345  3 super(name);
 346  3 this.fqn = fqn;
 347    }
 348   
 349   
 350  3 protected void _run() throws Exception
 351    {
 352  3 while (thread_ex == null && looping)
 353    {
 354  735 try
 355    {
 356  735 if (interrupted())
 357    {
 358  0 break;
 359    }
 360  735 tx = startTransaction();
 361  735 log("remove(" + fqn + ")");
 362  735 cache.remove(fqn);
 363  735 sleep(random(20));
 364  735 tx.commit();
 365    }
 366    catch (InterruptedException interrupted)
 367    {
 368  0 tx.rollback();
 369  0 break;
 370    }
 371    catch (Exception ex)
 372    {
 373  0 tx.rollback();
 374  0 throw ex;
 375    }
 376    }
 377    }
 378    }
 379   
 380    class ContinuousPutter extends GenericThread
 381    {
 382    Fqn fqn;
 383   
 384  3 public ContinuousPutter(String name, Fqn fqn)
 385    {
 386  3 super(name);
 387  3 this.fqn = fqn;
 388    }
 389   
 390   
 391  3 protected void _run() throws Exception
 392    {
 393  3 while (thread_ex == null && looping)
 394    {
 395  173 try
 396    {
 397  173 if (interrupted())
 398    {
 399  0 break;
 400    }
 401  173 tx = startTransaction();
 402  173 log("put(" + fqn + ")");
 403  173 cache.put(fqn, "foo", "bar");
 404  173 sleep(random(20));
 405  173 tx.commit();
 406    }
 407    catch (InterruptedException interrupted)
 408    {
 409  0 tx.rollback();
 410  0 break;
 411    }
 412    catch (Exception ex)
 413    {
 414  0 tx.rollback();
 415  0 throw ex;
 416    }
 417    }
 418    }
 419    }
 420   
 421  908 public static long random(long range)
 422    {
 423  908 return (long) ((Math.random() * 100000) % range) + 1;
 424    }
 425   
 426   
 427    class MyThread extends GenericThread
 428    {
 429    Fqn fqn;
 430   
 431   
 432  4 public MyThread(String name, Fqn fqn)
 433    {
 434  4 super(name);
 435  4 this.fqn = fqn;
 436    }
 437   
 438  2 protected void _run() throws Exception
 439    {
 440  2 tx = startTransaction();
 441  2 log("get(" + fqn + ")");
 442  2 cache.get(fqn, "bla");// acquires RL
 443  2 log("done, locks: " + cache.printLockInfo());
 444   
 445  2 synchronized (this)
 446    {
 447  2 wait();
 448    }
 449   
 450  2 log("put(" + fqn + ")");
 451  2 cache.put(fqn, "key", "val");// need to upgrade RL to WL
 452  1 log("done, locks: " + cache.printLockInfo());
 453  1 tx.commit();
 454  1 log("committed TX, locks: " + cache.printLockInfo());
 455    }
 456    }
 457   
 458   
 459    class MyUpgrader extends MyThread
 460    {
 461    Object lock;
 462   
 463  0 public MyUpgrader(String name, Fqn fqn)
 464    {
 465  0 super(name, fqn);
 466    }
 467   
 468  2 public MyUpgrader(String name, Fqn fqn, Object lock)
 469    {
 470  2 super(name, fqn);
 471  2 this.lock = lock;
 472    }
 473   
 474  2 protected void _run() throws Exception
 475    {
 476  2 tx = startTransaction();
 477  2 try
 478    {
 479  2 log("get(" + fqn + ")");
 480   
 481  2 cache.get(fqn, "bla");// acquires RL
 482   
 483  2 synchronized (lock)
 484    {
 485  2 lock.wait();
 486    }
 487   
 488  2 log("put(" + fqn + ")");
 489  2 cache.put(fqn, "key", "val");// need to upgrade RL to WL
 490  1 log("done, locks: " + cache.printLockInfo());
 491  1 tx.commit();
 492  1 log("committed TX, locks: " + cache.printLockInfo());
 493    }
 494    catch (UpgradeException upge)
 495    {
 496  1 log("Exception upgrading lock");
 497  1 tx.rollback();
 498    }
 499    }
 500    }
 501   
 502    class MyThreadTimeout extends MyThread
 503    {
 504   
 505  1 public MyThreadTimeout(String name, Fqn fqn)
 506    {
 507  1 super(name, fqn);
 508    }
 509   
 510  1 protected void _run() throws Exception
 511    {
 512  1 try
 513    {
 514  1 super._run();
 515    }
 516    catch (UpgradeException upgradeEx)
 517    {
 518  1 log("received UpgradeException as expected");
 519  1 tx.rollback();
 520  1 log("rolled back TX, locks: " + cache.printLockInfo());
 521    }
 522    catch (TimeoutException timeoutEx)
 523    {
 524  0 log("received TimeoutException as expected");
 525  0 tx.rollback();
 526  0 log("rolled back TX, locks: " + cache.printLockInfo());
 527    }
 528    }
 529    }
 530   
 531   
 532    class MyPutter extends GenericThread
 533    {
 534    Fqn fqn1, fqn2;
 535   
 536  2 public MyPutter(String name, Fqn fqn1, Fqn fqn2)
 537    {
 538  2 super(name);
 539  2 this.fqn1 = fqn1;
 540  2 this.fqn2 = fqn2;
 541    }
 542   
 543  2 protected void _run() throws Exception
 544    {
 545  2 tx = startTransaction();
 546  2 log("put(" + fqn1 + ")");
 547  2 cache.put(fqn1, "key", "val");// need to upgrade RL to WL
 548  2 log("done, locks: " + cache.printLockInfo());
 549  2 synchronized (this)
 550    {
 551  2 wait();
 552    }
 553  2 log("put(" + fqn2 + ")");
 554  2 cache.put(fqn2, "key", "val");// need to upgrade RL to WL
 555  1 log("done, locks: " + cache.printLockInfo());
 556  1 tx.commit();
 557  1 log("committed TX, locks: " + cache.printLockInfo());
 558    }
 559    }
 560   
 561    class MyPutterTimeout extends MyPutter
 562    {
 563   
 564  1 public MyPutterTimeout(String name, Fqn fqn1, Fqn fqn2)
 565    {
 566  1 super(name, fqn1, fqn2);
 567    }
 568   
 569  1 protected void _run() throws Exception
 570    {
 571  1 try
 572    {
 573  1 super._run();
 574    }
 575    catch (TimeoutException timeoutEx)
 576    {
 577  1 log("received TimeoutException as expected");
 578  1 tx.rollback();
 579  1 log("rolled back TX, locks: " + cache.printLockInfo());
 580    }
 581    }
 582    }
 583   
 584   
 585  942 private static void log(String msg)
 586    {
 587  942 System.out.println(Thread.currentThread().getName() + ": " + msg);
 588    }
 589   
 590   
 591  914 Transaction startTransaction() throws SystemException, NotSupportedException
 592    {
 593  914 DummyTransactionManager mgr = DummyTransactionManager.getInstance();
 594  914 mgr.begin();
 595  914 Transaction tx = mgr.getTransaction();
 596  914 return tx;
 597    }
 598   
 599   
 600  1 public static Test suite() throws Exception
 601    {
 602  1 return new TestSuite(DeadlockTest.class);
 603    }
 604   
 605  0 public static void main(String[] args) throws Exception
 606    {
 607  0 junit.textui.TestRunner.run(suite());
 608    }
 609   
 610   
 611    }