1 Reply Latest reply on Nov 26, 2009 5:09 AM by veklov

    READ_COMMITTED is violated when cache mode is INVALIDATION_S

      Hello.

      Please see below test case. Currently working assertions are commented out. Is that a bug? Cache version is 3.2.1.GA

      import java.util.ArrayList;
      import java.util.Collections;
      import java.util.List;
      import java.util.concurrent.CyclicBarrier;
      import java.util.concurrent.TimeUnit;
      import javax.transaction.TransactionManager;
      
      import junit.framework.TestCase;
      import org.jboss.cache.Cache;
      import org.jboss.cache.CacheFactory;
      import org.jboss.cache.CacheSPI;
      import org.jboss.cache.DefaultCacheFactory;
      import org.jboss.cache.config.Configuration;
      import org.jboss.cache.lock.IsolationLevel;
      
      public class TestInvalidationSyncReadCommited extends TestCase {
      
       private static final String NODE_FQN = "/node";
       private static final String KEY = "key";
       private static final String VALUE1 = "value1";
       private static final String VALUE2 = "value2";
      
       private static Cache createCache() {
       final CacheFactory cf = new DefaultCacheFactory();
       final Configuration configuration = new Configuration();
       configuration.setCacheMode(Configuration.CacheMode.INVALIDATION_SYNC);
       configuration.setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
       configuration.setIsolationLevel(IsolationLevel.READ_COMMITTED);
       configuration.setLockAcquisitionTimeout(10000L);
       return cf.createCache(configuration, true);
       }
      
       public void testPut() throws Exception {
       final Cache cache1 = createCache();
       final Cache cache2 = createCache();
       assertEquals("Members count", 2, cache1.getMembers().size());
       assertEquals("Members count", 2, cache2.getMembers().size());
      
       final CyclicBarrier barrier = new CyclicBarrier(2);
       final List exceptions = Collections.synchronizedList(new ArrayList());
      
       cache1.put(NODE_FQN, KEY, VALUE1);
      
       final Thread thread1 = new Thread() {
       public void run() {
       try {
       getTransactionManager(cache1).begin();
       assertEquals("Must be in cache", VALUE1, cache1.get(NODE_FQN, KEY));
       await(barrier);
       await(barrier);
      // assertEquals("For some reason it is in cache", VALUE1, cache1.get(NODE_FQN, KEY));
       assertNull("Must be invalidated before commit", cache1.get(NODE_FQN, KEY));
       getTransactionManager(cache1).commit();
       assertNull("Must be invalidated", cache1.get(NODE_FQN, KEY));
       } catch (Throwable e) {
       exceptions.add(e);
       }
       }
       };
       final Thread thread2 = new Thread() {
       public void run() {
       try {
       await(barrier);
       getTransactionManager(cache2).begin();
       cache2.put(NODE_FQN, KEY, VALUE2);
       getTransactionManager(cache2).commit();
       await(barrier);
       } catch (Throwable e) {
       exceptions.add(e);
       }
       }
       };
      
       thread1.start();
       thread2.start();
       thread1.join();
       thread2.join();
      
       cache1.stop();
       cache2.stop();
      
       if (!exceptions.isEmpty()) {
       fail(exceptions.toString());
       }
       }
      
       public void testRemoveNode() throws Exception {
       final Cache cache1 = createCache();
       final Cache cache2 = createCache();
       assertEquals("Members count", 2, cache1.getMembers().size());
       assertEquals("Members count", 2, cache2.getMembers().size());
      
       final CyclicBarrier barrier = new CyclicBarrier(2);
       final List exceptions = Collections.synchronizedList(new ArrayList());
      
       cache1.put(NODE_FQN, KEY, VALUE1);
      
       final Thread thread1 = new Thread() {
       public void run() {
       try {
       getTransactionManager(cache1).begin();
       assertEquals("Must be in cache", VALUE1, cache1.get(NODE_FQN, KEY));
       await(barrier);
       await(barrier);
      // assertEquals("For some reason it is in cache", VALUE1, cache1.get(NODE_FQN, KEY));
       assertNull("Must be invalidated before commit", cache1.get(NODE_FQN, KEY));
       getTransactionManager(cache1).commit();
       assertNull("Must be invalidated", cache1.get(NODE_FQN, KEY));
       } catch (Throwable e) {
       exceptions.add(e);
       }
       }
       };
       final Thread thread2 = new Thread() {
       public void run() {
       try {
       await(barrier);
       getTransactionManager(cache2).begin();
       cache2.removeNode(NODE_FQN);
       getTransactionManager(cache2).commit();
       await(barrier);
       } catch (Throwable e) {
       exceptions.add(e);
       }
       }
       };
      
       thread1.start();
       thread2.start();
       thread1.join();
       thread2.join();
      
       cache1.stop();
       cache2.stop();
      
       if (!exceptions.isEmpty()) {
       fail(exceptions.toString());
       }
       }
      
       private static TransactionManager getTransactionManager(final Cache cache) {
       return ((CacheSPI) cache).getTransactionManager();
       }
      
       private static void await(final CyclicBarrier barrier) throws Exception {
       barrier.await(20, TimeUnit.SECONDS);
       }
      }
      


        • 1. Re: READ_COMMITTED is violated when cache mode is INVALIDATI

          There is similar issue with REPL_SYNC. In this case it can be fixed by commenting out begin/end calls in thread2. Is that an issue with DummyTransactionManagerLookup or with cache itself?

          import java.util.ArrayList;
          import java.util.Collections;
          import java.util.List;
          import java.util.concurrent.BrokenBarrierException;
          import java.util.concurrent.CyclicBarrier;
          import java.util.concurrent.TimeUnit;
          import java.util.concurrent.TimeoutException;
          import javax.transaction.TransactionManager;
          
          import junit.framework.TestCase;
          import org.jboss.cache.Cache;
          import org.jboss.cache.CacheFactory;
          import org.jboss.cache.CacheSPI;
          import org.jboss.cache.DefaultCacheFactory;
          import org.jboss.cache.config.Configuration;
          import org.jboss.cache.lock.IsolationLevel;
          
          public class TestReplSyncReadCommitted2 extends TestCase {
          
           private static final String NODE_FQN = "/node";
           private static final String KEY = "key";
           private static final String VALUE1 = "value1";
           private static final String VALUE2 = "value2";
          
           private static Cache createCache() {
           final CacheFactory cf = new DefaultCacheFactory();
           final Configuration configuration = new Configuration();
           configuration.setCacheMode(Configuration.CacheMode.REPL_SYNC);
           configuration.setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
           configuration.setIsolationLevel(IsolationLevel.READ_COMMITTED);
           configuration.setLockAcquisitionTimeout(10000L);
           return cf.createCache(configuration, true);
           }
          
           public void testRemoveNodeTwoCaches() throws InterruptedException {
           final Cache cache1 = createCache();
           final Cache cache2 = createCache();
           assertEquals("Members count", 2, cache1.getMembers().size());
           assertEquals("Members count", 2, cache2.getMembers().size());
          
           final CyclicBarrier barrier = new CyclicBarrier(2);
           final List exceptions = Collections.synchronizedList(new ArrayList());
          
           final Thread thread1 = new Thread() {
           public void run() {
           try {
           await(barrier);
           await(barrier);
           getTransactionManager(cache1).begin();
           assertEquals("Must be replicated", VALUE1, cache1.get(NODE_FQN, KEY));
           await(barrier);
           await(barrier);
           assertNull("Must be removed", cache1.get(NODE_FQN, KEY));
           getTransactionManager(cache1).commit();
           } catch (Throwable e) {
           exceptions.add(e);
           }
           }
           };
           final Thread thread2 = new Thread() {
           public void run() {
           try {
           await(barrier);
           getTransactionManager(cache2).begin();
           cache2.put(NODE_FQN, KEY, VALUE1);
           getTransactionManager(cache2).commit();
           await(barrier);
           await(barrier);
           getTransactionManager(cache2).begin();
           cache2.removeNode(NODE_FQN);
           getTransactionManager(cache2).commit();
           await(barrier);
           } catch (Throwable e) {
           exceptions.add(e);
           }
           }
           };
          
           thread1.start();
           thread2.start();
           thread1.join();
           thread2.join();
          
           cache1.stop();
           cache2.stop();
          
           if (!exceptions.isEmpty()) {
           fail(exceptions.toString());
           }
           }
          
           public void testPutTwoCaches() throws InterruptedException {
           final Cache cache1 = createCache();
           final Cache cache2 = createCache();
           assertEquals("Members count", 2, cache1.getMembers().size());
           assertEquals("Members count", 2, cache2.getMembers().size());
          
           final CyclicBarrier barrier = new CyclicBarrier(2);
           final List exceptions = Collections.synchronizedList(new ArrayList());
          
           final Thread thread1 = new Thread() {
           public void run() {
           try {
           await(barrier);
           await(barrier);
           getTransactionManager(cache1).begin();
           assertEquals("Must be replicated", VALUE1, cache1.get(NODE_FQN, KEY));
           await(barrier);
           await(barrier);
           assertEquals("Must be replicated", VALUE2, cache1.get(NODE_FQN, KEY));
           getTransactionManager(cache1).commit();
           } catch (Throwable e) {
           exceptions.add(e);
           }
           }
           };
           final Thread thread2 = new Thread() {
           public void run() {
           try {
           await(barrier);
           getTransactionManager(cache2).begin();
           cache2.put(NODE_FQN, KEY, VALUE1);
           getTransactionManager(cache2).commit();
           await(barrier);
           await(barrier);
           getTransactionManager(cache2).begin();
           cache2.put(NODE_FQN, KEY, VALUE2);
           getTransactionManager(cache2).commit();
           await(barrier);
           } catch (Throwable e) {
           exceptions.add(e);
           }
           }
           };
          
           thread1.start();
           thread2.start();
           thread1.join();
           thread2.join();
          
           cache1.stop();
           cache2.stop();
          
           if (!exceptions.isEmpty()) {
           fail(exceptions.toString());
           }
           }
          
           private static TransactionManager getTransactionManager(final Cache cache) {
           return ((CacheSPI) cache).getTransactionManager();
           }
          
           private static void await(final CyclicBarrier barrier)
           throws InterruptedException, BrokenBarrierException, TimeoutException {
           barrier.await(20, TimeUnit.SECONDS);
           }
          }