5 Replies - Latest reply on Jun 29, 2006 3:34 AM by Ben Wang

    Repeatable Read - are read locks actually obtained?

    Kenny MacLeod (Newbie)

      I've written a unit test which seems to demonstrate that the Repeatable Read semantics are not being followed for PojoCache under 1.4.0.BETA2. Even though the test uses DummyTransactionManager, I understand that the locking semantics should still be correct.

      In summary, I create two threads, each with its own PojoCache instance but in the same cluster. The first thread reads a value from the cache within a transaction; the second thread then writes a new value under that FQN; the first thread then re-reads the value (still within its original transaction) and gets a non-repeatable read, i.e. it sees the value written by thread 2.

      When I read the source code for PojoCache, I see that explicit locks are acquired for a putObject() or removeObject(), but not for getObject().

      Is this a flaw, or am I misunderstanding how the isolation level applies in this context? It doesn't seem to follow ACID semantics.

      Anyway, here's the test case, plus the cache config file:

      package test;
      
      import java.io.InputStream;
      import java.util.concurrent.Callable;
      import java.util.concurrent.CountDownLatch;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      
      import junit.framework.TestCase;
      
      import org.apache.commons.logging.Log;
      import org.apache.commons.logging.LogFactory;
      import org.jboss.cache.PropertyConfigurator;
      import org.jboss.cache.aop.PojoCache;
      
      public class RepeatableReadTest extends TestCase {

          private final CountDownLatch startSignal = new CountDownLatch(1);
          private final CountDownLatch doneSignal = new CountDownLatch(2);

          private final CountDownLatch signalA = new CountDownLatch(1);
          private final CountDownLatch signalB = new CountDownLatch(1);

          private final ExecutorService executor = Executors.newCachedThreadPool();

          @Override
          protected void tearDown() throws Exception {
              executor.shutdownNow();
          }

          public void test() throws Exception {
              Log log = LogFactory.getLog(getClass());

              log.info("Creating and starting worker threads");
              executor.submit(new Worker1());
              executor.submit(new Worker2());

              log.info("Seeding cache with initial value for /test");
              PojoCache cache = createCache();
              cache.putObject("/test", "value1");

              startSignal.countDown();
              doneSignal.await();
          }

          class Worker1 implements Callable<Object> {
              private final PojoCache cache = createCache();
              private final Log log = LogFactory.getLog(getClass());

              public Object call() throws Exception {
                  startSignal.await();

                  log.info("Beginning new tx and reading value");
                  cache.getTransactionManager().begin();
                  log.info("Cache contains value for /test : " + cache.getObject("/test"));

                  log.info("Sending signal to thread 2");
                  signalA.countDown();

                  log.info("Waiting for signal from thread 2");
                  signalB.await();

                  log.info("Cache contains value for /test : " + cache.getObject("/test"));

                  log.info("Done");
                  doneSignal.countDown();

                  return null;
              }
          }

          class Worker2 implements Callable<Object> {
              private final PojoCache cache = createCache();
              private final Log log = LogFactory.getLog(getClass());

              public Object call() throws Exception {
                  startSignal.await();

                  log.info("Waiting for signal from thread 1");
                  signalA.await();

                  log.info("Starting new tx and writing new value for /test");
                  cache.getTransactionManager().begin();
                  cache.putObject("/test", "value2");

                  log.info("Committing");
                  cache.getTransactionManager().commit();

                  log.info("Sending signal to thread 1");
                  signalB.countDown();

                  log.info("Done");
                  doneSignal.countDown();

                  return null;
              }
          }

          private PojoCache createCache() throws RuntimeException {
              try {
                  PojoCache cache = new PojoCache();
                  InputStream configStream = getClass().getResourceAsStream("cache-service.xml");
                  new PropertyConfigurator().configure(cache, configStream);

                  cache.setClusterName(getName());

                  cache.start();
                  return cache;
              } catch (Exception ex) {
                  throw new RuntimeException(ex);
              }
          }
      }
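
      Just to spell out what I'd expect REPEATABLE_READ to guarantee, here's a sketch of a check that (as far as I understand the contract) should pass. It could be dropped into the test class above; note that the getNumberOfLocksHeld() call is only my assumption about how to observe the lock table, I haven't verified that it reports read locks held by the current transaction.

      public void sketchOfExpectedRepeatableRead() throws Exception {
          // Sketch only: the behaviour I'd expect under REPEATABLE_READ.
          // getNumberOfLocksHeld() is an assumption about how to observe the lock table.
          Log log = LogFactory.getLog(getClass());

          PojoCache cache = createCache();
          cache.putObject("/test", "value1");

          cache.getTransactionManager().begin();
          Object first = cache.getObject("/test");   // I'd expect a read lock on /test to be acquired here
          log.info("Locks held after first read: " + cache.getNumberOfLocksHeld());

          // A concurrent putObject("/test", "value2") from another thread or node should
          // now block (or time out after LockAcquisitionTimeout) until this tx ends.

          Object second = cache.getObject("/test");  // same tx, so this should still be "value1"
          assertEquals(first, second);

          cache.getTransactionManager().commit();
          cache.stop();
      }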
      


      And now the config:

      <?xml version="1.0" encoding="UTF-8"?>
      
      <!-- ===================================================================== -->
      <!-- -->
      <!-- Sample TreeCache Service Configuration -->
      <!-- -->
      <!-- ===================================================================== -->
      
      <server>
      
       <!-- ==================================================================== -->
       <!-- Defines TreeCache configuration -->
       <!-- ==================================================================== -->
      
       <mbean code="org.jboss.cache.aop.PojoCache"
       name="test:service=PojoCache">
      
       <depends>jboss:service=Naming</depends>
       <depends>jboss:service=TransactionManager</depends>
      
       <!--
       Configure the TransactionManager
       -->
       <attribute name="TransactionManagerLookupClass">org.jboss.cache.GenericTransactionManagerLookup</attribute>
      
       <!--
       Isolation level : SERIALIZABLE
       REPEATABLE_READ (default)
       READ_COMMITTED
       READ_UNCOMMITTED
       NONE
       -->
       <attribute name="IsolationLevel">REPEATABLE_READ</attribute>
      
       <!--
       Valid modes are LOCAL
       REPL_ASYNC
       REPL_SYNC
       INVALIDATION_ASYNC
       INVALIDATION_SYNC
       -->
       <attribute name="CacheMode">REPL_SYNC</attribute>
      
       <!--
       Just used for async repl: use a replication queue
       -->
       <attribute name="UseReplQueue">false</attribute>
      
       <!--
       Replication interval for replication queue (in ms)
       -->
       <attribute name="ReplQueueInterval">0</attribute>
      
       <!--
       Max number of elements which trigger replication
       -->
       <attribute name="ReplQueueMaxElements">0</attribute>
      
 <!-- Name of cluster. Needs to be the same for all cache instances in the
 cluster, in order for them to find each other
 -->
       <attribute name="ClusterName">TestCache</attribute>
      
       <!-- JGroups protocol stack properties. Can also be a URL,
       e.g. file:/home/bela/default.xml
       <attribute name="ClusterProperties"></attribute>
       -->
      
       <attribute name="ClusterConfig">
       <config>
       <!-- UDP: if you have a multihomed machine,
 set the bind_addr attribute to the appropriate NIC IP address, e.g. bind_addr="192.168.0.2"
       -->
       <!-- UDP: On Windows machines, because of the media sense feature
       being broken with multicast (even after disabling media sense)
       set the loopback attribute to true -->
       <UDP mcast_addr="228.1.2.3" mcast_port="48866"
       ip_ttl="64" ip_mcast="true"
       mcast_send_buf_size="150000" mcast_recv_buf_size="80000"
       ucast_send_buf_size="150000" ucast_recv_buf_size="80000"
       loopback="false"/>
       <PING timeout="2000" num_initial_members="3"
       up_thread="false" down_thread="false"/>
       <MERGE2 min_interval="10000" max_interval="20000"/>
       <!-- <FD shun="true" up_thread="true" down_thread="true" />-->
       <FD_SOCK/>
       <VERIFY_SUSPECT timeout="1500"
       up_thread="false" down_thread="false"/>
       <pbcast.NAKACK gc_lag="50" retransmit_timeout="600,1200,2400,4800"
       max_xmit_size="8192" up_thread="false" down_thread="false"/>
       <UNICAST timeout="600,1200,2400" window_size="100" min_threshold="10"
       down_thread="false"/>
       <pbcast.STABLE desired_avg_gossip="20000"
       up_thread="false" down_thread="false"/>
       <FRAG frag_size="8192"
       down_thread="false" up_thread="false"/>
       <pbcast.GMS join_timeout="5000" join_retry_timeout="2000"
       shun="true" print_local_addr="true"/>
       <pbcast.STATE_TRANSFER up_thread="true" down_thread="true"/>
       </config>
       </attribute>
      
      
       <!--
       Whether or not to fetch state on joining a cluster
       NOTE this used to be called FetchStateOnStartup and has been renamed to be more descriptive.
       -->
       <attribute name="FetchInMemoryState">true</attribute>
      
       <!--
       The max amount of time (in milliseconds) we wait until the
 initial state (i.e. the contents of the cache) is retrieved from
       existing members in a clustered environment
       -->
       <attribute name="InitialStateRetrievalTimeout">15000</attribute>
      
       <!--
       Number of milliseconds to wait until all responses for a
       synchronous call have been received.
       -->
       <attribute name="SyncReplTimeout">15000</attribute>
      
       <!-- Max number of milliseconds to wait for a lock acquisition -->
       <attribute name="LockAcquisitionTimeout">10000</attribute>
      
       <!-- Name of the eviction policy class. -->
       <attribute name="EvictionPolicyClass"></attribute>
      
       <!--
       Indicate whether to use region based marshalling or not. Set this to true if you are running under a scoped
       class loader, e.g., inside an application server. Default is "false".
       -->
       <attribute name="UseRegionBasedMarshalling">true</attribute>
       </mbean>
      </server>