Current transaction cannot see modifications done by committed transactions
veklov Dec 2, 2009 8:51 AM — The current transaction cannot see modifications made by committed transactions when the isolation level is READ_COMMITTED.
The cache version is 3.2.1.GA. The problem can be reproduced when the cache mode is REPL_SYNC or INVALIDATION_SYNC (the other modes were not tested). Please see the test cases below for details.
Note: The test cases use DummyTransactionManager, and the behavior of the second test case (REPL_SYNC mode) depends on whether the calls are made in the context of a JTA transaction, so in this mode the issue may be caused by a bug in DummyTransactionManager.
INVALIDATION_SYNC:
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import javax.transaction.TransactionManager;
import junit.framework.TestCase;
import org.jboss.cache.Cache;
import org.jboss.cache.CacheFactory;
import org.jboss.cache.CacheSPI;
import org.jboss.cache.DefaultCacheFactory;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.lock.IsolationLevel;
/**
 * Checks that a transaction running on one cache with {@code READ_COMMITTED}
 * isolation observes an invalidation caused by a transaction committed on a
 * second cache in {@code INVALIDATION_SYNC} mode.
 *
 * <p>Each test starts two caches, seeds {@code cache1} with {@link #VALUE1}
 * and then runs two threads that are kept in lock-step by a two-party
 * {@link CyclicBarrier}: the reader opens a transaction on {@code cache1} and
 * reads the key, the writer commits a change on {@code cache2}, and the reader
 * then expects the key to be gone both before and after its own commit.
 *
 * <p>Both caches are always stopped, even when an assertion fails, so that a
 * failing test does not leave running caches behind for the next test.
 */
public class TestInvalidationSyncReadCommited extends TestCase {
    private static final String NODE_FQN = "/node";
    private static final String KEY = "key";
    private static final String VALUE1 = "value1";
    private static final String VALUE2 = "value2";

    /** The change that the writer thread commits on the second cache. */
    private interface CacheOperation {
        void apply(Cache<String, String> cache) throws Exception;
    }

    /**
     * Creates and starts a cache configured with INVALIDATION_SYNC mode,
     * READ_COMMITTED isolation, the dummy transaction manager lookup and a
     * 10 second lock acquisition timeout.
     *
     * @return the started cache
     */
    private static Cache<String, String> createCache() {
        final CacheFactory<String, String> cf = new DefaultCacheFactory<String, String>();
        final Configuration configuration = new Configuration();
        configuration.setCacheMode(Configuration.CacheMode.INVALIDATION_SYNC);
        configuration.setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
        configuration.setIsolationLevel(IsolationLevel.READ_COMMITTED);
        configuration.setLockAcquisitionTimeout(10000L);
        return cf.createCache(configuration, true);
    }

    /**
     * The writer overwrites the key on the second cache; the reader's open
     * transaction on the first cache must then see the key as invalidated.
     */
    public void testPut() throws Exception {
        runScenario(new CacheOperation() {
            public void apply(final Cache<String, String> cache) {
                cache.put(NODE_FQN, KEY, VALUE2);
            }
        });
    }

    /**
     * The writer removes the node on the second cache; the reader's open
     * transaction on the first cache must then see the key as invalidated.
     */
    public void testRemoveNode() throws Exception {
        runScenario(new CacheOperation() {
            public void apply(final Cache<String, String> cache) {
                cache.removeNode(NODE_FQN);
            }
        });
    }

    /**
     * Runs the shared reader/writer scenario.
     *
     * @param remoteChange the change the writer commits on the second cache
     *                     while the reader's transaction is still open
     * @throws Exception if the calling thread is interrupted while joining
     *                   the workers, or a setup assertion fails
     */
    private void runScenario(final CacheOperation remoteChange) throws Exception {
        final List<Throwable> failures = Collections.synchronizedList(new ArrayList<Throwable>());
        final Cache<String, String> cache1 = createCache();
        try {
            final Cache<String, String> cache2 = createCache();
            try {
                assertEquals("Members count", 2, cache1.getMembers().size());
                assertEquals("Members count", 2, cache2.getMembers().size());
                final CyclicBarrier barrier = new CyclicBarrier(2);
                cache1.put(NODE_FQN, KEY, VALUE1);

                final Thread reader = new Thread() {
                    @Override
                    public void run() {
                        try {
                            getTransactionManager(cache1).begin();
                            assertEquals("Must be in cache", VALUE1, cache1.get(NODE_FQN, KEY));
                            await(barrier); // let the writer start
                            await(barrier); // wait until the writer has committed
                            // The original report notes that VALUE1 is still returned here.
                            assertNull("Must be invalidated before commit", cache1.get(NODE_FQN, KEY));
                            getTransactionManager(cache1).commit();
                            assertNull("Must be invalidated", cache1.get(NODE_FQN, KEY));
                        } catch (Throwable e) {
                            failures.add(e);
                        } finally {
                            rollbackIfActive(cache1, failures);
                        }
                    }
                };
                final Thread writer = new Thread() {
                    @Override
                    public void run() {
                        try {
                            await(barrier); // the reader has done its first read
                            getTransactionManager(cache2).begin();
                            remoteChange.apply(cache2);
                            getTransactionManager(cache2).commit();
                            await(barrier); // release the reader for its second read
                        } catch (Throwable e) {
                            failures.add(e);
                        } finally {
                            rollbackIfActive(cache2, failures);
                        }
                    }
                };

                reader.start();
                writer.start();
                reader.join();
                writer.join();
            } finally {
                cache2.stop();
            }
        } finally {
            cache1.stop();
        }
        if (!failures.isEmpty()) {
            fail(failures.toString());
        }
    }

    /**
     * Rolls back the transaction still associated with the calling thread, if
     * any, so that a failed assertion does not leave a transaction open.
     * Problems during the rollback are recorded in {@code failures}.
     */
    private static void rollbackIfActive(final Cache<?, ?> cache, final List<Throwable> failures) {
        try {
            final TransactionManager tm = getTransactionManager(cache);
            if (tm != null && tm.getTransaction() != null) {
                tm.rollback();
            }
        } catch (Exception e) {
            failures.add(e);
        }
    }

    /** Returns the transaction manager the given cache exposes through {@link CacheSPI}. */
    private static TransactionManager getTransactionManager(final Cache<?, ?> cache) {
        return ((CacheSPI<?, ?>) cache).getTransactionManager();
    }

    /** Waits on the barrier for at most 20 seconds so a stuck peer cannot hang the test. */
    private static void await(final CyclicBarrier barrier) throws Exception {
        barrier.await(20, TimeUnit.SECONDS);
    }
}
REPL_SYNC (This case can be fixed by commenting out the begin/commit calls in thread2, i.e. the behavior depends on whether the calls are made in the context of a JTA transaction. Is that an issue with DummyTransactionManager or with the cache itself?):
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.transaction.TransactionManager;
import junit.framework.TestCase;
import org.jboss.cache.Cache;
import org.jboss.cache.CacheFactory;
import org.jboss.cache.CacheSPI;
import org.jboss.cache.DefaultCacheFactory;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.lock.IsolationLevel;
/**
 * Checks that a transaction running on one cache with {@code READ_COMMITTED}
 * isolation observes changes replicated from transactions committed on a
 * second cache in {@code REPL_SYNC} mode.
 *
 * <p>Each test starts two caches and runs two threads that are kept in
 * lock-step by a two-party {@link CyclicBarrier}: the writer commits
 * {@link #VALUE1} on {@code cache2}, the reader opens a transaction on
 * {@code cache1} and reads it, the writer commits a second change, and the
 * reader, still inside the same transaction, expects to see that change.
 *
 * <p>Both caches are always stopped, even when an assertion fails, so that a
 * failing test does not leave running caches behind for the next test.
 */
public class TestReplSyncReadCommitted2 extends TestCase {
    private static final String NODE_FQN = "/node";
    private static final String KEY = "key";
    private static final String VALUE1 = "value1";
    private static final String VALUE2 = "value2";

    /** The second change that the writer thread commits on the second cache. */
    private interface CacheOperation {
        void apply(Cache<String, String> cache) throws Exception;
    }

    /**
     * Creates and starts a cache configured with REPL_SYNC mode,
     * READ_COMMITTED isolation, the dummy transaction manager lookup and a
     * 10 second lock acquisition timeout.
     *
     * @return the started cache
     */
    private static Cache<String, String> createCache() {
        final CacheFactory<String, String> cf = new DefaultCacheFactory<String, String>();
        final Configuration configuration = new Configuration();
        configuration.setCacheMode(Configuration.CacheMode.REPL_SYNC);
        configuration.setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
        configuration.setIsolationLevel(IsolationLevel.READ_COMMITTED);
        configuration.setLockAcquisitionTimeout(10000L);
        return cf.createCache(configuration, true);
    }

    /**
     * The writer removes the node in its second transaction; the reader's
     * open transaction must then no longer see the key.
     */
    public void testRemoveNodeTwoCaches() throws InterruptedException {
        runScenario(new CacheOperation() {
            public void apply(final Cache<String, String> cache) {
                cache.removeNode(NODE_FQN);
            }
        }, null, "Must be removed");
    }

    /**
     * The writer overwrites the key with {@link #VALUE2} in its second
     * transaction; the reader's open transaction must then see the new value.
     */
    public void testPutTwoCaches() throws InterruptedException {
        runScenario(new CacheOperation() {
            public void apply(final Cache<String, String> cache) {
                cache.put(NODE_FQN, KEY, VALUE2);
            }
        }, VALUE2, "Must be replicated");
    }

    /**
     * Runs the shared reader/writer scenario.
     *
     * @param secondChange  the change the writer commits while the reader's
     *                      transaction is open
     * @param expectedAfter the value the reader must see after that change,
     *                      or {@code null} if the key must be absent
     * @param message       the assertion message for the reader's second read
     * @throws InterruptedException if the calling thread is interrupted while
     *                              joining the workers
     */
    private void runScenario(final CacheOperation secondChange, final String expectedAfter,
            final String message) throws InterruptedException {
        final List<Throwable> failures = Collections.synchronizedList(new ArrayList<Throwable>());
        final Cache<String, String> cache1 = createCache();
        try {
            final Cache<String, String> cache2 = createCache();
            try {
                assertEquals("Members count", 2, cache1.getMembers().size());
                assertEquals("Members count", 2, cache2.getMembers().size());
                final CyclicBarrier barrier = new CyclicBarrier(2);

                final Thread reader = new Thread() {
                    @Override
                    public void run() {
                        try {
                            await(barrier); // let the writer start
                            await(barrier); // the writer has committed VALUE1
                            getTransactionManager(cache1).begin();
                            assertEquals("Must be replicated", VALUE1, cache1.get(NODE_FQN, KEY));
                            await(barrier); // let the writer make its second change
                            await(barrier); // the writer has committed the second change
                            final String actual = cache1.get(NODE_FQN, KEY);
                            if (expectedAfter == null) {
                                assertNull(message, actual);
                            } else {
                                assertEquals(message, expectedAfter, actual);
                            }
                            getTransactionManager(cache1).commit();
                        } catch (Throwable e) {
                            failures.add(e);
                        } finally {
                            rollbackIfActive(cache1, failures);
                        }
                    }
                };
                final Thread writer = new Thread() {
                    @Override
                    public void run() {
                        try {
                            await(barrier);
                            getTransactionManager(cache2).begin();
                            cache2.put(NODE_FQN, KEY, VALUE1);
                            getTransactionManager(cache2).commit();
                            await(barrier); // release the reader for its first read
                            await(barrier); // the reader has read VALUE1 inside its transaction
                            getTransactionManager(cache2).begin();
                            secondChange.apply(cache2);
                            getTransactionManager(cache2).commit();
                            await(barrier); // release the reader for its second read
                        } catch (Throwable e) {
                            failures.add(e);
                        } finally {
                            rollbackIfActive(cache2, failures);
                        }
                    }
                };

                reader.start();
                writer.start();
                reader.join();
                writer.join();
            } finally {
                cache2.stop();
            }
        } finally {
            cache1.stop();
        }
        if (!failures.isEmpty()) {
            fail(failures.toString());
        }
    }

    /**
     * Rolls back the transaction still associated with the calling thread, if
     * any, so that a failed assertion does not leave a transaction open.
     * Problems during the rollback are recorded in {@code failures}.
     */
    private static void rollbackIfActive(final Cache<?, ?> cache, final List<Throwable> failures) {
        try {
            final TransactionManager tm = getTransactionManager(cache);
            if (tm != null && tm.getTransaction() != null) {
                tm.rollback();
            }
        } catch (Exception e) {
            failures.add(e);
        }
    }

    /** Returns the transaction manager the given cache exposes through {@link CacheSPI}. */
    private static TransactionManager getTransactionManager(final Cache<?, ?> cache) {
        return ((CacheSPI<?, ?>) cache).getTransactionManager();
    }

    /** Waits on the barrier for at most 20 seconds so a stuck peer cannot hang the test. */
    private static void await(final CyclicBarrier barrier)
            throws InterruptedException, BrokenBarrierException, TimeoutException {
        barrier.await(20, TimeUnit.SECONDS);
    }
}