2 Replies Latest reply on Aug 18, 2006 4:01 PM by cw_krebs

    IllegalStateException on commit

    cw_krebs

      Hi,

      I played little bit around with the tree cache using the optimistic locking scheme. I wrote a small unit tests (s.b.), which tries to modify the same node at the same time by different threads in different transactions. At the first view, it looks like, that everything works fine. But running the test multiple times showed, that sometimes an IllegalStateException instead of CacheException on commiting the transaction will be thrown:

      java.lang.RuntimeException:
       at org.jboss.cache.interceptors.TxInterceptor$LocalSynchronizationHandler.beforeCompletion(TxInterceptor.java:1091)
       at org.jboss.cache.interceptors.OrderedSynchronizationHandler.beforeCompletion(OrderedSynchronizationHandler.java:75)
       at org.objectweb.jotm.SubCoordinator.doBeforeCompletion(SubCoordinator.java:1487)
       at org.objectweb.jotm.SubCoordinator.commit_one_phase(SubCoordinator.java:416)
       at org.objectweb.jotm.TransactionImpl.commit(TransactionImpl.java:239)
       at de.gameduell.server.cache.PureOptimisticLockingTest$Worker.run(PureOptimisticLockingTest.java:137)
      Caused by: java.lang.IllegalStateException: there is already a writer holding the lock: GlobalTransaction:<null>:41
       at org.jboss.cache.lock.LockMap.setWriterIfNotNull(LockMap.java:96)
       at org.jboss.cache.lock.IdentityLock.acquireWriteLock(IdentityLock.java:204)
       at org.jboss.cache.Node.acquireWriteLock(Node.java:431)
       at org.jboss.cache.Node.acquire(Node.java:386)
       at org.jboss.cache.interceptors.OptimisticLockingInterceptor.lockNodes(OptimisticLockingInterceptor.java:149)
       at org.jboss.cache.interceptors.OptimisticLockingInterceptor.invoke(OptimisticLockingInterceptor.java:76)
       at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:68)
       at org.jboss.cache.interceptors.TxInterceptor.runPreparePhase(TxInterceptor.java:804)
       at org.jboss.cache.interceptors.TxInterceptor$LocalSynchronizationHandler.beforeCompletion(TxInterceptor.java:1069)
       ... 5 more


      Running the TreeCache I used JBossCache 1.4.0.GA and JOTM 2.0.10 with the following cache-config:

      <?xml version="1.0" encoding="UTF-8"?>
      <server>
       <mbean code="org.jboss.cache.TreeCache" name="jboss.cache:service=TreeCache">
       <attribute name="NodeLockingScheme">OPTIMISTIC</attribute>
       <attribute name="CacheMode">LOCAL</attribute>
       </mbean>
      </server>
      
      

      That's my JUnit-Test:
      import javax.transaction.Transaction;
      import javax.transaction.TransactionManager;
      
      import junit.framework.TestCase;
      
      import org.jboss.cache.CacheException;
      import org.jboss.cache.PropertyConfigurator;
      import org.jboss.cache.TransactionManagerLookup;
      import org.jboss.cache.TreeCache;
      import org.objectweb.jotm.Current;
      
      import EDU.oswego.cs.dl.util.concurrent.CyclicBarrier;
      
      
      /**
       * OptimisticLockingTest
       */
      public class OptimisticLockingTest extends TestCase {
      
       private static final String NODE_PATH = "/path/node";
       private static final String KEY = "key";
      
       private static final Current jotm = new Current();
      
       private TreeCache cache;
      
       protected void setUp() throws Exception {
       super.setUp();
      
       cache = new TreeCache();
       final PropertyConfigurator config = new PropertyConfigurator();
       cache.setTransactionManagerLookup(new TransactionManagerLookup() {
       public TransactionManager getTransactionManager() throws Exception {
       return Current.getTransactionManager();
       }
       });
       config.configure(cache, this.getClass().getResourceAsStream(
       "/cache-config.xml"));
       cache.startService();
       }
      
       public void testSimultanWrite() throws Exception {
       for (int i = 2; i < 20; i++) {
       System.out.println(">> testing simultaneous writes with " + i
       + " threads...");
       testSimultanWrite(i);
       }
       }
      
       public void testSimultanWrite(final int _numThreads)
       throws Exception {
       final CyclicBarrier barrier = new CyclicBarrier(_numThreads);
      
       Worker[] workers = new Worker[_numThreads];
       for (int i = 0; i < workers.length; i++) {
       workers = new Worker();
       workers.barrier = barrier;
       workers.value = "worker" + i;
       if (i == 0) {
       workers.isMaster = true;
       }
       workers.start();
       }
      
       for (int i = 0; i < workers.length; i++) {
       workers.join();
       }
      
       assertNull(workers[0].exception);
      
       for (int i = 0; i < workers.length; i++) {
       assertNull("rollback failed", workers.rollbackException);
       }
      
       for (int i = 1; i < workers.length; i++) {
       assertNotNull("missing exception", workers.exception);
       assertTrue("exception is not of type CacheException",
       workers.exception.getCause() instanceof CacheException);
       }
       assertEquals("worker0", cache.get(NODE_PATH, KEY));
      
       cache.getTransactionManager().begin();
       cache.remove(NODE_PATH);
       cache.getTransactionManager().commit();
       }
      
       private class Worker extends Thread {
      
       private CyclicBarrier barrier;
       private Exception exception;
       private Exception rollbackException;
       private String value;
       private boolean isMaster;
      
       /**
       * @see java.lang.Thread#run()
       */
       public void run() {
      
       final TransactionManager txManager = cache.getTransactionManager();
      
       Transaction tx = null;
       try {
       // wait for all threads started
       barrier.barrier();
      
       if (isMaster) {
       txManager.begin();
       cache.put(NODE_PATH, KEY, value);
      
       // lets the other threads start writing
       barrier.barrier();
      
       // wait for other threads to their modification
       barrier.barrier();
      
       // commit first
       tx = txManager.getTransaction();
       tx.commit();
      
       // lets the other threads commit
       barrier.barrier();
       } else {
       // wait for master to start transaction
       barrier.barrier();
      
       txManager.begin();
       cache.put(NODE_PATH, KEY, value);
      
       // signal modification
       barrier.barrier();
      
       // wait for master to commit transaction
       barrier.barrier();
       tx = txManager.getTransaction();
       tx.commit();
       }
       } catch (Exception e) {
       exception = e;
       try {
       tx.rollback();
       } catch (Exception e1) {
       rollbackException = e1;
       }
       }
       }
       }
       }