Clover coverage report -
Coverage timestamp: Thu Jul 5 2007 20:02:32 EDT
file stats: LOC: 499   Methods: 27
NCLOC: 341   Classes: 5
 
 Source file Conditionals Statements Methods TOTAL
ReadWriteLockWithUpgrade.java 86.8% 84.4% 85.2% 85.1%
coverage coverage
 1    /*
 2    * JBoss, the OpenSource J2EE webOS
 3    *
 4    * Distributable under LGPL license.
 5    * See terms of license at gnu.org.
 6    */
 7   
 8    package org.jboss.cache.lock;
 9   
 10    import org.apache.commons.logging.Log;
 11    import org.apache.commons.logging.LogFactory;
 12   
 13    import java.util.Map;
 14    import java.util.concurrent.TimeUnit;
 15    import java.util.concurrent.locks.Condition;
 16    import java.util.concurrent.locks.Lock;
 17    import java.util.concurrent.locks.ReadWriteLock;
 18   
 19    /*
 20    * <p> This class is similar to PreferredWriterReadWriteLock except that
 21    * the read lock is upgradable to write lock. I.e., when a user calls
 22    * upgradeLock(), it will release the read lock, wait for
 23    * all the read locks to clear and obtain a write lock afterwards. In
 24    * particular, the write lock is obtained with priority to prevent deadlock
 25    * situation. The current design is based in part from Doug Lea's
 26    * PreferredWriterReadWriteLock.
 27    *
 28    * <p>Note that the pre-requisite to use upgrade lock is the pre-existing of a read
 29    * lock. Otherwise, a RuntimeException will be thrown.</p>
 30    * <p>Also note that currently lock can only be obtained through <code>attempt</code>
 31    * api with specified timeout instead <code>acquire</code> is not supported.
 32    *
 33    * Internally, the upgrade is done through a Semaphore where a thread with
 34    * a higher priority will obtain the write lock first. The following scenarios then can
 35    * happen:
 36    * <ul>
 37    * <li>If there are multiple read locks granted (and no write lock request in waiting),
 38    * an upgrade will release one read lock
 39    * (decrease the counter), bump up upagrade counter, increase the current thread priority,
 40    * set a thread local as upgrade thread,
 41    * and place a write lock acquire() call. Upon waken up, it will check if the current
 42    * thread is an upgrade. If it is, restore the thread priority, and decrease the
 43    * upgrade counter.</li>
 44    * <li>If there are mutiple write locks request in waiting (and only one read lock granted),
 45    * decrease the read lock counter,
 46    * bump up the upgrade counter, and increase the current thread priority.
 47    * When one of the writer gets wake up, it will first check
 48    * if upgrade counter is zero. If not, it will first release the semaphore so the upgrade
 49    * thread can grab it, check the semaphore is gone, do notify, and issue myself another
 50    * acquire to grab the next available semaphore.</li>
 51    * </ul>
 52    *
 53    * @author Ben Wang
 54    */
 55    public class ReadWriteLockWithUpgrade implements ReadWriteLock
 56    {
 57    private long activeReaders_ = 0;
 58    protected Thread activeWriter_ = null;
 59    private long waitingReaders_ = 0;
 60    private long waitingWriters_ = 0;
 61    private long waitingUpgrader_ = 0;
 62    // Store a default object to signal that we are upgrade thread.
 63    //protected final ThreadLocal upgraderLocal_ = new ThreadLocal();
 64    protected static final Map<ReadWriteLock, Object> upgraderLocal_ = new ThreadLocalMap();
 65    protected static final Object dummy_ = new Object();
 66    protected final ReaderLock readerLock_ = new ReaderLock();
 67    protected final WriterLock writerLock_ = new WriterLock();
 68    protected static final Log log_ = LogFactory.getLog(ReadWriteLockWithUpgrade.class);
 69   
 70  100 public String toString()
 71    {
 72  100 StringBuffer sb = new StringBuffer();
 73  100 sb.append("activeReaders=").append(activeReaders_).append(", activeWriter=").append(activeWriter_);
 74  100 sb.append(", waitingReaders=").append(waitingReaders_).append(", waitingWriters=").append(waitingWriters_);
 75  100 sb.append(", waitingUpgrader=").append(waitingUpgrader_);
 76  100 return sb.toString();
 77    }
 78   
 79  778130 public Lock writeLock()
 80    {
 81  778130 return writerLock_;
 82    }
 83   
 84  11949891 public Lock readLock()
 85    {
 86  11949891 return readerLock_;
 87    }
 88   
 89    /**
 90    * Attempt to obtain an upgrade to writer lock. If successful, the read lock is upgraded to
 91    * write lock. If fails, the owner retains the read lock.
 92    *
 93    * @param msecs Time to wait in millisecons.
 94    * @return Sync object. Null if not successful or timeout.
 95    */
 96  12000 public Lock upgradeLockAttempt(long msecs) throws UpgradeException
 97    {
 98  12000 if (activeReaders_ == 0)
 99  0 throw new RuntimeException("No reader lock available for upgrade");
 100   
 101  12000 synchronized (writerLock_)
 102    {
 103  12000 if (waitingUpgrader_ >= 1)
 104    {
 105  1 String errStr = "upgradeLockAttempt(): more than one reader trying to simultaneously upgrade to write lock";
 106  1 log_.error(errStr);
 107  1 throw new UpgradeException(errStr);
 108    }
 109  11999 waitingUpgrader_++;
 110  11999 upgraderLocal_.put(this, dummy_);
 111    }
 112   
 113    // If there is only one reader left, switch to write lock immediately.
 114    // If there is more than one reader, release this one and acquire the write
 115    // lock. There is still a chance for deadlock when there are two reader locks
 116    // and suddenly the second lock is released when this lock is released and acquired
 117    // as is else case. Solution is to let it timeout.
 118  11999 if (activeReaders_ == 1)
 119    {
 120  11992 resetWaitingUpgrader();
 121  11992 return changeLock();
 122    }
 123    else
 124    {
 125  7 readerLock_.unlock();
 126  7 try
 127    {
 128  7 if (!writerLock_.tryLock(msecs, TimeUnit.MILLISECONDS))
 129    {
 130  3 log_.error("upgradeLock(): failed");
 131  3 resetWaitingUpgrader();
 132   
 133  3 if (!readerLock_.tryLock(msecs, TimeUnit.MILLISECONDS))
 134    {
 135  0 String errStr = "ReadWriteLockWithUpgrade.upgradeLockAttempt():" +
 136    " failed to upgrade to write lock and also failed to re-obtain the read lock";
 137  0 log_.error(errStr);
 138  0 throw new IllegalStateException(errStr);
 139    }
 140  3 return null;
 141    }
 142  4 resetWaitingUpgrader();
 143    }
 144    catch (InterruptedException ex)
 145    {
 146  0 resetWaitingUpgrader();
 147  0 return null;
 148    }
 149   
 150  4 return writerLock_;
 151    }
 152    }
 153   
 154  11999 private void resetWaitingUpgrader()
 155    {
 156  11999 synchronized (writerLock_)
 157    {
 158  11999 waitingUpgrader_--;
 159  11999 upgraderLocal_.remove(this);
 160    }
 161    }
 162   
 163  11992 protected synchronized Lock changeLock()
 164    {
 165  11992 --activeReaders_;
 166   
 167  11992 if (!startWrite())
 168    {
 169    // Something is wrong.
 170  0 return null;
 171    }
 172   
 173  11992 return writerLock_;
 174    }
 175   
 176    /*
 177    A bunch of small synchronized methods are needed
 178    to allow communication from the Lock objects
 179    back to this object, that serves as controller
 180    */
 181  22 protected synchronized void cancelledWaitingReader()
 182    {
 183  22 --waitingReaders_;
 184    }
 185   
 186  22 protected synchronized void cancelledWaitingWriter()
 187    {
 188  22 --waitingWriters_;
 189    }
 190   
 191    /**
 192    * Override this method to change to reader preference *
 193    */
 194  5982243 protected boolean allowReader()
 195    {
 196  5982239 return activeWriter_ == null && waitingWriters_ == 0 && waitingUpgrader_ == 0;
 197    }
 198   
 199  5981659 protected synchronized boolean startRead()
 200    {
 201  5981659 boolean allowRead = allowReader();
 202  5981659 if (allowRead)
 203    {
 204  5981021 ++activeReaders_;
 205    }
 206  5981659 return allowRead;
 207    }
 208   
 209  399999 protected synchronized boolean startWrite()
 210    {
 211    // The allowWrite expression cannot be modified without
 212    // also changing startWrite, so is hard-wired
 213  399999 boolean allowWrite = activeWriter_ == null && activeReaders_ == 0;
 214  394316 if (allowWrite) activeWriter_ = Thread.currentThread();
 215  399999 return allowWrite;
 216    }
 217   
 218    /*
 219    Each of these variants is needed to maintain atomicity of wait counts during wait loops. They could be
 220    made faster by manually inlining each other. We hope that compilers do this for us though.
 221    */
 222  5673357 protected synchronized boolean startReadFromNewReader()
 223    {
 224  5673357 boolean pass = startRead();
 225  151 if (!pass) ++waitingReaders_;
 226  5673357 return pass;
 227    }
 228   
 229  305861 protected synchronized boolean startWriteFromNewWriter()
 230    {
 231  305861 boolean pass = startWrite();
 232  5273 if (!pass) ++waitingWriters_;
 233  305861 return pass;
 234    }
 235   
 236  605 protected synchronized boolean startReadFromWaitingReader()
 237    {
 238  605 boolean pass = startRead();
 239  129 if (pass) --waitingReaders_;
 240  605 return pass;
 241    }
 242   
 243  5655 protected synchronized boolean startWriteFromWaitingWriter()
 244    {
 245  5655 boolean pass = startWrite();
 246  5251 if (pass) --waitingWriters_;
 247  5655 return pass;
 248    }
 249   
 250    /**
 251    * Called upon termination of a read.
 252    * Returns the object to signal to wake up a waiter, or null if no such
 253    */
 254  5968879 protected synchronized Signaller endRead()
 255    {
 256  5968879 if (activeReaders_ != 0 && --activeReaders_ == 0 && waitingWriters_ > 0)
 257  46 return writerLock_;
 258    else
 259  5968833 return null;
 260    }
 261   
 262   
 263    /**
 264    * Called upon termination of a write.
 265    * Returns the object to signal to wake up a waiter, or null if no such
 266    */
 267  394984 protected synchronized Signaller endWrite()
 268    {
 269  394984 activeWriter_ = null;
 270  394984 if (waitingReaders_ > 0 && allowReader())
 271  584 return readerLock_;
 272  394400 else if (waitingWriters_ > 0)
 273  5690 return writerLock_;
 274    else
 275  388710 return null;
 276    }
 277   
 278   
 279    /**
 280    * Reader and Writer requests are maintained in two different
 281    * wait sets, by two different objects. These objects do not
 282    * know whether the wait sets need notification since they
 283    * don't know preference rules. So, each supports a
 284    * method that can be selected by main controlling object
 285    * to perform the notifications. This base class simplifies mechanics.
 286    */
 287   
 288    static interface Signaller
 289    { // base for ReaderLock and WriterLock
 290   
 291    void signalWaiters();
 292    }
 293   
 294    static abstract class LockBase implements Lock
 295    {
 296   
 297  0 public void lock()
 298    {
 299  0 throw new UnsupportedOperationException();
 300    }
 301   
 302  0 public void lockInterruptibly() throws InterruptedException
 303    {
 304  0 throw new UnsupportedOperationException();
 305    }
 306   
 307  0 public Condition newCondition()
 308    {
 309  0 throw new UnsupportedOperationException();
 310    }
 311   
 312  0 public boolean tryLock()
 313    {
 314  0 throw new UnsupportedOperationException();
 315    }
 316   
 317    /*
 318    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
 319    {
 320    throw new UnsupportedOperationException();
 321    }
 322   
 323    public void unlock()
 324    {
 325    throw new UnsupportedOperationException();
 326    }
 327    */
 328   
 329    }
 330   
 331    protected class ReaderLock extends LockBase implements Signaller, Lock
 332    {
 333   
 334  5968879 public void unlock()
 335    {
 336  5968879 Signaller s = endRead();
 337  5968879 if (s != null)
 338    {
 339  46 s.signalWaiters();
 340    }
 341    }
 342   
 343  606 public synchronized void signalWaiters()
 344    {
 345  606 ReaderLock.this.notifyAll();
 346    }
 347   
 348  5981054 public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
 349    {
 350  0 if (Thread.interrupted()) throw new InterruptedException();
 351  5981033 long msecs = unit.toMillis(time);
 352  5981054 InterruptedException ie = null;
 353  5981054 synchronized (this)
 354    {
 355  5981054 if (msecs <= 0)
 356  307697 return startRead();
 357  5673357 else if (startReadFromNewReader())
 358  5673206 return true;
 359    else
 360    {
 361  151 long waitTime = msecs;
 362  151 long start = System.currentTimeMillis();
 363  151 while (true)
 364    {
 365  605 try
 366    {
 367  605 ReaderLock.this.wait(waitTime);
 368    }
 369    catch (InterruptedException ex)
 370    {
 371  0 cancelledWaitingReader();
 372  0 ie = ex;
 373  0 break;
 374    }
 375  605 if (startReadFromWaitingReader())
 376  129 return true;
 377    else
 378    {
 379  476 waitTime = msecs - (System.currentTimeMillis() - start);
 380  476 if (waitTime <= 0)
 381    {
 382  22 cancelledWaitingReader();
 383  22 break;
 384    }
 385    }
 386    }
 387    }
 388    }
 389    // safeguard on interrupt or timeout:
 390  22 writerLock_.signalWaiters();
 391  22 if (ie != null)
 392  0 throw ie;
 393    else
 394  22 return false; // timed out
 395    }
 396   
 397    }
 398   
 399    protected class WriterLock extends LockBase implements Signaller, Lock
 400    {
 401   
 402  394984 public void unlock()
 403    {
 404  394984 Signaller s = endWrite();
 405  6274 if (s != null) s.signalWaiters();
 406    }
 407   
 408    // Waking up all thread in waiting now for them to compete.
 409    // Thread with higher priority, i.e., upgrading, will win.
 410  5758 public synchronized void signalWaiters()
 411    {
 412  5758 WriterLock.this.notifyAll();
 413    }
 414   
 415  383172 public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
 416    {
 417  0 if (Thread.interrupted()) throw new InterruptedException();
 418  383172 InterruptedException ie = null;
 419  383172 long msecs = unit.toMillis(time);
 420   
 421  383172 synchronized (WriterLock.this)
 422    {
 423  383172 if (msecs <= 0)
 424    {
 425    // Upgrade thread has prioirty.
 426  77311 if (waitingUpgrader_ != 0)
 427    {
 428  4 if (upgraderLocal_.get(ReadWriteLockWithUpgrade.this) != null)
 429    {
 430  4 log_.info("attempt(): upgrade to write lock");
 431  4 return startWrite();
 432    }
 433    else
 434  0 return false;
 435    }
 436    else
 437  77307 return startWrite();
 438    }
 439  305861 else if (startWriteFromNewWriter())
 440  300588 return true;
 441    else
 442    {
 443  5273 long waitTime = msecs;
 444  5273 long start = System.currentTimeMillis();
 445  5273 while (true)
 446    {
 447  5655 try
 448    {
 449  5655 WriterLock.this.wait(waitTime);
 450    }
 451    catch (InterruptedException ex)
 452    {
 453  0 cancelledWaitingWriter();
 454  0 WriterLock.this.notifyAll();
 455  0 ie = ex;
 456  0 break;
 457    }
 458   
 459  5655 if (waitingUpgrader_ != 0)
 460    { // Has upgrade request
 461  3 if (upgraderLocal_.get(ReadWriteLockWithUpgrade.this) != null)
 462    { // Upgrade thread
 463  3 if (startWriteFromWaitingWriter())
 464  2 return true;
 465    }
 466    else
 467    { // Normal write thread, go back to wait.
 468  0 continue;
 469    }
 470    }
 471    else
 472    { // Every one is normal write thread. Compete; if fail go back to wait.
 473  5652 if (startWriteFromWaitingWriter())
 474  5249 return true;
 475    }
 476   
 477  404 waitTime = msecs - (System.currentTimeMillis() - start);
 478  404 if (waitTime <= 0)
 479    {
 480  22 cancelledWaitingWriter();
 481  22 WriterLock.this.notifyAll();
 482  22 break;
 483    }
 484    }
 485    }
 486    }
 487   
 488  22 readerLock_.signalWaiters();
 489  22 if (ie != null)
 490  0 throw ie;
 491    else
 492  22 return false; // timed out
 493    }
 494   
 495    }
 496   
 497   
 498    }
 499