5 replies. Latest reply on Jun 29, 2006 3:34 AM by ben.wang

    Repeatable Read - are read locks actually obtained?

    skaffman

      I've written a unit test that seems to demonstrate that PojoCache under 1.4.0.BETA2 does not honor Repeatable Read semantics. The test uses DummyTransactionManager, but as I understand it, the locking semantics should still be correct.

      In summary, I create two threads, each with its own PojoCache instance, all in the same cluster. The first thread reads a value from the cache within a transaction; the second thread then writes and commits a new value under that FQN; the first thread then re-reads the value within its original transaction and gets a non-repeatable read, i.e. the value written by the second thread.

      Reading the PojoCache source, I see that explicit locks are acquired for putObject() and removeObject(), but not for getObject().

      Is this a flaw, or am I misunderstanding how the isolation level applies in this context? It doesn't seem to follow ACID semantics.

      Anyway, here's the test case, plus the cache config file:

      package test;
      
      import java.io.InputStream;
      import java.util.ArrayList;
      import java.util.List;
      import java.util.concurrent.Callable;
      import java.util.concurrent.CountDownLatch;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.Future;
      import java.util.concurrent.TimeUnit;
      
      import javax.transaction.TransactionManager;
      
      import junit.framework.TestCase;
      
      import org.apache.commons.logging.Log;
      import org.apache.commons.logging.LogFactory;
      import org.jboss.cache.PropertyConfigurator;
      import org.jboss.cache.aop.PojoCache;
      
      public class RepeatableReadTest extends TestCase {
      
          // Released by the test thread once /test has been seeded
          private final CountDownLatch startSignal = new CountDownLatch(1);
      
          // signalA: thread 1 has done its first read; signalB: thread 2 has committed its write
          private final CountDownLatch signalA = new CountDownLatch(1);
          private final CountDownLatch signalB = new CountDownLatch(1);
      
          private final ExecutorService executor = Executors.newCachedThreadPool();
      
          // Every cache started by this test, so tearDown() can leave the cluster cleanly.
          // Only touched from the test thread (worker fields are initialized in their constructors).
          private final List<PojoCache> caches = new ArrayList<PojoCache>();
      
          @Override
          protected void tearDown() throws Exception {
              executor.shutdownNow();
              for (PojoCache cache : caches) {
                  cache.stop();
              }
          }
      
          public void test() throws Exception {
              Log log = LogFactory.getLog(getClass());
      
              log.info("Creating and starting worker threads");
              Future<Void> worker1 = executor.submit(new Worker1());
              Future<Void> worker2 = executor.submit(new Worker2());
      
              log.info("Seeding cache with initial value for /test");
              PojoCache cache = createCache();
              cache.putObject("/test", "value1");
      
              startSignal.countDown();
      
              // Future.get() rethrows a worker's exception instead of hanging on a latch
              worker1.get(60, TimeUnit.SECONDS);
              worker2.get(60, TimeUnit.SECONDS);
          }
      
          class Worker1 implements Callable<Void> {
              private final PojoCache cache = createCache();
              private final Log log = LogFactory.getLog(getClass());
      
              public Void call() throws Exception {
                  startSignal.await();
      
                  TransactionManager tm = cache.getTransactionManager();
                  log.info("Beginning new tx and reading value");
                  tm.begin();
                  log.info("Cache contains value for /test : " + cache.getObject("/test"));
      
                  log.info("Sending signal to thread 2");
                  signalA.countDown();
      
                  log.info("Waiting for signal from thread 2");
                  signalB.await();
      
                  // With REPEATABLE_READ this should return the same value as the first read
                  log.info("Cache contains value for /test : " + cache.getObject("/test"));
      
                  // End the transaction so it is not left bound to the pooled thread
                  tm.commit();
      
                  log.info("Done");
                  return null;
              }
          }
      
          class Worker2 implements Callable<Void> {
              private final PojoCache cache = createCache();
              private final Log log = LogFactory.getLog(getClass());
      
              public Void call() throws Exception {
                  startSignal.await();
      
                  log.info("Waiting for signal from thread 1");
                  signalA.await();
      
                  TransactionManager tm = cache.getTransactionManager();
                  log.info("Starting new tx and writing new value for /test");
                  tm.begin();
                  cache.putObject("/test", "value2");
      
                  log.info("Committing");
                  tm.commit();
      
                  log.info("Sending signal to thread 1");
                  signalB.countDown();
      
                  log.info("Done");
                  return null;
              }
          }
      
          private PojoCache createCache() {
              try {
                  PojoCache cache = new PojoCache();
                  InputStream configStream = getClass().getResourceAsStream("cache-service.xml");
                  new PropertyConfigurator().configure(cache, configStream);
      
                  // All caches in this test join the same cluster, named after the test
                  cache.setClusterName(getName());
      
                  cache.start();
                  caches.add(cache);
                  return cache;
              } catch (Exception ex) {
                  throw new RuntimeException(ex);
              }
          }
      }
      


      And now the config:

      <?xml version="1.0" encoding="UTF-8"?>
      
      <!-- ===================================================================== -->
      <!-- -->
      <!-- Sample TreeCache Service Configuration -->
      <!-- -->
      <!-- ===================================================================== -->
      
      <server>
      
       <!-- ==================================================================== -->
       <!-- Defines TreeCache configuration -->
       <!-- ==================================================================== -->
      
       <mbean code="org.jboss.cache.aop.PojoCache"
       name="test:service=PojoCache">
      
       <depends>jboss:service=Naming</depends>
       <depends>jboss:service=TransactionManager</depends>
      
       <!--
       Configure the TransactionManager
       -->
       <attribute name="TransactionManagerLookupClass">org.jboss.cache.GenericTransactionManagerLookup</attribute>
      
       <!--
       Isolation level : SERIALIZABLE
       REPEATABLE_READ (default)
       READ_COMMITTED
       READ_UNCOMMITTED
       NONE
       -->
       <attribute name="IsolationLevel">REPEATABLE_READ</attribute>
      
       <!--
       Valid modes are LOCAL
       REPL_ASYNC
       REPL_SYNC
       INVALIDATION_ASYNC
       INVALIDATION_SYNC
       -->
       <attribute name="CacheMode">REPL_SYNC</attribute>
      
       <!--
       Just used for async repl: use a replication queue
       -->
       <attribute name="UseReplQueue">false</attribute>
      
       <!--
       Replication interval for replication queue (in ms)
       -->
       <attribute name="ReplQueueInterval">0</attribute>
      
       <!--
       Max number of elements which trigger replication
       -->
       <attribute name="ReplQueueMaxElements">0</attribute>
      
       <!-- Name of the cluster. Needs to be the same on all cache instances
       so that they can find each other
       -->
       <attribute name="ClusterName">TestCache</attribute>
      
       <!-- JGroups protocol stack properties. Can also be a URL,
       e.g. file:/home/bela/default.xml
       <attribute name="ClusterProperties"></attribute>
       -->
      
       <attribute name="ClusterConfig">
       <config>
       <!-- UDP: if you have a multihomed machine,
       set the bind_addr attribute to the appropriate NIC IP address, e.g. bind_addr="192.168.0.2"
       -->
       <!-- UDP: On Windows machines, because the media sense feature is
       broken with multicast (even after disabling media sense),
       set the loopback attribute to true -->
       <UDP mcast_addr="228.1.2.3" mcast_port="48866"
       ip_ttl="64" ip_mcast="true"
       mcast_send_buf_size="150000" mcast_recv_buf_size="80000"
       ucast_send_buf_size="150000" ucast_recv_buf_size="80000"
       loopback="false"/>
       <PING timeout="2000" num_initial_members="3"
       up_thread="false" down_thread="false"/>
       <MERGE2 min_interval="10000" max_interval="20000"/>
       <!-- <FD shun="true" up_thread="true" down_thread="true" />-->
       <FD_SOCK/>
       <VERIFY_SUSPECT timeout="1500"
       up_thread="false" down_thread="false"/>
       <pbcast.NAKACK gc_lag="50" retransmit_timeout="600,1200,2400,4800"
       max_xmit_size="8192" up_thread="false" down_thread="false"/>
       <UNICAST timeout="600,1200,2400" window_size="100" min_threshold="10"
       down_thread="false"/>
       <pbcast.STABLE desired_avg_gossip="20000"
       up_thread="false" down_thread="false"/>
       <FRAG frag_size="8192"
       down_thread="false" up_thread="false"/>
       <pbcast.GMS join_timeout="5000" join_retry_timeout="2000"
       shun="true" print_local_addr="true"/>
       <pbcast.STATE_TRANSFER up_thread="true" down_thread="true"/>
       </config>
       </attribute>
      
      
       <!--
       Whether or not to fetch state on joining a cluster
       NOTE this used to be called FetchStateOnStartup and has been renamed to be more descriptive.
       -->
       <attribute name="FetchInMemoryState">true</attribute>
      
       <!--
       The max amount of time (in milliseconds) we wait until the
       initial state (i.e. the contents of the cache) is retrieved from
       existing members in a clustered environment
       -->
       <attribute name="InitialStateRetrievalTimeout">15000</attribute>
      
       <!--
       Number of milliseconds to wait until all responses for a
       synchronous call have been received.
       -->
       <attribute name="SyncReplTimeout">15000</attribute>
      
       <!-- Max number of milliseconds to wait for a lock acquisition -->
       <attribute name="LockAcquisitionTimeout">10000</attribute>
      
       <!-- Name of the eviction policy class. -->
       <attribute name="EvictionPolicyClass"></attribute>
      
       <!--
       Indicate whether to use region based marshalling or not. Set this to true if you are running under a scoped
       class loader, e.g., inside an application server. Default is "false".
       -->
       <attribute name="UseRegionBasedMarshalling">true</attribute>
       </mbean>
      </server>
      


        • 1. Re: Repeatable Read - are read locks actually obtained?
          belaban

          Your misunderstanding is that you assume REPEATABLE_READ applies across the cluster, which is not the case. Locking always happens locally, so your reads and writes have to happen within the *same* TreeCache instance.
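Belaban's point can be pictured with a toy model built on the JDK alone. The class `LocalLockSketch` is hypothetical, is not JBoss Cache code, and leaves replication out entirely. Each instance owns its own lock, so a read lock held through instance A stops a writer on A but has no effect on a writer going through instance B, which matches the test above, where each thread has its own PojoCache.

```java
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Toy model only: each "cache instance" owns its own lock. Not JBoss Cache code.
final class LocalLockSketch {

    // The lock lives inside the instance; nothing shares it across instances
    private final ReentrantReadWriteLock nodeLock = new ReentrantReadWriteLock();

    // Tries to take this instance's write lock from a separate thread within timeoutMs
    boolean writeFromOtherThread(long timeoutMs) {
        boolean[] acquired = new boolean[1];
        Thread writer = new Thread(() -> {
            try {
                acquired[0] = nodeLock.writeLock().tryLock(timeoutMs, TimeUnit.MILLISECONDS);
                if (acquired[0]) {
                    nodeLock.writeLock().unlock();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        writer.start();
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return acquired[0];
    }

    // While a reader holds a read lock on instance A, returns
    // {writer on A succeeded, writer on B succeeded}
    static boolean[] demo() {
        LocalLockSketch a = new LocalLockSketch();
        LocalLockSketch b = new LocalLockSketch();
        a.nodeLock.readLock().lock(); // the reader's transaction holds A only
        try {
            return new boolean[] { a.writeFromOtherThread(200), b.writeFromOtherThread(200) };
        } finally {
            a.nodeLock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // [false, true]
    }
}
```

The writer on A gives up after its timeout, while the writer on B gets its lock immediately: the read lock only ever protected A.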

          • 2. Re: Repeatable Read - are read locks actually obtained?
            skaffman

            That's not what I wanted to hear, but thanks for clearing it up :)

            It raises the question of whether JBossCache is a suitable tool for what I'm trying to achieve: using it as a distributed, transactional datastore rather than as a cache of data held elsewhere. That demands absolute data integrity from the system, and it's starting to look as though this pushes JBossCache down roads it perhaps wasn't designed for.

            Is this a fair observation, or should it be fine for this sort of work?

            • 3. Re: Repeatable Read - are read locks actually obtained?

              Well, two things in terms of data integrity for PojoCache.

              1) When you attach, detach, or retrieve a POJO: yes, you are correct, I only lock putObject/removeObject but not getObject. I debated this, and in the end decided not to lock getObject because:

              a) getObject is used to retrieve the POJO reference, and most of the time it is just a get(fqn, AOPInstance.Key). So locking the whole tree doesn't add much semantically.

              b) Most PojoCache usage should not be putObject/removeObject calls but plain POJO operations, e.g. pojo.setName(). For those, the node-level REPEATABLE_READ semantics still apply.
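The difference between a read that takes no lock (as getObject does here) and node-level REPEATABLE_READ can be shown with a toy model on the JDK alone. The class `RepeatableReadSketch` is hypothetical and is not JBoss Cache code; it assumes lock-based REPEATABLE_READ, where a read lock is taken on the first read and held until commit, and uses a timed `tryLock` as a stand-in for `LockAcquisitionTimeout`. With `holdReadLock=false` it reproduces the non-repeatable read from the original test; with `true`, the writer times out and both reads agree.

```java
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Toy model only: one cache node guarded by a read/write lock. Not JBoss Cache code.
final class RepeatableReadSketch {

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private String value = "value1"; // only ever written under the write lock

    // Writer side: always takes the write lock, giving up after timeoutMs
    private boolean tryWrite(String newValue, long timeoutMs) throws InterruptedException {
        if (!lock.writeLock().tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
            return false; // stand-in for a lock acquisition timeout
        }
        try {
            value = newValue;
            return true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Reads twice in one "transaction" while another thread tries to write in between.
    // Returns {firstRead, secondRead, whether the writer got in}.
    static String[] run(boolean holdReadLock) {
        RepeatableReadSketch node = new RepeatableReadSketch();
        boolean[] written = new boolean[1];
        if (holdReadLock) {
            node.lock.readLock().lock(); // taken on the first read, held until "commit"
        }
        try {
            String first = node.value;
            Thread writer = new Thread(() -> {
                try {
                    written[0] = node.tryWrite("value2", 200);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            writer.start();
            writer.join(); // join() also makes the writer's update visible here
            String second = node.value;
            return new String[] { first, second, String.valueOf(written[0]) };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            if (holdReadLock) {
                node.lock.readLock().unlock(); // "commit" releases the read lock
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run(false))); // [value1, value2, true]
        System.out.println(Arrays.toString(run(true)));  // [value1, value1, false]
    }
}
```

Within a single instance using pessimistic locking, the writer would typically wait up to `LockAcquisitionTimeout` (10000 ms in the config above) rather than 200 ms; the short timeout only keeps the sketch fast.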

              • 4. Re: Repeatable Read - are read locks actually obtained?

                Is this the case in all current versions of PojoCache, i.e. 1.3.x and 1.4.x?

                • 5. Re: Repeatable Read - are read locks actually obtained?

                  Yes, I believe getObject never obtains a lock from the root. Only the individual update triggers the appropriate lock.