Current transation can not see modifications done by committ
veklov Dec 2, 2009 8:51 AMCurrent transation can not see modifications done by committed transactions while isolation level is READ_COMMITTED.
Cache version is 3.2.1.GA. Can be replicated when cache mode is REPL_SYNC or INVALIDATION_SYNC (the other are not tested). Please see below test cases for details.
Note: Test cases use DummyTransactionManager and behavior of second test case for REPL_SYNC mode depends on whether calls are done in context of JTA transaction or not, so in this mode the issue can be caused by a bug in DummyTransactionManager.
INVALIDATION_SYNC:
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import javax.transaction.TransactionManager;
import junit.framework.TestCase;
import org.jboss.cache.Cache;
import org.jboss.cache.CacheFactory;
import org.jboss.cache.CacheSPI;
import org.jboss.cache.DefaultCacheFactory;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.lock.IsolationLevel;
public class TestInvalidationSyncReadCommited extends TestCase {
 private static final String NODE_FQN = "/node";
 private static final String KEY = "key";
 private static final String VALUE1 = "value1";
 private static final String VALUE2 = "value2";
 private static Cache createCache() {
 final CacheFactory cf = new DefaultCacheFactory();
 final Configuration configuration = new Configuration();
 configuration.setCacheMode(Configuration.CacheMode.INVALIDATION_SYNC);
 configuration.setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
 configuration.setIsolationLevel(IsolationLevel.READ_COMMITTED);
 configuration.setLockAcquisitionTimeout(10000L);
 return cf.createCache(configuration, true);
 }
 public void testPut() throws Exception {
 final Cache cache1 = createCache();
 final Cache cache2 = createCache();
 assertEquals("Members count", 2, cache1.getMembers().size());
 assertEquals("Members count", 2, cache2.getMembers().size());
 final CyclicBarrier barrier = new CyclicBarrier(2);
 final List exceptions = Collections.synchronizedList(new ArrayList());
 cache1.put(NODE_FQN, KEY, VALUE1);
 final Thread thread1 = new Thread() {
 public void run() {
 try {
 getTransactionManager(cache1).begin();
 assertEquals("Must be in cache", VALUE1, cache1.get(NODE_FQN, KEY));
 await(barrier);
 await(barrier);
// assertEquals("For some reason it is in cache", VALUE1, cache1.get(NODE_FQN, KEY));
 assertNull("Must be invalidated before commit", cache1.get(NODE_FQN, KEY));
 getTransactionManager(cache1).commit();
 assertNull("Must be invalidated", cache1.get(NODE_FQN, KEY));
 } catch (Throwable e) {
 exceptions.add(e);
 }
 }
 };
 final Thread thread2 = new Thread() {
 public void run() {
 try {
 await(barrier);
 getTransactionManager(cache2).begin();
 cache2.put(NODE_FQN, KEY, VALUE2);
 getTransactionManager(cache2).commit();
 await(barrier);
 } catch (Throwable e) {
 exceptions.add(e);
 }
 }
 };
 thread1.start();
 thread2.start();
 thread1.join();
 thread2.join();
 cache1.stop();
 cache2.stop();
 if (!exceptions.isEmpty()) {
 fail(exceptions.toString());
 }
 }
 public void testRemoveNode() throws Exception {
 final Cache cache1 = createCache();
 final Cache cache2 = createCache();
 assertEquals("Members count", 2, cache1.getMembers().size());
 assertEquals("Members count", 2, cache2.getMembers().size());
 final CyclicBarrier barrier = new CyclicBarrier(2);
 final List exceptions = Collections.synchronizedList(new ArrayList());
 cache1.put(NODE_FQN, KEY, VALUE1);
 final Thread thread1 = new Thread() {
 public void run() {
 try {
 getTransactionManager(cache1).begin();
 assertEquals("Must be in cache", VALUE1, cache1.get(NODE_FQN, KEY));
 await(barrier);
 await(barrier);
// assertEquals("For some reason it is in cache", VALUE1, cache1.get(NODE_FQN, KEY));
 assertNull("Must be invalidated before commit", cache1.get(NODE_FQN, KEY));
 getTransactionManager(cache1).commit();
 assertNull("Must be invalidated", cache1.get(NODE_FQN, KEY));
 } catch (Throwable e) {
 exceptions.add(e);
 }
 }
 };
 final Thread thread2 = new Thread() {
 public void run() {
 try {
 await(barrier);
 getTransactionManager(cache2).begin();
 cache2.removeNode(NODE_FQN);
 getTransactionManager(cache2).commit();
 await(barrier);
 } catch (Throwable e) {
 exceptions.add(e);
 }
 }
 };
 thread1.start();
 thread2.start();
 thread1.join();
 thread2.join();
 cache1.stop();
 cache2.stop();
 if (!exceptions.isEmpty()) {
 fail(exceptions.toString());
 }
 }
 private static TransactionManager getTransactionManager(final Cache cache) {
 return ((CacheSPI) cache).getTransactionManager();
 }
 private static void await(final CyclicBarrier barrier) throws Exception {
 barrier.await(20, TimeUnit.SECONDS);
 }
}
REPL_SYNC (This case can be fixed by commenting out begin/commit calls in thread2, e.g. behavior depends on whether calls are done in context of JTA transaction or not. Is that an issue with DummyTransactionManager or with cache itself?):
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.transaction.TransactionManager;
import junit.framework.TestCase;
import org.jboss.cache.Cache;
import org.jboss.cache.CacheFactory;
import org.jboss.cache.CacheSPI;
import org.jboss.cache.DefaultCacheFactory;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.lock.IsolationLevel;
public class TestReplSyncReadCommitted2 extends TestCase {
 private static final String NODE_FQN = "/node";
 private static final String KEY = "key";
 private static final String VALUE1 = "value1";
 private static final String VALUE2 = "value2";
 private static Cache createCache() {
 final CacheFactory cf = new DefaultCacheFactory();
 final Configuration configuration = new Configuration();
 configuration.setCacheMode(Configuration.CacheMode.REPL_SYNC);
 configuration.setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
 configuration.setIsolationLevel(IsolationLevel.READ_COMMITTED);
 configuration.setLockAcquisitionTimeout(10000L);
 return cf.createCache(configuration, true);
 }
 public void testRemoveNodeTwoCaches() throws InterruptedException {
 final Cache cache1 = createCache();
 final Cache cache2 = createCache();
 assertEquals("Members count", 2, cache1.getMembers().size());
 assertEquals("Members count", 2, cache2.getMembers().size());
 final CyclicBarrier barrier = new CyclicBarrier(2);
 final List exceptions = Collections.synchronizedList(new ArrayList());
 final Thread thread1 = new Thread() {
 public void run() {
 try {
 await(barrier);
 await(barrier);
 getTransactionManager(cache1).begin();
 assertEquals("Must be replicated", VALUE1, cache1.get(NODE_FQN, KEY));
 await(barrier);
 await(barrier);
 assertNull("Must be removed", cache1.get(NODE_FQN, KEY));
 getTransactionManager(cache1).commit();
 } catch (Throwable e) {
 exceptions.add(e);
 }
 }
 };
 final Thread thread2 = new Thread() {
 public void run() {
 try {
 await(barrier);
 getTransactionManager(cache2).begin();
 cache2.put(NODE_FQN, KEY, VALUE1);
 getTransactionManager(cache2).commit();
 await(barrier);
 await(barrier);
 getTransactionManager(cache2).begin();
 cache2.removeNode(NODE_FQN);
 getTransactionManager(cache2).commit();
 await(barrier);
 } catch (Throwable e) {
 exceptions.add(e);
 }
 }
 };
 thread1.start();
 thread2.start();
 thread1.join();
 thread2.join();
 cache1.stop();
 cache2.stop();
 if (!exceptions.isEmpty()) {
 fail(exceptions.toString());
 }
 }
 public void testPutTwoCaches() throws InterruptedException {
 final Cache cache1 = createCache();
 final Cache cache2 = createCache();
 assertEquals("Members count", 2, cache1.getMembers().size());
 assertEquals("Members count", 2, cache2.getMembers().size());
 final CyclicBarrier barrier = new CyclicBarrier(2);
 final List exceptions = Collections.synchronizedList(new ArrayList());
 final Thread thread1 = new Thread() {
 public void run() {
 try {
 await(barrier);
 await(barrier);
 getTransactionManager(cache1).begin();
 assertEquals("Must be replicated", VALUE1, cache1.get(NODE_FQN, KEY));
 await(barrier);
 await(barrier);
 assertEquals("Must be replicated", VALUE2, cache1.get(NODE_FQN, KEY));
 getTransactionManager(cache1).commit();
 } catch (Throwable e) {
 exceptions.add(e);
 }
 }
 };
 final Thread thread2 = new Thread() {
 public void run() {
 try {
 await(barrier);
 getTransactionManager(cache2).begin();
 cache2.put(NODE_FQN, KEY, VALUE1);
 getTransactionManager(cache2).commit();
 await(barrier);
 await(barrier);
 getTransactionManager(cache2).begin();
 cache2.put(NODE_FQN, KEY, VALUE2);
 getTransactionManager(cache2).commit();
 await(barrier);
 } catch (Throwable e) {
 exceptions.add(e);
 }
 }
 };
 thread1.start();
 thread2.start();
 thread1.join();
 thread2.join();
 cache1.stop();
 cache2.stop();
 if (!exceptions.isEmpty()) {
 fail(exceptions.toString());
 }
 }
 private static TransactionManager getTransactionManager(final Cache cache) {
 return ((CacheSPI) cache).getTransactionManager();
 }
 private static void await(final CyclicBarrier barrier)
 throws InterruptedException, BrokenBarrierException, TimeoutException {
 barrier.await(20, TimeUnit.SECONDS);
 }
}
 
    