3 Replies Latest reply on Oct 16, 2009 9:55 AM by mlohbihler

    Cache locking problem

    mlohbihler

      Hi,

      I can't seem to find any post about this, so i'm sure i'm doing something wrong. If anyone can tell me what, it would be greatly appreciated.

      Even with REPEATABLE_READ and writeSkewCheck="true" i can easily determine that data versioning isn't working. I'm using 3.2.1GA with the following configuration (copied from the samples with only locking values changed):

      <locking
       isolationLevel="REPEATABLE_READ"
       lockParentForChildInsertRemove="false"
       lockAcquisitionTimeout="20000"
       nodeLockingScheme="mvcc"
       writeSkewCheck="true"
       concurrencyLevel="500"/>
      
       <!-- Configure the TransactionManager -->
       <transaction transactionManagerLookupClass="org.jboss.cache.transaction.GenericTransactionManagerLookup"/>
      
       <clustering mode="replication">
       <!-- JGroups protocol stack properties. -->
       <jgroupsConfig>
       <UDP discard_incompatible_packets="true" enable_bundling="false" enable_diagnostics="false" ip_ttl="2"
       loopback="false" max_bundle_size="64000" max_bundle_timeout="30" mcast_addr="228.10.10.10"
       mcast_port="45588" mcast_recv_buf_size="25000000" mcast_send_buf_size="640000"
       oob_thread_pool.enabled="true" oob_thread_pool.keep_alive_time="10000" oob_thread_pool.max_threads="4"
       oob_thread_pool.min_threads="1" oob_thread_pool.queue_enabled="true" oob_thread_pool.queue_max_size="10"
       oob_thread_pool.rejection_policy="Run" thread_naming_pattern="pl" thread_pool.enabled="true"
       thread_pool.keep_alive_time="30000" thread_pool.max_threads="25" thread_pool.min_threads="1"
       thread_pool.queue_enabled="true" thread_pool.queue_max_size="10" thread_pool.rejection_policy="Run"
       tos="8" ucast_recv_buf_size="20000000" ucast_send_buf_size="640000" use_concurrent_stack="true"
       use_incoming_packet_handler="true"/>
       <PING num_initial_members="3" timeout="2000"/>
       <MERGE2 max_interval="30000" min_interval="10000"/>
       <FD_SOCK/>
       <FD max_tries="5" shun="true" timeout="10000"/>
       <VERIFY_SUSPECT timeout="1500"/>
       <pbcast.NAKACK discard_delivered_msgs="true" gc_lag="0" retransmit_timeout="300,600,1200,2400,4800"
       use_mcast_xmit="false"/>
       <UNICAST timeout="300,600,1200,2400,3600"/>
       <pbcast.STABLE desired_avg_gossip="50000" max_bytes="400000" stability_delay="1000"/>
       <pbcast.GMS join_timeout="5000" print_local_addr="true" shun="false" view_ack_collection_timeout="5000"
       view_bundling="true"/>
       <FRAG2 frag_size="60000"/>
       <pbcast.STREAMING_STATE_TRANSFER/>
       <pbcast.FLUSH timeout="0"/>
      
       </jgroupsConfig>
      
       <sync />
       <!-- Alternatively, to use async replication, comment out the element above and uncomment the element below. -->
       <!-- <async /> -->
      
       </clustering>
      


      This is the test code:

      public static void main(String[] args) throws Exception {
       CacheFactory<String, Integer> factory = new DefaultCacheFactory<String, Integer>();
       Cache<String, Integer> cache = factory.createCache("conf/cache.xml");
      
       Node<String, Integer> root = cache.getRoot();
       Fqn<String> test1Fqn = Fqn.fromElements("test", "test-1");
       Node<String, Integer> test1Counts = root.addChild(test1Fqn);
      
       for (int i=0; i<10000; i++)
       updateCount(test1Counts, "key1");
      
       boolean run = true;
       while (run) {
       System.out.println("Members: "+ cache.getMembers().size());
       System.out.println("Sum: "+ test1Counts.get("key1"));
       Thread.sleep(4000);
       }
      
       cache.stop();
       cache.destroy();
       }
      
       private static void updateCount(Node<String, Integer> map, String key) {
       while (true) {
       try {
       Integer count = map.get(key);
       if (count == null)
       count = 0;
       count++;
       while (true) {
       try {
       map.put(key, count);
       break;
       }
       catch (TimeoutException e) {
       System.out.println(e.getMessage());
       }
       catch (SuspectException e) {
       System.out.println(e.getMessage());
       }
       }
       break;
       }
       catch (DataVersioningException e) {
       // Retry
       System.out.println(e.getMessage());
       }
       }
      
       System.out.println("Updated map: "+ map.getData());
       }
      


      I run two instances on the same machine and set a breakpoint at the "map.get" method. I step past the get in both instances, and then let the first instance run. Then i step in the second instance, expecting the map.put to fail, but it doesn't. I know the cluster works because the get will read the latest value put by either instance.

      The funny thing is that, when i remove the breakpoint and let both instances run, i will occasionally get a DataVersioningException, which suggests that i did something right, but not enough. Typically, in the end the total in the counter will be around 15K, when it should be exactly 20K.

      Can anyone help me out?

      Kind regards,
      m@


        • 1. Re: Cache locking problem
          mlohbihler

          Ok, i've tried a number of different things now including using the configuration from Manik's demo, but nothing fixes the problem. The only thing i haven't tried is wrapping my put into a transaction, but since i can't find any code or documentation that suggests this is necessary, i'm saving it for my glorious last effort.

          I've simplified the code that i can using to test. As before, using debugging it's easy to see that data versioning exceptions are not being thrown as they should. When i just let it run, the counter only gets incremented to around 2000, when it should reach exactly 3000.

          public static void main(String[] args) throws Exception {
           Cache<String, Integer> cache = new DefaultCacheFactory<String, Integer>().createCache("conf/cache.xml");
           Node<String, Integer> root = cache.getRoot();
          
           // Wait for other members.
           System.out.println("Waiting for other members...");
           while (cache.getMembers().size() < 3)
           Thread.sleep(50);
          
           System.out.println("Running...");
          
           int versioningFaults = 0;
           for (int i=0; i<1000; i++) {
           // Modify the cache
           while (true) {
           try {
           Integer count = root.get("key");
           if (count == null)
           count = 0;
           count++;
           while (true) {
           try {
           root.put("key", count);
           break;
           }
           catch (SuspectException e) {}
           catch (TimeoutException e) {}
           }
           break;
           }
           catch (DataVersioningException e) {
           versioningFaults++;
           }
           }
           }
          
           System.out.println("Final count: "+ root.get("key") +", faults: "+ versioningFaults);
          
           cache.stop();
           cache.destroy();
           }
          


          As before, any help, hints, and at this point even flames, etc are greatly appreciated.

          Regards,
          m@


          • 2. Re: Cache locking problem
            mlohbihler

            A glorious end. The transactional stuff solved the problem.

            Well, mostly... The evolved code is below. But even with transactions doing their duty i still noticed gaps in the data. I had to put sleeps into the loops because when all threads run at full speed a literal clusterf*** ensues... rollback and timeout exceptions go flying everywhere.

            But with the delays in there, things calm down nicely. Even still, although the results are reassuring, they are not exact. The below code should end up with a final count of 1000, but instead twice it ended with 999. I assumed it was a bug, but in the output i noticed that this message was written twice: "Put 91". This means something slipped between the cluster cracks somewhere.

            I don't need to do updates as quickly as i coded for in these tests - i was just testing the boundaries. And indeed, these results are good enough for my present purposes. Overall i think it's great stuff, but it would seem that there may still be small holes in the MVCC code.

            public static void main(String[] args) throws Exception {
             Cache<String, Integer> cache = new DefaultCacheFactory<String, Integer>().createCache("conf/cache.xml");
            
             // Wait for other members.
             System.out.println("Waiting for other members...");
             while (cache.getMembers().size() < 4)
             Thread.sleep(50);
            
             System.out.println("Running...");
            
             TransactionManager txm = new DummyTransactionManagerLookup().getTransactionManager();
            
             Random random = new Random();
             int versioningFaults = 0;
             int rollbackFaults = 0;
             int suspectFaults = 0;
             int timeoutFaults = 0;
             for (int i=0; i<50; i++) {
             Thread.sleep(random.nextInt(2000));
            
             // Modify the cache
             while (true) {
             try {
             txm.begin();
            
             Integer count = cache.getRoot().get("key");
             if (count == null)
             count = 0;
             count++;
            
             while (true) {
             try {
             cache.getRoot().put("key", count);
             System.out.println("Put "+ count);
             break;
             }
             catch (SuspectException e) {
             suspectFaults++;
             }
             catch (TimeoutException e) {
             timeoutFaults++;
             }
             }
            
             txm.commit();
             break;
             }
             catch (DataVersioningException e) {
             txm.rollback();
             versioningFaults++;
             }
             catch (RollbackException e) {
             rollbackFaults++;
             }
             }
             }
            
             System.out.println("Final count: "+ cache.getRoot().get("key"));
             System.out.println("Versioning faults: "+ versioningFaults);
             System.out.println("Rollback faults: "+ rollbackFaults);
             System.out.println("Suspect faults: "+ suspectFaults);
             System.out.println("Timeout faults: "+ timeoutFaults);
            
             cache.stop();
             cache.destroy();
             }
            



            • 3. Re: Cache locking problem
              mlohbihler

              Sigh. My apologies. I forgot that i had switched to READ_COMMITTED, so these gaps are just write skew. Switching back to REPEATABLE_READ results in exact counts as expected.

              Anyway, i hope at least the code samples will help other new users out.