49 Replies Latest reply: Jan 21, 2010 10:04 AM by Galder Zamarreño

    Transactions: atomicity OK, isolation KO

    ezza ezaezazea Novice

      I'm studying the feasibility of using JBoss POJO Cache (henceforth JBPC) as a transactional cache for our application framework. I'm using an AtomikosEssentials transaction manager, and Spring as a declarative JTA container. JBPC is 3.0.0GA.

      To do so, I wrote two unit tests: one for the A (atomicity) part of ACID and one for the I (isolation) part.
      -The atomicity test executes a failing transaction and verifies that it has been rolled back.
      -The isolation test starts a thread that modifies a JBPC object; this update is artificially long (Thread.sleep at the end). It then verifies that a read is blocked until the update is over, and that the read returns the updated state.
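
      For readers less familiar with the locking semantics this isolation test expects, here is a minimal sketch that uses a plain java.util.concurrent ReentrantReadWriteLock as a stand-in for the cache. It is not JBPC code; the class name BlockingReadSketch and its Result holder are made up for illustration. The writer takes the write lock, updates the value and sleeps; the reader should block for roughly that long and then see the updated value.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Stand-in for a pessimistically locked resource; not JBoss Cache code. */
final class BlockingReadSketch {

    /** What the reader saw and how long it waited for the read lock. */
    static final class Result {
        final String valueSeen;
        final long blockedMillis;

        Result(String valueSeen, long blockedMillis) {
            this.valueSeen = valueSeen;
            this.blockedMillis = blockedMillis;
        }
    }

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private String value = "V1";

    Result run(long holdMillis) {
        CountDownLatch writerHoldsLock = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> {
            lock.writeLock().lock();
            try {
                value = "V2";
                writerHoldsLock.countDown(); // tell the reader the lock is held
                Thread.sleep(holdMillis);    // the artificially long update
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.writeLock().unlock();
            }
        });
        try {
            writerHoldsLock.await();
            long start = System.nanoTime();
            lock.readLock().lock();          // blocks until the writer releases
            try {
                long blocked = (System.nanoTime() - start) / 1_000_000L;
                return new Result(value, blocked);
            } finally {
                lock.readLock().unlock();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        Result r = new BlockingReadSketch().run(500);
        System.out.println("value=" + r.valueSeen + ", blockedMillis=" + r.blockedMillis);
    }
}
```

      With this kind of locking the reader sees V2 and is blocked for about the hold time, which is the behaviour the isolation test asserts.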


      The atomicity test passes, indicating that I am indeed working inside a transaction.
      But the isolation test fails. The test itself seems correct, because if I use a JDBC resource instead of the JBPC object as shared state, the test passes. Thus, there seems to be a problem with my JBPC configuration, or something else I don't understand.

      The unit test:

      @RunWith(SpringJUnit4ClassRunner.class)
      @ContextConfiguration ( locations={"/application-context_Jdbc_Atomikos.xml"} )
      public class ComportementTransactionnelTable implements ApplicationContextAware
      {
       @Resource ( name="tracksTable" )
       protected TracksTable table;
       private final ExecutorService executorService = Executors.newFixedThreadPool(1);
       protected static ApplicationContext applicationContext;
      
       @Before
       public void start ()
       {
       table.deleteAllTracks ();
       assertEquals ( 0 , table.size() );
       }
      
       @Test
       //@Ignore
       public void atomicite () throws Exception
       {
       //After insertion: 2 rows
       table.create2Tracks ( false );
       assertEquals ( 2 , table.size() );
      
       try
       {
       table.create2Tracks ( true );
       fail ();
       }
       catch ( Exception e ) {}
       finally
       {
       assertEquals ( 2 , table.size() );
       }
       }
      
       @Test
       //@Ignore
       public void isolation () throws Exception
       {
       assertEquals ( 0 , table.size() );
       int dureeEnSecondes = 5; //Must be < lock acquire timeout
       executorService.submit ( new MiseAJourLongue ( dureeEnSecondes ) );
      
       StopWatch stopWatch = new StopWatch ();
       stopWatch.start();
       //Wait a little to be sure that by the time we get to table.size,
       //the MiseAJourLongue thread has had time to start its transaction
       ObjectUtils.sleep ( 100 );
       assertEquals ( 1 , table.size() );
       stopWatch.stop();
      
       double tempsBloque = stopWatch.getTotalTimeSeconds();
       System.out.println ( "isolation/tempsBloque: " + tempsBloque );
       assertTrue ( tempsBloque>= dureeEnSecondes );
       }
      
       private class MiseAJourLongue implements Runnable
       {
       private final int dureeEnSecondes;
      
       MiseAJourLongue ( int dureeEnSecondes )
       {
       this.dureeEnSecondes = dureeEnSecondes;
       }
      
       @Override
       public void run()
       {
       table.createTrackAndSleep ( dureeEnSecondes );
       }
       }
      
       @Override
       public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
       {
       TransactionManager tm = (TransactionManager) applicationContext.getBean ( "atomikosTransactionManager" );
       AtomikosTransactionManagerLookup.setAtomikosTransactionManager ( tm );
       }
      }
      

      It can be used for a shared state that is either JDBC or JBPC.
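
      One fragile spot in the test above is the fixed ObjectUtils.sleep ( 100 ): it assumes the writer thread has started its transaction within 100 ms. A possible alternative is to have the writer signal explicitly, for example with a CountDownLatch. The sketch below is standalone; WriterStartedSignal, markStarted and awaitStarted are hypothetical names, and where the writer would call markStarted inside the real service is left open.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: lets a test wait until the writer thread has really started. */
final class WriterStartedSignal {

    private final CountDownLatch started = new CountDownLatch(1);

    /** Called by the writer once it is inside its long update. */
    void markStarted() {
        started.countDown();
    }

    /** Called by the test thread; returns false if the writer never signalled in time. */
    boolean awaitStarted(long timeoutMillis) {
        try {
            return started.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Small demo: the writer signals, then simulates a long update. */
    static boolean demo() {
        WriterStartedSignal signal = new WriterStartedSignal();
        Thread writer = new Thread(() -> {
            signal.markStarted();
            try {
                Thread.sleep(100); // stands in for the artificially long update
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        writer.start();
        return signal.awaitStarted(2000);
    }

    public static void main(String[] args) {
        System.out.println("writer started in time: " + demo());
    }
}
```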

      When the shared state is JBPC, one launches the JBPC-specific subclass:
      public class ComportementTransactionnelTableCache extends ComportementTransactionnelTable
      {
       private static PojoCache cache;
      
       @Before
       public void start ()
       {
       cache = PojoCacheFactory.createCache ( "resources/META-INF/replSync-service.xml" , false );
      
       cache.getCache().getInvocationContext().getOptionOverrides().setForceWriteLock(true);
       cache.getCache().getInvocationContext().getOptionOverrides().setForceSynchronous(true);
       //cache.getCache().getConfiguration().setCacheMode(CacheMode.LOCAL);
       //cache.getCache().getConfiguration().setIsolationLevel(IsolationLevel.SERIALIZABLE);
       //cache.getCache().getConfiguration().setConcurrencyLevel(0);
       cache.getCache().getConfiguration().setLockParentForChildInsertRemove ( true );
       //cache.getCache().getConfiguration().setNodeLockingScheme(NodeLockingScheme.PESSIMISTIC);
       cache.getCache().getConfiguration().setSyncCommitPhase(true);
       cache.getCache().getConfiguration().setSyncRollbackPhase(true);
       cache.getCache().getConfiguration().setWriteSkewCheck(true);
      
       cache.start();
       table.setCache ( cache );
       super.start();
       }
      
       @After
       public void stop ()
       {
       cache.stop();
       table.unsetCache ( cache );
       }
      }

      The JBPC-specific test is launched with options:
      -Dlog4j.configuration=file:///D:/ff/log4j.properties -Dcom.atomikos.icatch.file=D:/ff/jta.properties
      -Djboss.aop.verbose=false
      -Djboss.aop.path=src/resources/META-INF/pojocache-aop.xml
      -javaagent:D:/telechargements/depuisChezMoi/jbosscacheALL/jboss-aop.jar



      The transactional Spring bean (TracksTableImpl) delegates shared state either to a JDBC implementation (TracksTableJdbcDelegate) or to a JBPC implementation (TracksTableJbpcDelegate):
      @Service("tracksTable")
      @Transactional (propagation=Propagation.REQUIRED, isolation=Isolation.SERIALIZABLE, readOnly=false, timeout=10000)
      public class TracksTableImpl implements TracksTable
      {
       private TracksTableDelegate delegate = new TracksTableJbpcDelegate ();
       /*@Resource ( name="tracksTableJdbcDelegate" )
       private TracksTableDelegate delegate;*/
      
       @Override
       public void createTrack ( boolean fail )
       {
       if ( fail ) throw new TrackException ();
       Track track = new TrackImpl ();
       delegate.createTrack ( track );
       }
      
       @Override
       public void create2Tracks ( boolean failSurLeDeuxieme )
       {
       createTrack ( false );
       createTrack ( failSurLeDeuxieme );
       }
      
       @Override
       public void createTrackAndSleep(int dureeEnSecondes)
       {
       createTrack ( false );
       ObjectUtils.sleep ( dureeEnSecondes * 1000 );
       }
      
       @Override
       public void deleteAllTracks()
       {
       delegate.deleteAllTracks();
       }
      
       @Override
       public int size()
       {
       return delegate.size();
       }
      
       @Override
       public void setCache ( PojoCache cache )
       {
       delegate.attach ( cache );
       }
      
       @Override
       public void unsetCache ( PojoCache cache )
       {
       delegate.detach ( cache );
       }
      }
      

      @Service ("tracksTableJdbcDelegate")
      public class TracksTableJdbcDelegate implements TracksTableDelegate
      {
       @Resource ( name="jdbcTemplate" )
       private SimpleJdbcOperations template;
      
       private static final String INSERT = "insert into TRACK (NOM) values (:NOM)";
       private static final String DELETE_ALL = "delete from TRACK";
       private static final String SELECT_ALL = "select count(*) from TRACK";
      
       @Override
       public void createTrack ( Track track )
       {
       System.out.println ( "createTrack/isActualTransactionActive: " + Spring.isActualTransactionActive() );
       Map<String,String> map = new HashMap<String,String> ();
       map.put ( "NOM" , "aaa" );
       template.update ( INSERT , map );
       }
      
       @Override
       public void deleteAllTracks()
       {
       template.update ( DELETE_ALL );
       }
      
       @Override
       public int size()
       {
       return template.queryForInt ( SELECT_ALL );
       }
      
       @Override
       public void attach(PojoCache cache)
       {
       //DO NOTHING
       }
      
       @Override
       public void detach(PojoCache cache)
       {
       //DO NOTHING
       }
      }
      

      @org.jboss.cache.pojo.annotation.Replicable
      public class TracksTableJbpcDelegate implements TracksTableDelegate
      {
       private List<Track> tracks = new ArrayList<Track> ();
      
       @Override
       public void createTrack ( Track track )
       {
       System.out.println ( "createTrack/isActualTransactionActive: " + Spring.isActualTransactionActive() );
       tracks.add ( track );
       }
      
       @Override
       public void deleteAllTracks()
       {
       tracks.clear();
       }
      
       @Override
       public int size()
       {
       System.out.println ( "size/isActualTransactionActive: " + Spring.isActualTransactionActive() );
       return tracks.size();
       }
      
       @Override
       public void attach ( PojoCache cache )
       {
       System.out.println ( "__________attach" );
       cache.attach ( "naja/tracksTable", this );
       }
      
       @Override
       public void detach ( PojoCache cache )
       {
       System.out.println ( "__________detach" );
       cache.detach ( "naja/tracksTable" );
       }
      }
      


        • 1. Re: Transactions: atomicity OK, isolation KO
          ezza ezaezazea Novice

          My replSync-service.xml:

          <?xml version="1.0" encoding="UTF-8"?>
          <server>
           <mbean code="org.jboss.cache.jmx.CacheJmxWrapper"
           name="jboss.cache:service=TreeCache">
          
           <attribute name="TransactionManagerLookupClass">hellotrackworld.impl.srv.AtomikosTransactionManagerLookup</attribute>
           <attribute name="IsolationLevel">SERIALIZABLE</attribute>
           <attribute name="CacheMode">LOCAL</attribute>
          
           [...The rest is default]
          
           </mbean>
          </server>
          


          I tried using the deprecated pessimistic locking scheme but got a ClassCastException. I read that the default MVCC doesn't support SERIALIZABLE so that may be an issue? Or does it have to do with the row/table lock scope (didn't find a way to set it)? Or something else?

          • 2. Re: Transactions: atomicity OK, isolation KO
            ezza ezaezazea Novice

            I have tried to set some parameters that sounded like they could solve my issue.

            Here are the parameter overrides that complement the settings in replSync-service.xml:

            cache = PojoCacheFactory.createCache ( "resources/META-INF/replSync-service.xml" , false );
            
             cache.getCache().getInvocationContext().getOptionOverrides().setForceWriteLock(true);
             cache.getCache().getInvocationContext().getOptionOverrides().setForceSynchronous(true);
             cache.getCache().getConfiguration().setLockParentForChildInsertRemove ( true );
             cache.getCache().getConfiguration().setSyncCommitPhase(true);
             cache.getCache().getConfiguration().setSyncRollbackPhase(true);
             //cache.getCache().getConfiguration().setNodeLockingScheme(NodeLockingScheme.PESSIMISTIC);
             //cache.getCache().getConfiguration().setWriteSkewCheck(true);
             cache.start();

            With this set of parameters the isolation test fails, but there is no exception.

            Uncommenting the setWriteSkewCheck line yields the following exception:
            org.jboss.cache.pojo.PojoCacheException: detach failed /__JBossInternal__/naja/tracksTable/_ID_/a1az2b-4aubdg-fzfejsf2-1-fzfejsy6-7
             at org.jboss.cache.pojo.impl.PojoCacheImpl.detach(PojoCacheImpl.java:134)
             at org.jboss.cache.pojo.impl.AdvisedPojoHandler.remove(AdvisedPojoHandler.java:216)
             at org.jboss.cache.pojo.impl.PojoCacheDelegate.removeObject(PojoCacheDelegate.java:261)
             at org.jboss.cache.pojo.impl.PojoCacheImpl.detach(PojoCacheImpl.java:126)
             at org.jboss.cache.pojo.impl.PojoCacheImpl.detach(PojoCacheImpl.java:221)
             at org.jboss.cache.pojo.impl.PojoCacheImpl.detach(PojoCacheImpl.java:214)
             at hellotrackworld.impl.TracksTableJbpcDelegate.detach(TracksTableJbpcDelegate.java:48)
             at hellotrackworld.impl.TracksTableImpl.unsetCache(TracksTableImpl.java:73)
             at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
             at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
             at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
             at java.lang.reflect.Method.invoke(Method.java:597)
             at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:310)
             at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:182)
             at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:149)
             at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:106)
             at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:171)
             at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
             at $Proxy13.unsetCache(Unknown Source)
             at hellotrackworld.test.ComportementTransactionnelTableCache.stop(ComportementTransactionnelTableCache.java:36)
             at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
             at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
             at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
             at java.lang.reflect.Method.invoke(Method.java:597)
             at org.springframework.test.context.junit4.SpringMethodRoadie.runAfters(SpringMethodRoadie.java:297)
             at org.springframework.test.context.junit4.SpringMethodRoadie$RunBeforesThenTestThenAfters.run(SpringMethodRoadie.java:338)
             at org.springframework.test.context.junit4.SpringMethodRoadie.runWithRepetitions(SpringMethodRoadie.java:217)
             at org.springframework.test.context.junit4.SpringMethodRoadie.runTest(SpringMethodRoadie.java:197)
             at org.springframework.test.context.junit4.SpringMethodRoadie.run(SpringMethodRoadie.java:143)
             at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.invokeTestMethod(SpringJUnit4ClassRunner.java:142)
             at org.junit.internal.runners.JUnit4ClassRunner.runMethods(JUnit4ClassRunner.java:51)
             at org.junit.internal.runners.JUnit4ClassRunner$1.run(JUnit4ClassRunner.java:44)
             at org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:27)
             at org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:37)
             at org.junit.internal.runners.JUnit4ClassRunner.run(JUnit4ClassRunner.java:42)
             at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:45)
             at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
             at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:460)
             at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:673)
             at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:386)
             at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:196)
            Caused by: org.jboss.cache.optimistic.DataVersioningException: Detected write skew on Fqn [/__JBossInternal__/naja/tracksTable/_ID_/a1az2b-4aubdg-fzfejsf2-1-fzfejsy6-8]. Another process has changed the node since we last read it!
             at org.jboss.cache.mvcc.RepeatableReadNode.markForUpdate(RepeatableReadNode.java:68)
             at org.jboss.cache.mvcc.MVCCNodeHelper.wrapNodeForWriting(MVCCNodeHelper.java:206)
             at org.jboss.cache.interceptors.MVCCLockingInterceptor.handlePutKeyValueCommand(MVCCLockingInterceptor.java:98)
             at org.jboss.cache.interceptors.base.PrePostProcessingCommandInterceptor.visitPutKeyValueCommand(PrePostProcessingCommandInterceptor.java:88)
             at org.jboss.cache.commands.write.PutKeyValueCommand.acceptVisitor(PutKeyValueCommand.java:100)
             at org.jboss.cache.interceptors.base.CommandInterceptor.invokeNextInterceptor(CommandInterceptor.java:116)
             at org.jboss.cache.interceptors.base.CommandInterceptor.handleDefault(CommandInterceptor.java:131)
             at org.jboss.cache.commands.AbstractVisitor.visitPutKeyValueCommand(AbstractVisitor.java:65)
             at org.jboss.cache.commands.write.PutKeyValueCommand.acceptVisitor(PutKeyValueCommand.java:100)
             at org.jboss.cache.interceptors.base.CommandInterceptor.invokeNextInterceptor(CommandInterceptor.java:116)
             at org.jboss.cache.interceptors.TxInterceptor.attachGtxAndPassUpChain(TxInterceptor.java:284)
             at org.jboss.cache.interceptors.TxInterceptor.handleDefault(TxInterceptor.java:271)
             at org.jboss.cache.commands.AbstractVisitor.visitPutKeyValueCommand(AbstractVisitor.java:65)
             at org.jboss.cache.commands.write.PutKeyValueCommand.acceptVisitor(PutKeyValueCommand.java:100)
             at org.jboss.cache.interceptors.base.CommandInterceptor.invokeNextInterceptor(CommandInterceptor.java:116)
             at org.jboss.cache.interceptors.CacheMgmtInterceptor.visitPutKeyValueCommand(CacheMgmtInterceptor.java:119)
             at org.jboss.cache.commands.write.PutKeyValueCommand.acceptVisitor(PutKeyValueCommand.java:100)
             at org.jboss.cache.interceptors.base.CommandInterceptor.invokeNextInterceptor(CommandInterceptor.java:116)
             at org.jboss.cache.interceptors.InvocationContextInterceptor.handleAll(InvocationContextInterceptor.java:178)
             at org.jboss.cache.interceptors.InvocationContextInterceptor.visitPutKeyValueCommand(InvocationContextInterceptor.java:82)
             at org.jboss.cache.commands.write.PutKeyValueCommand.acceptVisitor(PutKeyValueCommand.java:100)
             at org.jboss.cache.interceptors.InterceptorChain.invoke(InterceptorChain.java:265)
             at org.jboss.cache.invocation.CacheInvocationDelegate.put(CacheInvocationDelegate.java:560)
             at org.jboss.cache.pojo.impl.InternalHelper.lockPojo(InternalHelper.java:342)
             at org.jboss.cache.pojo.impl.PojoCacheDelegate.removeObject(PojoCacheDelegate.java:250)
             at org.jboss.cache.pojo.impl.PojoCacheImpl.detach(PojoCacheImpl.java:126)
             ... 40 more
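
            For context on the DataVersioningException above: a write skew check rejects a write when the node has changed since the current transaction last read it. A toy version of that rule, assuming a simple per-node version counter (the VersionedNode class is hypothetical and is not how JBoss Cache implements it internally):

```java
/** Toy write-skew check based on a version counter; not JBoss Cache's implementation. */
final class WriteSkewSketch {

    static final class VersionedNode {
        private String value;
        private long version;

        VersionedNode(String initial) {
            this.value = initial;
        }

        synchronized long version() {
            return version;
        }

        synchronized String value() {
            return value;
        }

        /** Writes only if nobody changed the node since the caller read versionSeen. */
        synchronized void write(String newValue, long versionSeen) {
            if (version != versionSeen) {
                throw new IllegalStateException(
                        "write skew: node changed since version " + versionSeen);
            }
            value = newValue;
            version++;
        }
    }

    /** Two transactions read the same version; the second writer is rejected. */
    static boolean secondWriterRejected() {
        VersionedNode node = new VersionedNode("v1");
        long seenByA = node.version();    // transaction A reads
        long seenByB = node.version();    // transaction B reads the same version
        node.write("fromB", seenByB);     // B writes first and bumps the version
        try {
            node.write("fromA", seenByA); // A's view is now stale
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second writer rejected: " + secondWriterRejected());
    }
}
```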
            
            


            Uncommenting the setNodeLockingScheme line yields the following exception:

            org.jboss.cache.pojo.PojoCacheException: detach failed /naja/tracksTable
             at org.jboss.cache.pojo.impl.PojoCacheImpl.detach(PojoCacheImpl.java:134)
             at org.jboss.cache.pojo.impl.PojoCacheImpl.detach(PojoCacheImpl.java:221)
             at org.jboss.cache.pojo.impl.PojoCacheImpl.detach(PojoCacheImpl.java:214)
             at hellotrackworld.impl.TracksTableJbpcDelegate.detach(TracksTableJbpcDelegate.java:48)
             at hellotrackworld.impl.TracksTableImpl.unsetCache(TracksTableImpl.java:73)
             at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
             at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
             at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
             at java.lang.reflect.Method.invoke(Method.java:597)
             at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:310)
             at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:182)
             at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:149)
             at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:106)
             at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:171)
             at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
             at $Proxy13.unsetCache(Unknown Source)
             at hellotrackworld.test.ComportementTransactionnelTableCache.stop(ComportementTransactionnelTableCache.java:36)
             at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
             at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
             at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
             at java.lang.reflect.Method.invoke(Method.java:597)
             at org.springframework.test.context.junit4.SpringMethodRoadie.runAfters(SpringMethodRoadie.java:297)
             at org.springframework.test.context.junit4.SpringMethodRoadie$RunBeforesThenTestThenAfters.run(SpringMethodRoadie.java:338)
             at org.springframework.test.context.junit4.SpringMethodRoadie.runWithRepetitions(SpringMethodRoadie.java:217)
             at org.springframework.test.context.junit4.SpringMethodRoadie.runTest(SpringMethodRoadie.java:197)
             at org.springframework.test.context.junit4.SpringMethodRoadie.run(SpringMethodRoadie.java:143)
             at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.invokeTestMethod(SpringJUnit4ClassRunner.java:142)
             at org.junit.internal.runners.JUnit4ClassRunner.runMethods(JUnit4ClassRunner.java:51)
             at org.junit.internal.runners.JUnit4ClassRunner$1.run(JUnit4ClassRunner.java:44)
             at org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:27)
             at org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:37)
             at org.junit.internal.runners.JUnit4ClassRunner.run(JUnit4ClassRunner.java:42)
             at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:45)
             at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
             at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:460)
             at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:673)
             at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:386)
             at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:196)
            Caused by: java.lang.ClassCastException: org.jboss.cache.transaction.PessimisticTransactionContext cannot be cast to org.jboss.cache.transaction.MVCCTransactionContext
             at org.jboss.cache.invocation.MVCCInvocationContext.setTransactionContext(MVCCInvocationContext.java:49)
             at org.jboss.cache.interceptors.BaseTransactionalContextInterceptor.setTransactionalContext(BaseTransactionalContextInterceptor.java:80)
             at org.jboss.cache.interceptors.InvocationContextInterceptor.handleAll(InvocationContextInterceptor.java:150)
             at org.jboss.cache.interceptors.InvocationContextInterceptor.visitPutKeyValueCommand(InvocationContextInterceptor.java:82)
             at org.jboss.cache.commands.write.PutKeyValueCommand.acceptVisitor(PutKeyValueCommand.java:100)
             at org.jboss.cache.interceptors.InterceptorChain.invoke(InterceptorChain.java:265)
             at org.jboss.cache.invocation.CacheInvocationDelegate.put(CacheInvocationDelegate.java:560)
             at org.jboss.cache.pojo.impl.InternalHelper.lockPojo(InternalHelper.java:342)
             at org.jboss.cache.pojo.impl.PojoCacheDelegate.removeObject(PojoCacheDelegate.java:221)
             at org.jboss.cache.pojo.impl.PojoCacheImpl.detach(PojoCacheImpl.java:126)
             ... 37 more
            
            


            • 3. Re: Transactions: atomicity OK, isolation KO
              ezza ezaezazea Novice

              It works! I specified the pessimistic mode in the xml instead of programmatically and the isolation test now passes:

              <attribute name="NodeLockingScheme">PESSIMISTIC</attribute>


              But I find it worrying that the pessimistic mode is deprecated and will be removed in future versions, as it provides the semantics I need.
              So how can I do the same thing with MVCC?

              • 4. Re: Transactions: atomicity OK, isolation KO
                ezza ezaezazea Novice

                Anybody? If this is not really a POJO edition question, maybe I can post it in the JBossCache forum?

                • 5. Re: Transactions: atomicity OK, isolation KO
                  Manik Surtani Master

                  Have you tried boiling this down to a more specific isolation test for JBoss Cache? E.g., something like:

                  cache.put(k, v1);
                  T1: cache.put(k, v2); sleep;
                  T2: assert cache.get(k) == v1 ?

                  We have a lot of tests of this sort in our test suite, I'd be very surprised if this is a real bug.
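
                  To make the expectation in this pseudo-test concrete: under a multi-version scheme, a reader does not wait for an in-flight writer and instead sees the last committed value (v1 here). A toy model of that behaviour, with hypothetical names (SnapshotReadSketch, WriteTx) and no JBoss Cache classes involved:

```java
import java.util.concurrent.atomic.AtomicReference;

/** Toy model of multi-version reads; illustrative only, not JBoss Cache internals. */
final class SnapshotReadSketch {

    // Last committed value, visible to every reader.
    private final AtomicReference<String> committed = new AtomicReference<>("v1");

    /** A writer stages its change privately; readers cannot see it until commit(). */
    final class WriteTx {
        private String pending;

        void put(String value) {
            pending = value;
        }

        void commit() {
            committed.set(pending);
        }
    }

    WriteTx beginWrite() {
        return new WriteTx();
    }

    /** Readers take no lock and return the last committed value immediately. */
    String get() {
        return committed.get();
    }

    /** Returns what a reader sees before and after the writer commits. */
    static String[] demo() {
        SnapshotReadSketch cache = new SnapshotReadSketch();
        WriteTx t1 = cache.beginWrite();
        t1.put("v2");                  // T1 has written but not committed
        String duringT1 = cache.get(); // T2 reads without blocking
        t1.commit();
        String afterT1 = cache.get();
        return new String[] { duringT1, afterT1 };
    }

    public static void main(String[] args) {
        String[] seen = demo();
        System.out.println("during T1: " + seen[0] + ", after commit: " + seen[1]);
    }
}
```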

                  Also, what version of jbosscache-core do you use? I'd recommend you upgrade to the latest (3.2.0.GA), this is compatible with POJO Cache 3.0.0.

                  Cheers
                  Manik

                  • 6. Re: Transactions: atomicity OK, isolation KO
                    ezza ezaezazea Novice

                     

                    Manik Surtani wrote:

                      Have you tried boiling this down to a more specific isolation test for JBoss Cache? E.g., something like:

                      cache.put(k, v1);
                      T1: cache.put(k, v2); sleep;
                      T2: assert cache.get(k) == v1 ?

                      We have a lot of tests of this sort in our test suite, I'd be very surprised if this is a real bug.

                    No, I haven't tried using JBossCache in isolation; that's a good idea.

                    But I would rewrite your test as something like:
                    cache.put(k, v1);
                    T1: cache.put(k, v2); sleep(duration); //launched in another thread
                    before=currentTimeMillis
                    T2: assert cache.get(k) == v2 //v2, not v1
                    after=currentTimeMillis
                    assert after-before>=duration

                    The write thread must block the read thread, so T2 gets the value v2 not v1. I also verify (for consistency) that T2 has been blocked for at least the duration of T1.


                    Manik Surtani wrote:

                      Also, what version of jbosscache-core do you use? I'd recommend you upgrade to the latest (3.2.0.GA), this is compatible with POJO Cache 3.0.0.

                    Yes, I have already upgraded to 3.2 for the core, but it didn't change anything.

                    Well, I can always try to isolate the problem with a JBC core test as you proposed.

                    • 7. same test with Core instead of POJO: same result
                      ezza ezaezazea Novice

                      Manik, I have done the test you proposed (with the modification I proposed in the previous post): it does exactly the same thing.
                      Sorry, I have to post in 2 parts because of my poor web access.

                      Here is my test class:

                      package hellotrackworld.test;
                      
                      import static org.junit.Assert.assertEquals;
                      import static org.junit.Assert.assertFalse;
                      import static org.junit.Assert.assertTrue;
                      import hellotrackworld.TracksTable;
                      import hellotrackworld.impl.srv.AtomikosTransactionManagerLookup;
                      
                      import java.util.concurrent.ExecutorService;
                      import java.util.concurrent.Executors;
                      
                      import javax.annotation.Resource;
                      import javax.transaction.TransactionManager;
                      
                      import org.jboss.cache.Cache;
                      import org.jboss.cache.CacheFactory;
                      import org.jboss.cache.DefaultCacheFactory;
                      import org.jboss.cache.config.Configuration.NodeLockingScheme;
                      import org.junit.After;
                      import org.junit.Before;
                      import org.junit.Test;
                      import org.junit.runner.RunWith;
                      import org.springframework.beans.BeansException;
                      import org.springframework.context.ApplicationContext;
                      import org.springframework.context.ApplicationContextAware;
                      import org.springframework.test.context.ContextConfiguration;
                      import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
                      import org.springframework.util.StopWatch;
                      
                      @RunWith(SpringJUnit4ClassRunner.class)
                      @ContextConfiguration ( locations={"/application-context_Jdbc_Atomikos.xml"} )
                      public class ComportementTransactionnelTableCache implements ApplicationContextAware
                      {
                       private Cache cache;
                       @Resource ( name="tracksTable" )
                       protected TracksTable table;
                       protected final ExecutorService executorService = Executors.newCachedThreadPool();
                       protected static ApplicationContext applicationContext;
                      
                       @Test
                       //@Ignore
                       public void isolationRWCore () throws Exception
                       {
                       table.coreSetK ( "V1" );
                       assertEquals ( "V1" , table.coreGetK() );
                      
                       int duration = 5; //Must be < lock acquire timeout
                       executorService.submit ( new LongCoreSetK ( "V2" , duration ) );
                       StopWatch stopWatch = new StopWatch ();
                       stopWatch.start();
                      
                       //Since LongCoreSetK is launched asynchronously, we must make sure
                       //that coreGetK be called after LongCoreSetK has hit the transactional resource
                       //(here JBoss core cache)
                       waitUntilWriterThreadHasHitTheTransactionalResource ();
                      
                       assertEquals ( "V2" , table.coreGetK() );
                       stopWatch.stop();
                      
                       double timeBlocked = stopWatch.getTotalTimeSeconds();
                       System.out.println ( "isolation/timeBlocked: " + timeBlocked );
                       assertTrue ( timeBlocked>= duration );
                       }
                      
                       private class LongCoreSetK implements Runnable
                       {
                       private final int dureeEnSecondes;
                       private final String v;
                      
                       LongCoreSetK ( String v, int durationInSeconds )
                       {
                       this.v = v;
                       this.dureeEnSecondes = durationInSeconds;
                       }
                      
                       @Override
                       public void run()
                       {
                       table.longCoreSetK ( v , dureeEnSecondes );
                       }
                       }
                      
                       @Before
                       public void before ()
                       {
                       cache = initPojoCache ();
                       wireCacheIntoService ( cache );
                      
                       table.setTheTransactionalResourceWasHit(false);//reset the flag
                       assertFalse ( table.getTheTransactionalResourceWasHit() );
                       }
                      
                       @After
                       public void stop ()
                       {
                       table.unsetCache ( cache );
                       cache.stop();
                       }
                      
                       private void wireCacheIntoService(Cache cache)
                       {
                       table.setCache ( cache );
                       }
                      
                       private Cache initPojoCache()
                       {
                       CacheFactory factory = new DefaultCacheFactory();
                       Cache cache = factory.createCache("resources/META-INF/replSync-service.xml", false);
                      
                       cache.getConfiguration().setNodeLockingScheme(NodeLockingScheme.PESSIMISTIC); //Test passes
                      
                       /*cache.getInvocationContext().getOptionOverrides().setForceWriteLock(true); //Test fails at assertEquals ( "V2" , table.coreGetK() ); (was V1 instead)
                       cache.getConfiguration().setWriteSkewCheck(true);*/
                      
                       /*cache.getInvocationContext().getOptionOverrides().setForceSynchronous(true); //Adding these options doesn't change anything: test fails at the same line
                       cache.getInvocationContext().getOptionOverrides().setFailSilently(false);
                       cache.getConfiguration().setUseLazyDeserialization(false);
                       cache.getConfiguration().setLockParentForChildInsertRemove ( true );
                       cache.getConfiguration().setSyncCommitPhase(true);
                       cache.getConfiguration().setSyncRollbackPhase(true);*/
                      
                       cache.create();
                       cache.start();
                       return cache;
                       }
                      
                       @Override
                       public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
                       {
                       TransactionManager tm = (TransactionManager) applicationContext.getBean ( "atomikosTransactionManager" );
                       AtomikosTransactionManagerLookup.setAtomikosTransactionManager ( tm );
                       }
                      
                       protected void waitUntilWriterThreadHasHitTheTransactionalResource() throws InterruptedException
                       {
                       while ( ! table.getTheTransactionalResourceWasHit() )
                       {
                       Thread.sleep ( 10 );
                       }
                       }
                      }


                      And the service implementation:
                      package hellotrackworld.impl;
                      
                      import hellotrackworld.TracksTable;
                      import hellotrackworld.util.ObjectUtils;
                      
                      import org.jboss.cache.Cache;
                      import org.jboss.cache.Node;
                      import org.springframework.stereotype.Service;
                      import org.springframework.transaction.annotation.Isolation;
                      import org.springframework.transaction.annotation.Propagation;
                      import org.springframework.transaction.annotation.Transactional;
                      
                      
                      @Service("tracksTable")
                      @Transactional (propagation=Propagation.REQUIRED, isolation=Isolation.SERIALIZABLE, readOnly=false, timeout=10000)
                      public class TracksTableImpl implements TracksTable
                      {
                       private Boolean onATapeDansLaRessourceTransactionnelle = Boolean.FALSE;
                       private Cache coreCache;
                      
                       @Override
                       public String coreGetK()
                       {
                       Node rootNode = coreCache.getRoot();
                       return (String) rootNode.get ( "k" );
                       }
                      
                       @Override
                       public void coreSetK(String v)
                       {
                       Node rootNode = coreCache.getRoot();
                       rootNode.put ( "k" , v );
                       }
                      
                       @Override
                       public void longCoreSetK(String v, int dureeEnSecondes)
                       {
                       coreSetK ( v );
                       synchronized ( onATapeDansLaRessourceTransactionnelle )
                       {
                       onATapeDansLaRessourceTransactionnelle = Boolean.TRUE;
                       }
                       ObjectUtils.sleep ( dureeEnSecondes * 1000 );
                       }
                      
                       @Override
                       public void setCache ( Cache cache )
                       {
                       coreCache = cache;
                       }
                      
                       @Override
                       public void unsetCache ( Cache cache )
                       {
                       coreCache = null;
                       }
                      
                       @Override
                       public boolean getTheTransactionalResourceWasHit()
                       {
                       synchronized ( onATapeDansLaRessourceTransactionnelle )
                       {
                       return onATapeDansLaRessourceTransactionnelle;
                       }
                       }
                      
                       @Override
                       public void setTheTransactionalResourceWasHit(Boolean onATapeDansLaRessourceTransactionnelle)
                       {
                       synchronized ( onATapeDansLaRessourceTransactionnelle )
                       {
                       this.onATapeDansLaRessourceTransactionnelle = onATapeDansLaRessourceTransactionnelle;
                       }
                       }
                      }
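A side note on the "resource was hit" flag in the service above: `synchronized` on a `Boolean` field that is later reassigned locks on whichever shared `Boolean.TRUE` or `Boolean.FALSE` instance the field currently points to, so the writer and the polling reader are not guaranteed to use the same monitor. Below is a minimal sketch of one alternative, assuming a `CountDownLatch` is acceptable here. `HitSignal` and its method names are hypothetical and not part of the posted code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: signals that the writer thread has touched the resource.
class HitSignal {
    // volatile so that a reset() is visible to every thread
    private volatile CountDownLatch latch = new CountDownLatch(1);

    // Called by the writer thread right after its put().
    void markHit() {
        latch.countDown();
    }

    // Called by the test thread; returns false on timeout or interruption.
    boolean awaitHit(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    boolean wasHit() {
        return latch.getCount() == 0;
    }

    // Re-arm before each test (the equivalent of setting the flag to false).
    void reset() {
        latch = new CountDownLatch(1);
    }
}
```

With such a helper the test thread blocks in `awaitHit` instead of polling with `Thread.sleep ( 10 )`, and a timeout turns a hung writer into a test failure rather than an endless loop.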




                      • 8. Re: Transactions: atomicity OK, isolation KO
                        ezza ezaezazea Novice

                        Finally, my replSync-service.xml configuration:

                        <?xml version="1.0" encoding="UTF-8"?>
                        
                        
                        <server>
                         <mbean code="org.jboss.cache.jmx.CacheJmxWrapper"
                         name="jboss.cache:service=TreeCache">
                        
                         <attribute name="TransactionManagerLookupClass">hellotrackworld.impl.srv.AtomikosTransactionManagerLookup</attribute>
                         <attribute name="IsolationLevel">SERIALIZABLE</attribute>
                         <attribute name="CacheMode">LOCAL</attribute>
                         <attribute name="UseReplQueue">false</attribute>
                         <attribute name="ReplQueueInterval">0</attribute>
                         <attribute name="ReplQueueMaxElements">0</attribute>
                         <attribute name="ClusterName">JBossCache-Cluster</attribute>
                        
                         <attribute name="ClusterConfig">
                         <config>
                         <UDP mcast_addr="228.10.10.10"
                         mcast_port="45588"
                         tos="8"
                         ucast_recv_buf_size="20000000"
                         ucast_send_buf_size="640000"
                         mcast_recv_buf_size="25000000"
                         mcast_send_buf_size="640000"
                         loopback="false"
                         discard_incompatible_packets="true"
                         max_bundle_size="64000"
                         max_bundle_timeout="30"
                         use_incoming_packet_handler="true"
                         ip_ttl="2"
                         enable_bundling="false"
                         enable_diagnostics="true"
                        
                         use_concurrent_stack="true"
                        
                         thread_naming_pattern="pl"
                        
                         thread_pool.enabled="true"
                         thread_pool.min_threads="1"
                         thread_pool.max_threads="25"
                         thread_pool.keep_alive_time="30000"
                         thread_pool.queue_enabled="true"
                         thread_pool.queue_max_size="10"
                         thread_pool.rejection_policy="Run"
                        
                         oob_thread_pool.enabled="true"
                         oob_thread_pool.min_threads="1"
                         oob_thread_pool.max_threads="4"
                         oob_thread_pool.keep_alive_time="10000"
                         oob_thread_pool.queue_enabled="true"
                         oob_thread_pool.queue_max_size="10"
                         oob_thread_pool.rejection_policy="Run"/>
                        
                         <PING timeout="2000" num_initial_members="3"/>
                         <MERGE2 max_interval="30000" min_interval="10000"/>
                         <FD_SOCK/>
                         <FD timeout="10000" max_tries="5" shun="true"/>
                         <VERIFY_SUSPECT timeout="1500"/>
                         <pbcast.NAKACK max_xmit_size="60000"
                         use_mcast_xmit="false" gc_lag="0"
                         retransmit_timeout="300,600,1200,2400,4800"
                         discard_delivered_msgs="true"/>
                         <UNICAST timeout="300,600,1200,2400,3600"/>
                         <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
                         max_bytes="400000"/>
                         <pbcast.GMS print_local_addr="true" join_timeout="5000"
                         join_retry_timeout="2000" shun="false"
                         view_bundling="true" view_ack_collection_timeout="5000"/>
                         <FRAG2 frag_size="60000"/>
                         <pbcast.STREAMING_STATE_TRANSFER use_reading_thread="true"/>
                         <!-- <pbcast.STATE_TRANSFER/> -->
                         <pbcast.FLUSH timeout="0"/>
                         </config>
                         </attribute>
                        
                        
                         <attribute name="FetchInMemoryState">true</attribute>
                         <attribute name="StateRetrievalTimeout">15000</attribute>
                         <attribute name="SyncReplTimeout">15000</attribute>
                         <attribute name="LockAcquisitionTimeout">10000</attribute>
                         <attribute name="UseRegionBasedMarshalling">true</attribute>
                         </mbean>
                        </server>
                        


                        Result of the test:
                        -cache.getConfiguration().setNodeLockingScheme(NodeLockingScheme.PESSIMISTIC) : Test passes
                        -cache.getInvocationContext().getOptionOverrides().setForceWriteLock(true) : Test fails at assertEquals ( "V2" , table.coreGetK() ); (was V1 instead)
                        -adding the other options: Doesn't change anything, test fails at the same line




                        • 9. Re: Transactions: atomicity OK, isolation KO
                          Manik Surtani Master

                          Hmm, a lot of this is very Spring-heavy. Any chance of a unit test along the lines of what we have in the JBC testsuite? Perhaps similar to, or an extension of, http://bit.ly/2km9bB ?

                          • 10. Re: Transactions: atomicity OK, isolation KO
                            ezza ezaezazea Novice

                            Come on it's just a small Spring file of nothing at all :)
                            Alright, I'm going to do the same test without Spring.
                            I guess I just need to use the JBC TransactionManager instead of my JTA tm:

                            TransactionManager mgr = cache.getTransactionManager();



                            Meanwhile, can you confirm if you agree with the little correction i added earlier?
                            "chtimi2" wrote:

                            But i would rewrite your test as something like:
                            cache.put(k, v1);
                            T1: cache.put(k, v2); sleep(duration); //launched in another thread
                            before=currentTimeMillis
                            T2: assert cache.get(k) == v2 //v2, not v1
                            after=currentTimeMillis
                            assert after-before>=duration

                            The write thread must block the read thread, so T2 gets the value v2 not v1. I also verify (for consistency) that T2 has been blocked for at least the duration of T1.


                            • 11. Re: Transactions: atomicity OK, isolation KO
                              Manik Surtani Master

                               

                              "chtimi2" wrote:
                              Come on it's just a small Spring file of nothing at all :)


                              :-) If I end up adding this to our test suite, I don't want to pollute it with unnecessary stuff.

                              "chtimi2" wrote:

                              Meanwhile, can you confirm if you agree with the little correction i added earlier?
                              "chtimi2" wrote:

                              But i would rewrite your test as something like:
                              cache.put(k, v1);
                              T1: cache.put(k, v2); sleep(duration); //launched in another thread
                              before=currentTimeMillis
                              T2: assert cache.get(k) == v2 //v2, not v1
                              after=currentTimeMillis
                              assert after-before>=duration

                              The write thread must block the read thread, so T2 gets the value v2 not v1. I also verify (for consistency) that T2 has been blocked for at least the duration of T1.


                              No, with MVCC you have non-blocking reads. The reader will see the old value until the writer commits, after which the reader will see the new value.
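The visibility rule described here can be illustrated with a toy model in plain Java. This is only a sketch of the general MVCC idea for a single key, not JBoss Cache code; `MvccCell` and `Tx` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicReference;

// Toy model of MVCC visibility for a single key (not JBoss Cache code).
class MvccCell {
    private final AtomicReference<String> committed = new AtomicReference<>();

    MvccCell(String initial) {
        committed.set(initial);
    }

    // Readers never block: they return the last committed version.
    String read() {
        return committed.get();
    }

    // A writer works on a private copy that stays invisible until commit().
    Tx begin() {
        return new Tx();
    }

    class Tx {
        private String pending = committed.get();

        void write(String v) { pending = v; }

        // A transaction sees its own uncommitted writes.
        String read() { return pending; }

        void commit() { committed.set(pending); }
    }

    public static void main(String[] args) {
        MvccCell cell = new MvccCell("V1");
        MvccCell.Tx writer = cell.begin();
        writer.write("V2");
        System.out.println(cell.read()); // V1: the writer has not committed yet
        writer.commit();
        System.out.println(cell.read()); // V2: visible after commit
    }
}
```

In this model a reader that runs while the writer is still sleeping returns "V1" immediately, which matches the failure seen at `assertEquals ( "V2" , table.coreGetK() )`.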

                              • 12. Re: Transactions: atomicity OK, isolation KO
                                ezza ezaezazea Novice

                                 

                                "manik.surtani@jboss.com" wrote:

                                No, with MVCC you have non-blocking reads. The reader will see the old value until the writer commits, after which the reader will see the new value.

                                We're getting to the heart of the issue. The test becomes meaningless for now since this behaviour is not the one i want to assert.
                                Instead I have two questions, one technical and one of principle.

                                First the technical question: if there are no blocking reads in MVCC, then what is this option for?
                                cache.getInvocationContext().getOptionOverrides().setForceWriteLock(true);

                                I thought it was supposed to make reads blocking, because in the other thread you mention it as a solution to implement a "select for update".
                                But if i remember right, calling a "select for update" has two effects:
                                1/Once the read has acquired the lock, it blocks writes for its duration
                                2/The read lock can't be acquired while there is a write operation running on the same resource. Default behaviour is blocking until the lock is released ("select for update NOWAIT" fails immediately instead).
                                Are you saying that MVCC can do 1/ but not 2/? Or neither?


                                Now the more general question:
                                If there is really no way to implement blocking reads with MVCC, then you shouldn't deprecate pessimistic locking (PL).
                                MVCC can certainly handle much higher throughput than PL, but we don't need this additional capability: we're not a many-users web application, we use transactions in a component framework where we need blocking reads.
                                Throughput is good, but isolation semantics compatible with our application framework requirements are better. Would it be possible to reconsider this deprecation? (and the lack of support in Infinispan)


                                Last question, do you have recommended readings on MVCC in the context of this discussion (i'm still interested in understanding MVCC even though we need pessimistic locking)?

                                • 13. Re: Transactions: atomicity OK, isolation KO
                                  Manik Surtani Master

                                   

                                  "chtimi2" wrote:

                                  First the technical question: if there are no blocking reads in MVCC, then what is this option for?
                                  cache.getInvocationContext().getOptionOverrides().setForceWriteLock(true);



                                  It is to prevent another writer from changing the value. Useful for counters, for example, where you need to perform:
                                  {
                                   read value
                                   increment value
                                   write value
                                  }
                                  

                                  atomically.
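As a plain-Java analogy (not JBoss Cache code), the read/increment/write sequence is only safe if the exclusive lock is taken before the read and held until the write is done. The sketch below uses a `ReentrantLock` to stand in for the forced write lock; `LockedCounter` and `runConcurrently` are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Toy analogy: the lock is acquired BEFORE the read, much as a forced
// write lock turns a read into a "select for update".
class LockedCounter {
    private final ReentrantLock writeLock = new ReentrantLock();
    private int value;

    void increment() {
        writeLock.lock();        // acquired at read time, not at write time
        try {
            int tmp = value;     // read value
            tmp++;               // increment value
            value = tmp;         // write value
        } finally {
            writeLock.unlock();  // released when the "transaction" ends
        }
    }

    int get() {
        writeLock.lock();
        try {
            return value;
        } finally {
            writeLock.unlock();
        }
    }

    // Runs the given number of concurrent increments and returns the final value.
    static int runConcurrently(int writers) {
        LockedCounter counter = new LockedCounter();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < writers; i++) {
            pool.execute(counter::increment);
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("writers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(runConcurrently(10)); // 10
    }
}
```

If the lock were taken only around the final write, two threads could read the same value and one increment would be lost.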

                                  "chtimi2" wrote:

                                  I thought it was supposed to make reads blocking, because in the other thread you mention it as a solution to implement a "select for update".
                                  But if i remember right, calling a "select for update" has two effects:
                                  1/Once the read has acquired the lock, it blocks writes for its duration
                                  2/The read lock can't be acquired while there is a write operation running on the same resource. Default behaviour is blocking until the lock is released ("select for update NOWAIT" fails immediately instead).
                                  Are you saying that MVCC can do 1/ but not 2/? Or neither?


                                  An MVCC read operation with the force write lock option will do 1 and 2.


                                  "chtimi2" wrote:

                                  Would it be possible to reconsider this deprecation? (and lack of support in Infinispan)


                                  No, since the pessimistic locking scheme is prone to deadlocks.

                                  "chtimi2" wrote:

                                  Last question, do you have recommended readings on MVCC in the context of this discussion (i'm still interested in understanding MVCC even though we need pessimistic locking)?


                                  The user guide has a detailed section on this, as does the design wiki.

                                  http://www.jboss.org/community/wiki/JBossCacheMVCC

                                  • 14. Re: Transactions: atomicity OK, isolation KO
                                    ezza ezaezazea Novice

                                    Hello, I'm back. I have been rereading the UserGuide section on transactions, and a few more articles about MVCC (in general and in JBC).
                                    I think I'm starting to get it.

                                    Now from what i gather it seems my real issue is not MVCC, but that JBC doesn't implement the Serializable isolation level.
                                    JBC's MVCC implements Snapshot isolation (SI), but not Serializable snapshot isolation (SSI).

                                    To relate this to our previous discussion

                                    "manik.surtani@jboss.com" wrote:

                                    cache.getInvocationContext().getOptionOverrides().setForceWriteLock(true);

                                    is to prevent another writer from changing the value. Useful for counters, for example, where you need to perform:
                                    {
                                    read value
                                    increment value
                                    write value
                                    }
                                    atomically.

                                    I have written a unit test that does this, but is a bit more demanding.
                                    It doesn't pass, but the problem is twofold:
                                    A/ Technical problem: With RepeatableRead (RR) I can't get the writeSkewCheck to work properly (the test hangs indefinitely).
                                    B/ Fundamental problem: Neither RR nor ReadCommitted (RC) could ever make it pass. The best I could hope for is a "correct failure" (if it weren't for problem A/).
                                    That's because the test assumes Serializable isolation.
                                    BUT unlike my previous test, it doesn't assume pessimistic locking. It would pass if JBC's MVCC implemented Serializable Snapshot Isolation (SSI).
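To make the "correct failure" idea concrete, here is a toy model in plain Java of snapshot reads combined with a version check at commit time. It is a sketch under the assumption that a write skew check rejects a writer whose snapshot has become stale; it is not JBoss Cache code, and `SnapshotCounter`, `Versioned`, `snapshot` and `commitIncrement` are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Toy model of snapshot reads plus a version check at commit.
class SnapshotCounter {
    // Immutable committed state: a value and the version that produced it.
    static final class Versioned {
        final int value;
        final long version;
        Versioned(int value, long version) { this.value = value; this.version = version; }
    }

    private final AtomicReference<Versioned> committed =
            new AtomicReference<>(new Versioned(0, 0));

    int get() { return committed.get().value; }

    // Non-blocking read: the "transaction" remembers what it saw.
    Versioned snapshot() { return committed.get(); }

    // Commit succeeds only if nobody committed since the snapshot was taken.
    boolean commitIncrement(Versioned seen) {
        return committed.compareAndSet(seen, new Versioned(seen.value + 1, seen.version + 1));
    }

    // The caller has to retry a rejected transaction for no update to be lost.
    void incrementWithRetry() {
        Versioned seen;
        do {
            seen = snapshot();
        } while (!commitIncrement(seen));
    }

    static int runConcurrently(int writers) {
        SnapshotCounter counter = new SnapshotCounter();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < writers; i++) {
            pool.execute(counter::incrementWithRetry);
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("writers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        SnapshotCounter c = new SnapshotCounter();
        Versioned t1 = c.snapshot();
        Versioned t2 = c.snapshot();                 // same snapshot as t1
        System.out.println(c.commitIncrement(t1));   // true: first committer wins
        System.out.println(c.commitIncrement(t2));   // false: stale snapshot rejected
        System.out.println(runConcurrently(10));     // 10
    }
}
```

In this model the counter reaches the number of writers only because rejected attempts are retried. A test that counts each writer exactly once, without a retry, would see either lost updates (no check) or failed transactions (with the check).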


                                    Now for the test. Regarding issue A/, I know it uses Spring, but before I try to write a non-Spring one I want to know if you see an obvious problem or wrong assumption (as was the case last time) with it.
                                    The test is the classic non-atomic increment. The shared state is a counter, incremented by each thread (a common use case).
                                    It verifies that the final value of the counter equals the number of writers (problem B/).

                                    @RunWith(SpringJUnit4ClassRunner.class)
                                    @ContextConfiguration ( locations={"/application-context_Jdbc_Atomikos.xml"} )
                                    public class ComportementTransactionnelTableCoreCache implements ApplicationContextAware
                                    {
                                     private Cache cache;
                                     @Resource ( name="tracksTable" )
                                     protected TracksTable table;
                                     protected final ExecutorService executorService = Executors.newCachedThreadPool();
                                     protected static ApplicationContext applicationContext;
                                    
                                     @Test
                                     public void isolation_RW_RW () throws Exception
                                     {
                                     table.initScalar ();
                                    
                                     int nbWriters = 10;
                                    
                                     CountDownLatch latch = new CountDownLatch ( nbWriters );
                                     for ( int i = 0 ; i < nbWriters ; i ++ )
                                     {
                                     executorService.submit ( new AsyncIncrement ( latch ) );
                                     }
                                     latch.await ();
                                    
                                     assertEquals ( nbWriters , table.getScalar() );
                                     }
                                    
                                     protected class AsyncIncrement implements Runnable
                                     {
                                     private final CountDownLatch latch;
                                    
                                     public AsyncIncrement ( CountDownLatch latch )
                                     {
                                     this.latch = latch;
                                     }
                                    
                                     @Override
                                     public void run()
                                     {
                                     table.incrementScalar ();
                                     latch.countDown ();
                                     }
                                     }
                                    
                                     @Before
                                     public void before ()
                                     {
                                     cache = initPojoCache ();
                                     cache.start();
                                     table.setCache ( cache );
                                     }
                                    
                                     @After
                                     public void stop ()
                                     {
                                     table.unsetCache ( cache );
                                     cache.stop();
                                     }
                                    
                                     private Cache initPojoCache()
                                     {
                                     CacheFactory factory = new DefaultCacheFactory();
                                     Cache cache = factory.createCache("resources/META-INF/replSync-service.xml", false);
                                    
                                     //cache.getConfiguration().setNodeLockingScheme(NodeLockingScheme.PESSIMISTIC);
                                     cache.getInvocationContext().getOptionOverrides().setForceWriteLock(true);
                                     cache.getConfiguration().setWriteSkewCheck(true);
                                    
                                     cache.create();
                                     return cache;
                                     }
                                    
                                     @Override
                                     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
                                     {
                                     TransactionManager tm = (TransactionManager) applicationContext.getBean ( "atomikosTransactionManager" );
                                     AtomikosTransactionManagerLookup.setAtomikosTransactionManager ( tm );
                                     }
                                    }
                                    


                                    The service:
                                    @Service("tracksTable")
                                    @Transactional (propagation=Propagation.REQUIRED, isolation=Isolation.SERIALIZABLE, readOnly=false, timeout=10000)
                                    public class TracksTableImpl implements TracksTable
                                    {
                                     private Cache coreCache;
                                     private int scalar;
                                    
                                     @Override
                                     public int getScalar()
                                     {
                                     Node rootNode = coreCache.getRoot();
                                     return (Integer)rootNode.get ( "scalar" );
                                     }
                                    
                                     @Override
                                     public void incrementScalar()
                                     {
                                     int tmp = getScalar();
                                     tmp++;
                                     setScalar ( tmp );
                                     }
                                    
                                     @Override
                                     public void initScalar()
                                     {
                                     setScalar ( 0 );
                                     }
                                    
                                     private void setScalar ( int scalar )
                                     {
                                     Node rootNode = coreCache.getRoot();
                                     rootNode.put ( "scalar" , scalar );
                                     }
                                    
                                     @Override
                                     public void setCache ( Cache cache )
                                     {
                                     coreCache = cache;
                                     }
                                    
                                     @Override
                                     public void unsetCache ( Cache cache )
                                     {
                                     coreCache = null;
                                     }
                                    }
                                    


                                    My replSync-service.xml:
                                    <?xml version="1.0" encoding="UTF-8"?>
                                    <server>
                                     <mbean code="org.jboss.cache.jmx.CacheJmxWrapper" name="jboss.cache:service=TreeCache">
                                    
                                     <attribute name="TransactionManagerLookupClass">hellotrackworld.impl.srv.AtomikosTransactionManagerLookup</attribute>
                                     <attribute name="IsolationLevel">REPEATABLE_READ</attribute>
                                     <attribute name="CacheMode">LOCAL</attribute>
                                     <attribute name="UseReplQueue">false</attribute>
                                     <attribute name="ReplQueueInterval">0</attribute>
                                     <attribute name="ReplQueueMaxElements">0</attribute>
                                     <attribute name="ClusterName">JBossCache-Cluster</attribute>
                                    
                                     <attribute name="ClusterConfig">
                                     <config>
                                     <UDP mcast_addr="228.10.10.10"
                                     mcast_port="45588"
                                     tos="8"
                                     ucast_recv_buf_size="20000000"
                                     ucast_send_buf_size="640000"
                                     mcast_recv_buf_size="25000000"
                                     mcast_send_buf_size="640000"
                                     loopback="false"
                                     discard_incompatible_packets="true"
                                     max_bundle_size="64000"
                                     max_bundle_timeout="30"
                                     use_incoming_packet_handler="true"
                                     ip_ttl="2"
                                     enable_bundling="false"
                                     enable_diagnostics="true"
                                    
                                     use_concurrent_stack="true"
                                    
                                     thread_naming_pattern="pl"
                                    
                                     thread_pool.enabled="true"
                                     thread_pool.min_threads="1"
                                     thread_pool.max_threads="25"
                                     thread_pool.keep_alive_time="30000"
                                     thread_pool.queue_enabled="true"
                                     thread_pool.queue_max_size="10"
                                     thread_pool.rejection_policy="Run"
                                    
                                     oob_thread_pool.enabled="true"
                                     oob_thread_pool.min_threads="1"
                                     oob_thread_pool.max_threads="4"
                                     oob_thread_pool.keep_alive_time="10000"
                                     oob_thread_pool.queue_enabled="true"
                                     oob_thread_pool.queue_max_size="10"
                                     oob_thread_pool.rejection_policy="Run"/>
                                    
                                     <PING timeout="2000" num_initial_members="3"/>
                                     <MERGE2 max_interval="30000" min_interval="10000"/>
                                     <FD_SOCK/>
                                     <FD timeout="10000" max_tries="5" shun="true"/>
                                     <VERIFY_SUSPECT timeout="1500"/>
                                     <pbcast.NAKACK max_xmit_size="60000"
                                     use_mcast_xmit="false" gc_lag="0"
                                     retransmit_timeout="300,600,1200,2400,4800"
                                     discard_delivered_msgs="true"/>
                                     <UNICAST timeout="300,600,1200,2400,3600"/>
                                     <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
                                     max_bytes="400000"/>
                                     <pbcast.GMS print_local_addr="true" join_timeout="5000"
                                     join_retry_timeout="2000" shun="false"
                                     view_bundling="true" view_ack_collection_timeout="5000"/>
                                     <FRAG2 frag_size="60000"/>
                                     <pbcast.STREAMING_STATE_TRANSFER use_reading_thread="true"/>
                                     <!-- <pbcast.STATE_TRANSFER/> -->
                                     <pbcast.FLUSH timeout="0"/>
                                     </config>
                                     </attribute>
                                    
                                    
                                     <attribute name="FetchInMemoryState">true</attribute>
                                     <attribute name="StateRetrievalTimeout">15000</attribute>
                                     <attribute name="SyncReplTimeout">15000</attribute>
                                     <attribute name="LockAcquisitionTimeout">10000</attribute>
                                     <attribute name="UseRegionBasedMarshalling">true</attribute>
                                     </mbean>
                                    </server>
                                    



                                    The result, depending on the isolation level:

                                    0/ Using SERIALIZABLE (with pessimistic locking): the test passes.

                                    1/ With MVCC REPEATABLE_READ (RR): the test hangs forever. I get the log below, which looks right since the write skew has been detected, but why does the test hang?
                                    [org.jboss.cache.mvcc.RepeatableReadNode][pool-1-thread-7] - Detected write skew on Fqn [/]. Another process has changed the node since we last read it!. Unable to copy node for update.


                                    2/ With MVCC READ_COMMITTED (RC): the test fails. This is not surprising, since (depending on timing) there is no guarantee that a given thread will see another thread's update.
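To make the "write skew" message from case 1/ concrete, here is a minimal, self-contained sketch of the general idea: a writer may only replace the version it originally read, and fails if someone else committed in between. This is NOT the JBoss Cache implementation. The class names (WriteSkewSketch, Node, Versioned, WriteSkewException) are hypothetical and only illustrate the check.

```java
import java.util.concurrent.atomic.AtomicReference;

public class WriteSkewSketch {

    /** Immutable versioned value, standing in for a cache node's state (hypothetical). */
    static final class Versioned {
        final long version;
        final String value;

        Versioned(long version, String value) {
            this.version = version;
            this.value = value;
        }
    }

    /** Thrown when the node changed after the writer took its snapshot (hypothetical). */
    static final class WriteSkewException extends RuntimeException {
        WriteSkewException(String message) {
            super(message);
        }
    }

    /** A single node whose writes succeed only against the snapshot that was read. */
    static final class Node {
        private final AtomicReference<Versioned> current =
                new AtomicReference<>(new Versioned(0, "initial"));

        Versioned read() {
            return current.get();
        }

        void write(Versioned snapshot, String newValue) {
            Versioned next = new Versioned(snapshot.version + 1, newValue);
            // compareAndSet compares references: it fails if another writer
            // has already replaced the snapshot this writer started from.
            if (!current.compareAndSet(snapshot, next)) {
                throw new WriteSkewException(
                        "Node changed since it was read at version " + snapshot.version);
            }
        }
    }

    /** Returns true if the second writer is rejected because of write skew. */
    static boolean demo() {
        Node node = new Node();
        Versioned snapshotA = node.read(); // "transaction" A reads
        Versioned snapshotB = node.read(); // "transaction" B reads the same state
        node.write(snapshotB, "written by B"); // B commits first
        try {
            node.write(snapshotA, "written by A"); // A writes from a stale snapshot
            return false;
        } catch (WriteSkewException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("write skew detected: " + demo());
    }
}
```

Note that in this sketch the stale writer fails immediately instead of blocking. A reader that only wants the latest committed state would have to re-read after the failure, which is a different behaviour from the blocking read my isolation test expects.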


                                    Interestingly, the wiki you linked to mentions implementing Serializable, but it seems to be an old preliminary study.
                                    Anyway, I think this use case is common enough to justify implementing Serializable snapshot isolation in Infinispan. Don't you agree? (I'm currently studying how to replace Java locks with transactions in a component framework, so I don't see how I can do without a Serializable isolation level.)
