Repeatable Reader - are readlocks actually obtained?
skaffman Jun 17, 2006 11:39 AMI've written a unit test which seems to demonstrate that the Repeatable Read semantics are not being followed for PojoCache under 1.4.0.BETA2. Even though the test uses DummyTransactionManager, I understand that the locking semantics should still be correct.
In summary, I create two threads, each with their own PojoCache instance, but in the same cluster. The 1st thread reads a value from the cache within a transaction, the 2nd thread then writes a new value under that FQN, and the 1st thread then re-reads the value (within the original transaction), and gets a non-repeatable read, i.e. the value as written by thread 2.
When I read the source code for PojoCache, I see that explicit locks are acquired for a putObject() or removeObject(), but not for getObject().
Is this a flaw, or am I mis-understanding the applicability of isolation level within this context? It doesn't seem to follow ACID semantics.
Anyway, here's the test case, plus the cache config file:
package test;
import java.io.InputStream;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.PropertyConfigurator;
import org.jboss.cache.aop.PojoCache;
public class RepeatableReadTest extends TestCase {
private final CountDownLatch startSignal = new CountDownLatch(1);
private final CountDownLatch doneSignal = new CountDownLatch(2);
private final CountDownLatch signalA = new CountDownLatch(1);
private final CountDownLatch signalB = new CountDownLatch(1);
private final ExecutorService executor = Executors.newCachedThreadPool();
@Override
protected void tearDown() throws Exception {
executor.shutdownNow();
}
public void test() throws Exception {
Log log = LogFactory.getLog(getClass());
log.info("Creating and starting worker threads");
executor.submit(new Worker1());
executor.submit(new Worker2());
log.info("Seeding cache with initial value for /test");
PojoCache cache = createCache();
cache.putObject("/test", "value1");
startSignal.countDown();
doneSignal.await();
}
class Worker1 implements Callable {
private final PojoCache cache = createCache();
private Log log = LogFactory.getLog(getClass());
public Object call() throws Exception {
startSignal.await();
log.info("Beginning new tx and reading value");
cache.getTransactionManager().begin();
log.info("Cache contains value for /test : " + cache.getObject("/test"));
log.info("Sending signal to thread 2");
signalA.countDown();
log.info("Waiting for signal from thread 2");
signalB.await();
log.info("Cache contains value for /test : " + cache.getObject("/test"));
log.info("Done");
doneSignal.countDown();
return null;
}
}
class Worker2 implements Callable {
private final PojoCache cache = createCache();
private Log log = LogFactory.getLog(getClass());
public Object call() throws Exception {
startSignal.await();
log.info("Waiting for signal from thread 1");
signalA.await();
log.info("Starting new tx and writing new value for /test");
cache.getTransactionManager().begin();
cache.putObject("/test", "value2");
log.info("Committing");
cache.getTransactionManager().commit();
log.info("Sending signal to thread 1");
signalB.countDown();
log.info("Done");
doneSignal.countDown();
return null;
}
}
private PojoCache createCache() throws RuntimeException {
try {
PojoCache cache = new PojoCache();
InputStream configStream = getClass().getResourceAsStream("cache-service.xml");
new PropertyConfigurator().configure(cache, configStream);
cache.setClusterName(getName());
cache.start();
return cache;
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
}
And now the config:
<?xml version="1.0" encoding="UTF-8"?> <!-- ===================================================================== --> <!-- --> <!-- Sample TreeCache Service Configuration --> <!-- --> <!-- ===================================================================== --> <server> <!-- ==================================================================== --> <!-- Defines TreeCache configuration --> <!-- ==================================================================== --> <mbean code="org.jboss.cache.aop.PojoCache" name="test:service=PojoCache"> <depends>jboss:service=Naming</depends> <depends>jboss:service=TransactionManager</depends> <!-- Configure the TransactionManager --> <attribute name="TransactionManagerLookupClass">org.jboss.cache.GenericTransactionManagerLookup</attribute> <!-- Isolation level : SERIALIZABLE REPEATABLE_READ (default) READ_COMMITTED READ_UNCOMMITTED NONE --> <attribute name="IsolationLevel">REPEATABLE_READ</attribute> <!-- Valid modes are LOCAL REPL_ASYNC REPL_SYNC INVALIDATION_ASYNC INVALIDATION_SYNC --> <attribute name="CacheMode">REPL_SYNC</attribute> <!-- Just used for async repl: use a replication queue --> <attribute name="UseReplQueue">false</attribute> <!-- Replication interval for replication queue (in ms) --> <attribute name="ReplQueueInterval">0</attribute> <!-- Max number of elements which trigger replication --> <attribute name="ReplQueueMaxElements">0</attribute> <!-- Name of cluster. Needs to be the same for all clusters, in order to find each other --> <attribute name="ClusterName">TestCache</attribute> <!-- JGroups protocol stack properties. Can also be a URL, e.g. file:/home/bela/default.xml <attribute name="ClusterProperties"></attribute> --> <attribute name="ClusterConfig"> <config> <!-- UDP: if you have a multihomed machine, set the bind_addr attribute to the appropriate NIC IP address, e.g bind_addr="192.168.0.2" --> <!-- UDP: On Windows machines, because of the media sense feature being broken with multicast (even after disabling media sense) set the loopback attribute to true --> <UDP mcast_addr="228.1.2.3" mcast_port="48866" ip_ttl="64" ip_mcast="true" mcast_send_buf_size="150000" mcast_recv_buf_size="80000" ucast_send_buf_size="150000" ucast_recv_buf_size="80000" loopback="false"/> <PING timeout="2000" num_initial_members="3" up_thread="false" down_thread="false"/> <MERGE2 min_interval="10000" max_interval="20000"/> <!-- <FD shun="true" up_thread="true" down_thread="true" />--> <FD_SOCK/> <VERIFY_SUSPECT timeout="1500" up_thread="false" down_thread="false"/> <pbcast.NAKACK gc_lag="50" retransmit_timeout="600,1200,2400,4800" max_xmit_size="8192" up_thread="false" down_thread="false"/> <UNICAST timeout="600,1200,2400" window_size="100" min_threshold="10" down_thread="false"/> <pbcast.STABLE desired_avg_gossip="20000" up_thread="false" down_thread="false"/> <FRAG frag_size="8192" down_thread="false" up_thread="false"/> <pbcast.GMS join_timeout="5000" join_retry_timeout="2000" shun="true" print_local_addr="true"/> <pbcast.STATE_TRANSFER up_thread="true" down_thread="true"/> </config> </attribute> <!-- Whether or not to fetch state on joining a cluster NOTE this used to be called FetchStateOnStartup and has been renamed to be more descriptive. --> <attribute name="FetchInMemoryState">true</attribute> <!-- The max amount of time (in milliseconds) we wait until the initial state (ie. the contents of the cache) are retrieved from existing members in a clustered environment --> <attribute name="InitialStateRetrievalTimeout">15000</attribute> <!-- Number of milliseconds to wait until all responses for a synchronous call have been received. --> <attribute name="SyncReplTimeout">15000</attribute> <!-- Max number of milliseconds to wait for a lock acquisition --> <attribute name="LockAcquisitionTimeout">10000</attribute> <!-- Name of the eviction policy class. --> <attribute name="EvictionPolicyClass"></attribute> <!-- Indicate whether to use region based marshalling or not. Set this to true if you are running under a scoped class loader, e.g., inside an application server. Default is "false". --> <attribute name="UseRegionBasedMarshalling">true</attribute> </mbean> </server>