Repeatable Read - are read locks actually obtained?
skaffman Jun 17, 2006 11:39 AM
I've written a unit test which seems to demonstrate that the Repeatable Read semantics are not being followed for PojoCache under 1.4.0.BETA2. Even though the test uses DummyTransactionManager, I understand that the locking semantics should still be correct.
In summary, I create two threads, each with its own PojoCache instance, but in the same cluster. The 1st thread reads a value from the cache within a transaction, and the 2nd thread then writes a new value under that FQN. The 1st thread then re-reads the value (within the original transaction) and gets a non-repeatable read, i.e. the value as written by thread 2.
When I read the source code for PojoCache, I see that explicit locks are acquired for a putObject() or removeObject(), but not for getObject().
Is this a flaw, or am I misunderstanding the applicability of the isolation level within this context? It doesn't seem to follow ACID semantics.
Anyway, here's the test case, plus the cache config file:
package test;

import java.io.InputStream;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import junit.framework.TestCase;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.PropertyConfigurator;
import org.jboss.cache.aop.PojoCache;

/**
 * Exercises a read / concurrent write / re-read sequence against two
 * PojoCache instances that share a cluster name.
 *
 * <p>Worker1 reads {@code /test} inside a transaction, signals Worker2, waits
 * for Worker2 to write and commit a new value for {@code /test}, and then
 * reads {@code /test} again inside the same transaction. Both reads are
 * logged so that they can be compared.
 */
public class RepeatableReadTest extends TestCase {

    /** Released by the test thread once the cache has been seeded. */
    private final CountDownLatch startSignal = new CountDownLatch(1);

    /** Counted down once by each worker when it finishes (normally or not). */
    private final CountDownLatch doneSignal = new CountDownLatch(2);

    /** Worker1 -> Worker2: the first read has been performed. */
    private final CountDownLatch signalA = new CountDownLatch(1);

    /** Worker2 -> Worker1: the write has been committed. */
    private final CountDownLatch signalB = new CountDownLatch(1);

    private final ExecutorService executor = Executors.newCachedThreadPool();

    @Override
    protected void tearDown() throws Exception {
        // Interrupts any worker that is still blocked on a latch.
        executor.shutdownNow();
    }

    /**
     * Seeds the cache, releases both workers and waits for them to finish.
     *
     * @throws Exception if seeding fails or if either worker threw; worker
     *         failures surface as {@link java.util.concurrent.ExecutionException}
     */
    public void test() throws Exception {
        Log log = LogFactory.getLog(getClass());

        log.info("Creating and starting worker threads");
        // Keep the futures: discarding them would silently swallow any
        // exception thrown inside a worker.
        Future<Object> reader = executor.submit(new Worker1());
        Future<Object> writer = executor.submit(new Worker2());

        log.info("Seeding cache with initial value for /test");
        PojoCache cache = createCache();
        cache.putObject("/test", "value1");

        startSignal.countDown();
        doneSignal.await();

        // Both workers are finished at this point, so get() does not block;
        // it only rethrows whatever a worker may have thrown.
        reader.get();
        writer.get();
    }

    /** Reads {@code /test} twice within one transaction, with Worker2's write in between. */
    class Worker1 implements Callable<Object> {

        private final PojoCache cache = createCache();
        private final Log log = LogFactory.getLog(getClass());

        public Object call() throws Exception {
            boolean txStarted = false;
            try {
                startSignal.await();

                log.info("Beginning new tx and reading value");
                cache.getTransactionManager().begin();
                txStarted = true;
                log.info("Cache contains value for /test : " + cache.getObject("/test"));

                log.info("Sending signal to thread 2");
                signalA.countDown();

                log.info("Waiting for signal from thread 2");
                signalB.await();

                log.info("Cache contains value for /test : " + cache.getObject("/test"));
                log.info("Done");
                return null;
            } finally {
                try {
                    // The transaction only reads, so there is nothing to commit;
                    // roll it back so it is not left associated with the pool thread.
                    if (txStarted) {
                        cache.getTransactionManager().rollback();
                    }
                } finally {
                    // Released unconditionally so that a failure here cannot leave
                    // Worker2 or the test thread blocked forever. Counting down a
                    // latch that is already at zero has no effect.
                    signalA.countDown();
                    doneSignal.countDown();
                }
            }
        }
    }

    /** Writes and commits a new value for {@code /test} after Worker1's first read. */
    class Worker2 implements Callable<Object> {

        private final PojoCache cache = createCache();
        private final Log log = LogFactory.getLog(getClass());

        public Object call() throws Exception {
            try {
                startSignal.await();

                log.info("Waiting for signal from thread 1");
                signalA.await();

                log.info("Starting new tx and writing new value for /test");
                cache.getTransactionManager().begin();
                try {
                    cache.putObject("/test", "value2");
                } catch (Exception e) {
                    // Do not leave the transaction open if the write fails.
                    cache.getTransactionManager().rollback();
                    throw e;
                }

                log.info("Committing");
                cache.getTransactionManager().commit();

                log.info("Sending signal to thread 1");
                signalB.countDown();

                log.info("Done");
                return null;
            } finally {
                // Released unconditionally so that a failure here cannot leave
                // Worker1 or the test thread blocked forever.
                signalB.countDown();
                doneSignal.countDown();
            }
        }
    }

    /**
     * Creates and starts a PojoCache configured from {@code cache-service.xml},
     * using the test name as the cluster name.
     *
     * @return a started cache instance
     * @throws RuntimeException wrapping any failure during configuration or start-up
     */
    private PojoCache createCache() throws RuntimeException {
        try {
            InputStream configStream = getClass().getResourceAsStream("cache-service.xml");
            if (configStream == null) {
                throw new IllegalStateException("cache-service.xml not found on the classpath");
            }
            try {
                PojoCache cache = new PojoCache();
                new PropertyConfigurator().configure(cache, configStream);
                cache.setClusterName(getName());
                cache.start();
                return cache;
            } finally {
                configStream.close();
            }
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }
}
And now the config:
<?xml version="1.0" encoding="UTF-8"?> <!-- ===================================================================== --> <!-- --> <!-- Sample PojoCache Service Configuration --> <!-- --> <!-- ===================================================================== --> <server> <!-- ==================================================================== --> <!-- Defines the PojoCache configuration --> <!-- ==================================================================== --> <mbean code="org.jboss.cache.aop.PojoCache" name="test:service=PojoCache"> <depends>jboss:service=Naming</depends> <depends>jboss:service=TransactionManager</depends> <!-- Configure the TransactionManager --> <attribute name="TransactionManagerLookupClass">org.jboss.cache.GenericTransactionManagerLookup</attribute> <!-- Isolation level : SERIALIZABLE REPEATABLE_READ (default) READ_COMMITTED READ_UNCOMMITTED NONE --> <attribute name="IsolationLevel">REPEATABLE_READ</attribute> <!-- Valid modes are LOCAL REPL_ASYNC REPL_SYNC INVALIDATION_ASYNC INVALIDATION_SYNC --> <attribute name="CacheMode">REPL_SYNC</attribute> <!-- Just used for async repl: use a replication queue --> <attribute name="UseReplQueue">false</attribute> <!-- Replication interval for replication queue (in ms) --> <attribute name="ReplQueueInterval">0</attribute> <!-- Max number of elements which trigger replication --> <attribute name="ReplQueueMaxElements">0</attribute> <!-- Name of cluster. Needs to be the same for all members of the cluster, in order for them to find each other --> <attribute name="ClusterName">TestCache</attribute> <!-- JGroups protocol stack properties. Can also be a URL, e.g. 
file:/home/bela/default.xml <attribute name="ClusterProperties"></attribute> --> <attribute name="ClusterConfig"> <config> <!-- UDP: if you have a multihomed machine, set the bind_addr attribute to the appropriate NIC IP address, e.g. bind_addr="192.168.0.2" --> <!-- UDP: On Windows machines, because of the media sense feature being broken with multicast (even after disabling media sense) set the loopback attribute to true --> <UDP mcast_addr="228.1.2.3" mcast_port="48866" ip_ttl="64" ip_mcast="true" mcast_send_buf_size="150000" mcast_recv_buf_size="80000" ucast_send_buf_size="150000" ucast_recv_buf_size="80000" loopback="false"/> <PING timeout="2000" num_initial_members="3" up_thread="false" down_thread="false"/> <MERGE2 min_interval="10000" max_interval="20000"/> <!-- <FD shun="true" up_thread="true" down_thread="true" />--> <FD_SOCK/> <VERIFY_SUSPECT timeout="1500" up_thread="false" down_thread="false"/> <pbcast.NAKACK gc_lag="50" retransmit_timeout="600,1200,2400,4800" max_xmit_size="8192" up_thread="false" down_thread="false"/> <UNICAST timeout="600,1200,2400" window_size="100" min_threshold="10" down_thread="false"/> <pbcast.STABLE desired_avg_gossip="20000" up_thread="false" down_thread="false"/> <FRAG frag_size="8192" down_thread="false" up_thread="false"/> <pbcast.GMS join_timeout="5000" join_retry_timeout="2000" shun="true" print_local_addr="true"/> <pbcast.STATE_TRANSFER up_thread="true" down_thread="true"/> </config> </attribute> <!-- Whether or not to fetch state on joining a cluster NOTE this used to be called FetchStateOnStartup and has been renamed to be more descriptive. --> <attribute name="FetchInMemoryState">true</attribute> <!-- The max amount of time (in milliseconds) we wait until the initial state (i.e. 
the contents of the cache) are retrieved from existing members in a clustered environment --> <attribute name="InitialStateRetrievalTimeout">15000</attribute> <!-- Number of milliseconds to wait until all responses for a synchronous call have been received. --> <attribute name="SyncReplTimeout">15000</attribute> <!-- Max number of milliseconds to wait for a lock acquisition --> <attribute name="LockAcquisitionTimeout">10000</attribute> <!-- Name of the eviction policy class. --> <attribute name="EvictionPolicyClass"></attribute> <!-- Indicate whether to use region based marshalling or not. Set this to true if you are running under a scoped class loader, e.g., inside an application server. Default is "false". --> <attribute name="UseRegionBasedMarshalling">true</attribute> </mbean> </server>