49 Replies Latest reply on Jan 21, 2010 10:04 AM by galder.zamarreno
      • 15. Re: Transactions: atomicity OK, isolation KO
        manik

        I'm going to focus on RR (REPEATABLE_READ) and the test hanging, since this is incorrect behaviour. Do you have a thread dump of this when it hangs?

        • 16. Re: Transactions: atomicity OK, isolation KO

          jconsole's deadlock detection (not that it is 100% reliable...)

          findDeadlockedThreads: null
          findMonitorDeadlockedThreads: null
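A `null` result from these calls only rules out lock cycles. A thread that waits on a latch nobody counts down hangs just as well without being a deadlock in the JVM's sense. A minimal sketch of the same check done in-process with the standard `ThreadMXBean` API; the class and method names (`LatchHangProbe`, `jvmReportsDeadlock`, `probeWhileWaiterIsParked`) are made up for illustration and are not part of the test suite discussed in this thread:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

// Hypothetical helper, for illustration only.
class LatchHangProbe {

    /** Returns true if the JVM reports at least one deadlocked thread. */
    static boolean jvmReportsDeadlock() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        // findDeadlockedThreads() also covers java.util.concurrent locks,
        // but it is only available when synchronizer usage monitoring is supported.
        long[] ids = threads.isSynchronizerUsageSupported()
                ? threads.findDeadlockedThreads()
                : threads.findMonitorDeadlockedThreads();
        return ids != null && ids.length > 0; // null means "no deadlock found"
    }

    /** Parks a thread on a latch that is never released, then asks the JVM about deadlocks. */
    static boolean probeWhileWaiterIsParked() throws InterruptedException {
        final CountDownLatch neverReleased = new CountDownLatch(1);
        Thread waiter = new Thread(() -> {
            try {
                neverReleased.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "waiter");
        waiter.setDaemon(true);
        waiter.start();

        // Give the waiter up to two seconds to reach the parked (WAITING) state.
        long deadline = System.nanoTime() + 2_000_000_000L;
        while (waiter.getState() != Thread.State.WAITING && System.nanoTime() < deadline) {
            Thread.sleep(10);
        }

        boolean reported = jvmReportsDeadlock();
        waiter.interrupt(); // clean up the parked thread
        waiter.join(1000);
        return reported;
    }
}
```

In this sketch the waiter is stuck, yet the probe is expected to report no deadlock, which is consistent with the two `null` values above.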
          


          jstack output (the thread dump):
          D:\>jstack 4088
          2009-09-29 18:24:16
          Full thread dump Java HotSpot(TM) Client VM (14.0-b16 mixed mode):
          
          "RMI TCP Connection(5)-10.46.35.83" daemon prio=6 tid=0x0afb8400 nid=0xfa4 runna
          ble [0x0c62f000]
           java.lang.Thread.State: RUNNABLE
           at java.net.SocketInputStream.socketRead0(Native Method)
           at java.net.SocketInputStream.read(SocketInputStream.java:129)
           at java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
           at java.io.BufferedInputStream.read(BufferedInputStream.java:237)
           - locked <0x031ae1c0> (a java.io.BufferedInputStream)
           at java.io.FilterInputStream.read(FilterInputStream.java:66)
           at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:5
          17)
           at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTranspor
          t.java:790)
           at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport
          .java:649)
           at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExec
          utor.java:886)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor
          .java:908)
           at java.lang.Thread.run(Thread.java:619)
          
          "RMI TCP Connection(4)-10.46.35.83" daemon prio=6 tid=0x0b349400 nid=0x868 runna
          ble [0x0c5df000]
           java.lang.Thread.State: RUNNABLE
           at java.net.SocketInputStream.socketRead0(Native Method)
           at java.net.SocketInputStream.read(SocketInputStream.java:129)
           at java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
           at java.io.BufferedInputStream.read(BufferedInputStream.java:237)
           - locked <0x031ae3e0> (a java.io.BufferedInputStream)
           at java.io.FilterInputStream.read(FilterInputStream.java:66)
           at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:5
          17)
           at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTranspor
          t.java:790)
           at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport
          .java:649)
           at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExec
          utor.java:886)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor
          .java:908)
           at java.lang.Thread.run(Thread.java:619)
          
          "JMX server connection timeout 33" daemon prio=6 tid=0x0b4b8400 nid=0xab0 in Obj
          ect.wait() [0x0c4df000]
           java.lang.Thread.State: TIMED_WAITING (on object monitor)
           at java.lang.Object.wait(Native Method)
           - waiting on <0x0317c3d0> (a [I)
           at com.sun.jmx.remote.internal.ServerCommunicatorAdmin$Timeout.run(Serve
          rCommunicatorAdmin.java:150)
           - locked <0x0317c3d0> (a [I)
           at java.lang.Thread.run(Thread.java:619)
          
          "RMI Scheduler(0)" daemon prio=6 tid=0x0b271400 nid=0xe68 waiting on condition [
          0x0c48f000]
           java.lang.Thread.State: TIMED_WAITING (parking)
           at sun.misc.Unsafe.park(Native Method)
           - parking to wait for <0x03160d10> (a java.util.concurrent.locks.Abstra
          ctQueuedSynchronizer$ConditionObject)
           at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198
          )
           at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject
          .awaitNanos(AbstractQueuedSynchronizer.java:1963)
           at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
           at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.tak
          e(ScheduledThreadPoolExecutor.java:583)
           at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.tak
          e(ScheduledThreadPoolExecutor.java:576)
           at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.ja
          va:947)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor
          .java:907)
           at java.lang.Thread.run(Thread.java:619)
          
          "RMI TCP Connection(1)-10.46.35.83" daemon prio=6 tid=0x0ab03400 nid=0xa28 in Ob
          ject.wait() [0x0c43e000]
           java.lang.Thread.State: TIMED_WAITING (on object monitor)
           at java.lang.Object.wait(Native Method)
           - waiting on <0x031b2aa8> (a com.sun.jmx.remote.internal.ArrayNotificati
          onBuffer)
           at com.sun.jmx.remote.internal.ArrayNotificationBuffer.fetchNotification
          s(ArrayNotificationBuffer.java:417)
           - locked <0x031b2aa8> (a com.sun.jmx.remote.internal.ArrayNotificationBu
          ffer)
           at com.sun.jmx.remote.internal.ArrayNotificationBuffer$ShareBuffer.fetch
          Notifications(ArrayNotificationBuffer.java:209)
           at com.sun.jmx.remote.internal.ServerNotifForwarder.fetchNotifs(ServerNo
          tifForwarder.java:258)
           at javax.management.remote.rmi.RMIConnectionImpl$2.run(RMIConnectionImpl
          .java:1227)
           at javax.management.remote.rmi.RMIConnectionImpl$2.run(RMIConnectionImpl
          .java:1225)
           at javax.management.remote.rmi.RMIConnectionImpl.fetchNotifications(RMIC
          onnectionImpl.java:1231)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.
          java:39)
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAcces
          sorImpl.java:25)
           at java.lang.reflect.Method.invoke(Method.java:597)
           at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:305)
           at sun.rmi.transport.Transport$1.run(Transport.java:159)
           at java.security.AccessController.doPrivileged(Native Method)
           at sun.rmi.transport.Transport.serviceCall(Transport.java:155)
           at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:5
          35)
           at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTranspor
          t.java:790)
           at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport
          .java:649)
           at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExec
          utor.java:886)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor
          .java:908)
           at java.lang.Thread.run(Thread.java:619)
          
          "RMI TCP Accept-0" daemon prio=6 tid=0x0aaae400 nid=0xad0 runnable [0x0c3df000]
           java.lang.Thread.State: RUNNABLE
           at java.net.PlainSocketImpl.socketAccept(Native Method)
           at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:390)
           - locked <0x03160e68> (a java.net.SocksSocketImpl)
           at java.net.ServerSocket.implAccept(ServerSocket.java:453)
           at java.net.ServerSocket.accept(ServerSocket.java:421)
           at sun.management.jmxremote.LocalRMIServerSocketFactory$1.accept(LocalRM
          IServerSocketFactory.java:34)
           at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTr
          ansport.java:369)
           at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:3
          41)
           at java.lang.Thread.run(Thread.java:619)
          
          "ReaderThread" prio=6 tid=0x0b0cd800 nid=0xdd4 runnable [0x0b46f000]
           java.lang.Thread.State: RUNNABLE
           at java.net.SocketInputStream.socketRead0(Native Method)
           at java.net.SocketInputStream.read(SocketInputStream.java:129)
           at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:264)
           at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:306)
           at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:158)
           - locked <0x02eae848> (a java.io.InputStreamReader)
           at java.io.InputStreamReader.read(InputStreamReader.java:167)
           at java.io.BufferedReader.fill(BufferedReader.java:136)
           at java.io.BufferedReader.readLine(BufferedReader.java:299)
           - locked <0x02eae848> (a java.io.InputStreamReader)
           at java.io.BufferedReader.readLine(BufferedReader.java:362)
           at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner$ReaderThread.r
          un(RemoteTestRunner.java:140)
          
          "Low Memory Detector" daemon prio=6 tid=0x0aaa4400 nid=0xefc runnable [0x0000000
          0]
           java.lang.Thread.State: RUNNABLE
          
          "CompilerThread0" daemon prio=10 tid=0x0aa9f800 nid=0xa7c waiting on condition [
          0x00000000]
           java.lang.Thread.State: RUNNABLE
          
          "Attach Listener" daemon prio=10 tid=0x0aa9d400 nid=0xe98 waiting on condition [
          0x00000000]
           java.lang.Thread.State: RUNNABLE
          
          "Signal Dispatcher" daemon prio=10 tid=0x0aa9c000 nid=0xe44 runnable [0x00000000
          ]
           java.lang.Thread.State: RUNNABLE
          
          "Finalizer" daemon prio=8 tid=0x0aa8c400 nid=0xb0c in Object.wait() [0x0abff000]
          
           java.lang.Thread.State: WAITING (on object monitor)
           at java.lang.Object.wait(Native Method)
           - waiting on <0x02e76ec8> (a java.lang.ref.ReferenceQueue$Lock)
           at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118)
           - locked <0x02e76ec8> (a java.lang.ref.ReferenceQueue$Lock)
           at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134)
           at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159)
          
          "Reference Handler" daemon prio=10 tid=0x0aa87800 nid=0x838 in Object.wait() [0x
          0abaf000]
           java.lang.Thread.State: WAITING (on object monitor)
           at java.lang.Object.wait(Native Method)
           - waiting on <0x02e76f50> (a java.lang.ref.Reference$Lock)
           at java.lang.Object.wait(Object.java:485)
           at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116)
           - locked <0x02e76f50> (a java.lang.ref.Reference$Lock)
          
          "main" prio=6 tid=0x0039a000 nid=0xbd4 waiting on condition [0x003ff000]
           java.lang.Thread.State: WAITING (parking)
           at sun.misc.Unsafe.park(Native Method)
           - parking to wait for <0x03147ce8> (a java.util.concurrent.CountDownLat
          ch$Sync)
           at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
           at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInt
          errupt(AbstractQueuedSynchronizer.java:747)
           at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared
          Interruptibly(AbstractQueuedSynchronizer.java:905)
           at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedIn
          terruptibly(AbstractQueuedSynchronizer.java:1217)
           at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
           at hellotrackworld.test.ComportementTransactionnelTableCoreCache.isolati
          on_RW_RW(ComportementTransactionnelTableCoreCache.java:50)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.
          java:39)
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAcces
          sorImpl.java:25)
           at java.lang.reflect.Method.invoke(Method.java:597)
           at org.springframework.test.context.junit4.SpringTestMethod.invoke(Sprin
          gTestMethod.java:163)
           at org.springframework.test.context.junit4.SpringMethodRoadie.runTestMet
          hod(SpringMethodRoadie.java:233)
           at org.springframework.test.context.junit4.SpringMethodRoadie$RunBefores
          ThenTestThenAfters.run(SpringMethodRoadie.java:333)
           at org.springframework.test.context.junit4.SpringMethodRoadie.runWithRep
          etitions(SpringMethodRoadie.java:217)
           at org.springframework.test.context.junit4.SpringMethodRoadie.runTest(Sp
          ringMethodRoadie.java:197)
           at org.springframework.test.context.junit4.SpringMethodRoadie.run(Spring
          MethodRoadie.java:143)
           at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.invok
          eTestMethod(SpringJUnit4ClassRunner.java:142)
           at org.junit.internal.runners.JUnit4ClassRunner.runMethods(JUnit4ClassRu
          nner.java:51)
           at org.junit.internal.runners.JUnit4ClassRunner$1.run(JUnit4ClassRunner.
          java:44)
           at org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.jav
          a:27)
           at org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:
          37)
           at org.junit.internal.runners.JUnit4ClassRunner.run(JUnit4ClassRunner.ja
          va:42)
           at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4
          TestReference.java:45)
           at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution
          .java:38)
           at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(Remot
          eTestRunner.java:460)
           at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(Remot
          eTestRunner.java:673)
           at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTest
          Runner.java:386)
           at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTes
          tRunner.java:196)
          
          "VM Thread" prio=10 tid=0x0aa83800 nid=0xcf0 runnable
          
          "VM Periodic Task Thread" prio=10 tid=0x0aab0400 nid=0xfc0 waiting on condition
          
          
          JNI global references: 811
          


          Log output:
          [org.springframework.test.context.TestContextManager][main] - @TestExecutionListeners is not present for class [class hellotrackworld.test.ComportementTransactionnelTableCoreCache]: using defaults.
          [org.springframework.beans.factory.xml.XmlBeanDefinitionReader][main] - Loading XML bean definitions from class path resource [application-context_Jdbc_Atomikos.xml]
          [org.springframework.context.support.GenericApplicationContext][main] - Refreshing org.springframework.context.support.GenericApplicationContext@513cf0: display name [org.springframework.context.support.GenericApplicationContext@513cf0]; startup date [Tue Sep 29 18:08:45 CEST 2009]; root of context hierarchy
          [org.springframework.context.support.GenericApplicationContext][main] - Bean factory for application context [org.springframework.context.support.GenericApplicationContext@513cf0]: org.springframework.beans.factory.support.DefaultListableBeanFactory@1f1235b
          [org.springframework.beans.factory.support.DefaultListableBeanFactory][main] - Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@1f1235b: defining beans [tracksTable,org.springframework.context.annotation.internalPersistenceAnnotationProcessor,org.springframework.context.annotation.internalCommonAnnotationProcessor,org.springframework.context.annotation.internalAutowiredAnnotationProcessor,org.springframework.context.annotation.internalRequiredAnnotationProcessor,dataSource,jdbcTemplate,atomikosTransactionManager,atomikosUserTransaction,jtaTransactionManager,org.springframework.aop.config.internalAutoProxyCreator,org.springframework.transaction.annotation.AnnotationTransactionAttributeSource#0,org.springframework.transaction.interceptor.TransactionInterceptor#0,org.springframework.transaction.config.internalTransactionAdvisor]; root of factory hierarchy
          Using init file: /D:/ff/jta.properties
          [org.springframework.transaction.jta.JtaTransactionManager][main] - Using JTA UserTransaction: com.atomikos.icatch.jta.UserTransactionImp@bf5555
          [org.springframework.transaction.jta.JtaTransactionManager][main] - Using JTA TransactionManager: com.atomikos.icatch.jta.UserTransactionManager@17b4703
          [org.jboss.cache.config.parsing.RootElementBuilder][main] - Configuration warning: cvc-elt.1: Cannot find the declaration of element 'server'.
          [org.jboss.cache.config.parsing.RootElementBuilder][main] - org.jboss.cache.config.ConfigurationException: Incorrect configuration file. Use '-Djbosscache.config.validate=false' to disable validation.
          !!!!!!!!!!!!SpringAtomikosTransactionManagerLookup!!!!!!!!!!!!!!
          [org.jboss.cache.jmx.PlatformMBeanServerRegistration][main] - JBossCache MBeans were successfully registered to the platform mbean server.
          [org.jboss.cache.factories.ComponentRegistry][main] - JBoss Cache version: JBossCache 'Naga' 3.0.0.GA
          [org.jboss.cache.mvcc.RepeatableReadNode][pool-1-thread-2] - Detected write skew on Fqn [/]. Another process has changed the node since we last read it!. Unable to copy node for update.
          [org.jboss.cache.mvcc.RepeatableReadNode][pool-1-thread-7] - Detected write skew on Fqn [/]. Another process has changed the node since we last read it!. Unable to copy node for update.
          [org.jboss.cache.mvcc.RepeatableReadNode][pool-1-thread-8] - Detected write skew on Fqn [/]. Another process has changed the node since we last read it!. Unable to copy node for update.
          [org.jboss.cache.mvcc.RepeatableReadNode][pool-1-thread-6] - Detected write skew on Fqn [/]. Another process has changed the node since we last read it!. Unable to copy node for update.
          [org.jboss.cache.mvcc.RepeatableReadNode][pool-1-thread-3] - Detected write skew on Fqn [/]. Another process has changed the node since we last read it!. Unable to copy node for update.
          [org.jboss.cache.mvcc.RepeatableReadNode][pool-1-thread-5] - Detected write skew on Fqn [/]. Another process has changed the node since we last read it!. Unable to copy node for update.
          [org.jboss.cache.mvcc.RepeatableReadNode][pool-1-thread-4] - Detected write skew on Fqn [/]. Another process has changed the node since we last read it!. Unable to copy node for update.
          [org.jboss.cache.mvcc.RepeatableReadNode][pool-1-thread-10] - Detected write skew on Fqn [/]. Another process has changed the node since we last read it!. Unable to copy node for update.
          


          • 17. Re: Transactions: atomicity OK, isolation KO

            Shame on me...

            The test hung because I had written:

            protected class AsyncIncrement implements Runnable
            {
                private final CountDownLatch latch;

                public AsyncIncrement ( CountDownLatch latch )
                {
                    this.latch = latch;
                }

                @Override
                public void run()
                {
                    table.incrementScalar ();
                    latch.countDown ();
                }
            }
            


            instead of:
            protected class AsyncIncrement implements Runnable
            {
                private final CountDownLatch latch;

                public AsyncIncrement ( CountDownLatch latch )
                {
                    this.latch = latch;
                }

                @Override
                public void run()
                {
                    try
                    {
                        table.incrementScalar ();
                    }
                    finally
                    {
                        latch.countDown ();
                    }
                }
            }
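The difference between the two versions can be reproduced without the cache. The sketch below is a stand-alone illustration, assuming a task that always throws (standing in for an `incrementScalar()` call that fails); `LatchFinallyDemo`, `allWorkersReported` and `failingIncrement` are hypothetical names. It also waits with a timeout, so a missed `countDown()` shows up as `false` instead of a hang:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, for illustration only.
class LatchFinallyDemo {

    /** Stands in for a cache operation that fails, for example on a write skew. */
    static void failingIncrement() {
        throw new IllegalStateException("simulated failure");
    }

    /**
     * Submits `workers` failing tasks and reports whether the latch reached
     * zero within the timeout.
     */
    static boolean allWorkersReported(int workers, final boolean countDownInFinally)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        final CountDownLatch latch = new CountDownLatch(workers);
        try {
            for (int i = 0; i < workers; i++) {
                Runnable task = () -> {
                    if (countDownInFinally) {
                        try {
                            failingIncrement();
                        } finally {
                            latch.countDown(); // runs even though the call above throws
                        }
                    } else {
                        failingIncrement();
                        latch.countDown();     // never reached when the call above throws
                    }
                };
                // submit() stores the exception in the returned Future,
                // so the failure stays silent unless the Future is inspected.
                pool.submit(task);
            }
            // A bounded wait turns a would-be hang into a visible result.
            return latch.await(500, TimeUnit.MILLISECONDS);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Calling `allWorkersReported(4, true)` should return `true`, while `allWorkersReported(4, false)` should time out and return `false`, which is the hang seen in the thread dump.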
            


            • 18. Re: Transactions: atomicity OK, isolation KO

              Now that this miserable "issue" is resolved, what do you think about my question in the previous post (regarding the SERIALIZABLE isolation level in MVCC and serializable snapshot isolation)?

              • 19. Re: Transactions: atomicity OK, isolation KO
                jason.greene

                Your test looks incorrect.

                If you want to serialize access to something in MVCC, you just need to start a transaction, ensure there is a write lock on the node, perform your operations, and then commit. The lock is held for the length of the transaction and no longer. This is similar to SELECT ... FOR UPDATE, in that you can use a node as a cooperative lock, effectively serializing everything that uses the same node in that manner.

                There are two ways to do this:

                1. Force a write lock on a read:
                tx.begin();
                cache.getInvocationContext().getOptionOverrides().setForceWriteLock(true);
                int count = (Integer) cache.get("/foo/mynode", "counter");
                cache.put("/foo/mynode", "counter", count + 1);
                tx.commit();

                This has to be done immediately before the read operation. It lasts for only one cache operation. The drawback is that it does not work across a cluster; it is only useful for local cache usage.

                2. Write to a key first in the transaction. This forces a write lock to be acquired on the node, so that all subsequent reads are current. This also works across a cluster.

                tx.begin();
                cache.put("/foo/mynode", "_lockthisplease_", "_lockthisplease_");
                int count = (Integer) cache.get("/foo/mynode", "counter");
                cache.put("/foo/mynode", "counter", count + 1);
                tx.commit();

                Hopefully that clears it up.
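To make the "lock first, then read, modify, write" idea concrete, here is a stand-in model built only on `java.util.concurrent`. It is not the JBoss Cache API: `LockedNodeModel`, `incrementInTransaction` and `runWriters` are hypothetical names, the `ReentrantLock` plays the role of the node's write lock, and releasing it plays the role of the commit. It is a sketch of the pattern under those assumptions, nothing more:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Stand-in for a single cache node; NOT the JBoss Cache API.
class LockedNodeModel {
    private final ReentrantLock writeLock = new ReentrantLock();
    private final Map<String, Integer> data = new HashMap<String, Integer>();

    LockedNodeModel() {
        data.put("counter", 0);
    }

    /** Models one transaction: lock first, then read, modify, write, release. */
    void incrementInTransaction() {
        writeLock.lock();                       // role of the initial put / forced write lock
        try {
            int count = data.get("counter");    // current, since no other writer is active
            data.put("counter", count + 1);
        } finally {
            writeLock.unlock();                 // role of the commit releasing the lock
        }
    }

    /** Reads the counter under the same lock so the value is safely published. */
    int counter() {
        writeLock.lock();
        try {
            return data.get("counter");
        } finally {
            writeLock.unlock();
        }
    }

    /** Runs `writers` concurrent increments and returns the final counter value. */
    static int runWriters(int writers) throws InterruptedException {
        final LockedNodeModel node = new LockedNodeModel();
        ExecutorService pool = Executors.newFixedThreadPool(writers);
        final CountDownLatch done = new CountDownLatch(writers);
        try {
            for (int i = 0; i < writers; i++) {
                Runnable task = () -> {
                    try {
                        node.incrementInTransaction();
                    } finally {
                        done.countDown();
                    }
                };
                pool.submit(task);
            }
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("writers did not finish in time");
            }
            return node.counter();
        } finally {
            pool.shutdown();
        }
    }
}
```

Because every writer holds the lock across its read and its write, no increment can be based on a stale value, so `runWriters(10)` should end with a counter of 10 in this model.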

                • 20. Re: Transactions: atomicity OK, isolation KO
                  jason.greene

                   

                  "jason.greene@jboss.com" wrote:
                  It lasts for only one cache operation.


                  To clarify, I mean that any invocation flag (such as the write-lock flag) applies only to the next method call. So if you want to lock two different reads, you have to set it before each one. The locks themselves are still held for the duration of the transaction.

                  cache.getInvocationContext().getOptionOverrides().setForceWriteLock(true);
                  cache.get("/blah1");
                  cache.getInvocationContext().getOptionOverrides().setForceWriteLock(true);
                  cache.get("/blah2");
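The one-shot behaviour described above can be pictured with a small model. This is a hypothetical, single-threaded sketch and not JBoss Cache code: `OneShotOptionModel` and its methods are invented names, and the list of locked nodes merely stands in for locks that stay held until the transaction ends:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a per-invocation option; NOT a JBoss Cache class.
class OneShotOptionModel {
    // The option is tracked per thread, like an invocation context would be.
    private final ThreadLocal<Boolean> forceWriteLock = ThreadLocal.withInitial(() -> false);
    // Nodes write-locked so far; in this model they stay locked once added.
    private final List<String> writeLockedNodes = new ArrayList<String>();

    void setForceWriteLock(boolean force) {
        forceWriteLock.set(force);
    }

    /** Reads a node. The flag is consumed here, so it affects this call only. */
    String get(String fqn) {
        boolean force = forceWriteLock.get();
        forceWriteLock.set(false);          // the option is reset after every invocation
        if (force) {
            writeLockedNodes.add(fqn);      // the lock itself outlives the call
        }
        return "value-of-" + fqn;           // placeholder payload
    }

    List<String> writeLockedNodes() {
        return writeLockedNodes;
    }
}
```

In this model, setting the flag once and then reading `/blah1` and `/blah2` locks only `/blah1`; the flag has to be set again before the second read for `/blah2` to be locked as well.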


                  • 21. Re: Transactions: atomicity OK, isolation KO

                    I see, I didn't know that about the scope of setForceWriteLock.
                    Actually doing a write first is another idea that I hadn't tried.

                    But I tried each of them separately and both together, and the test still fails.

                    Here are my changes to the service implementation:

                    @Service("tracksTable")
                    @Transactional (propagation=Propagation.REQUIRED, isolation=Isolation.SERIALIZABLE, readOnly=false, timeout=10000)
                    public class TracksTableImpl implements TracksTable
                    {
                        private Cache cache;

                        @Override
                        public int getScalar()
                        {
                            return (Integer)cache.get("/foo/mynode", "scalar");
                        }

                        @Override
                        public void incrementScalar()
                        {
                            cache.getInvocationContext().getOptionOverrides().setForceWriteLock(true);
                            cache.put("/foo/mynode", "_lockthisplease_", "_lockthisplease_");
                            int tmp = getScalar();
                            tmp++;
                            setScalar ( tmp );
                        }

                        @Override
                        public void initScalar()
                        {
                            setScalar ( 0 );
                        }

                        private void setScalar ( int scalar )
                        {
                            cache.put ( "/foo/mynode" , "scalar" , scalar );
                        }

                        @Override
                        public void setCache ( Cache cache )
                        {
                            this.cache = cache;
                        }

                        @Override
                        public void unsetCache ()
                        {
                            this.cache = null;
                        }
                    }
                    


                    • 22. Re: Transactions: atomicity OK, isolation KO

                      I have written the same test without using Spring.

                      The test:

                      package hellotrackworld.test;
                      
                      import static org.junit.Assert.assertEquals;
                      import hellotrackworld.TracksTable;
                      import hellotrackworld.impl.TracksTableImpl;
                      
                      import java.util.concurrent.CountDownLatch;
                      import java.util.concurrent.ExecutorService;
                      import java.util.concurrent.Executors;
                      
                      import javax.transaction.TransactionManager;
                      
                      import org.jboss.cache.Cache;
                      import org.jboss.cache.CacheFactory;
                      import org.jboss.cache.DefaultCacheFactory;
                      import org.junit.After;
                      import org.junit.Before;
                      import org.junit.Test;
                      import org.springframework.context.ApplicationContext;
                      
                      public class ComportementTransactionnelTableCoreCache
                      {
                          private Cache cache;
                          protected TracksTable table = new TracksTableImpl ();
                          protected final ExecutorService executorService = Executors.newCachedThreadPool();
                          protected static ApplicationContext applicationContext;

                          @Test
                          public void isolation_RW_RW () throws Exception
                          {
                              table.initScalar ();

                              int nbWriters = 10;

                              CountDownLatch latch = new CountDownLatch ( nbWriters );
                              for ( int i = 0 ; i < nbWriters ; i ++ )
                              {
                                  executorService.submit ( new AsyncIncrement ( latch ) );
                              }
                              latch.await ();

                              assertEquals ( nbWriters , table.getScalar() );
                          }

                          protected class AsyncIncrement implements Runnable
                          {
                              private final CountDownLatch latch;

                              public AsyncIncrement ( CountDownLatch latch )
                              {
                                  this.latch = latch;
                              }

                              @Override
                              public void run()
                              {
                                  try { table.incrementScalar (); }
                                  finally { latch.countDown (); }
                              }
                          }

                          @Before
                          public void before ()
                          {
                              cache = initPojoCache ();
                              cache.start();
                              table.setCache ( cache );

                              table.initScalar();
                              assertEquals ( 0 , table.getScalar() );
                          }

                          @After
                          public void stop ()
                          {
                              table.unsetCache ();
                              cache.stop();
                          }

                          private Cache initPojoCache()
                          {
                              CacheFactory factory = new DefaultCacheFactory();
                              Cache cache = factory.createCache("resources/META-INF/replSync-service.xml", false);

                              TransactionManager tm = new com.atomikos.icatch.jta.UserTransactionManager ();
                              cache.getConfiguration().getRuntimeConfig().setTransactionManager(tm);

                              cache.create();
                              return cache;
                          }
                      }
                      


                      The service implementation:
                      package hellotrackworld.impl;
                      
                      import javax.transaction.UserTransaction;
                      
                      import hellotrackworld.TracksTable;
                      
                      import org.jboss.cache.Cache;
                      
                      
                      public class TracksTableImpl implements TracksTable
                      {
                       private Cache cache;
                       private int scalar;
                      
                       @Override
                       public void incrementScalar()
                       {
                       try
                       {
                       UserTransaction tx = getTx ();
                       tx.begin();
                      
                       cache.put("/foo/mynode", "_lockthisplease_", "_lockthisplease_");
                       cache.getInvocationContext().getOptionOverrides().setForceWriteLock(true);
                       int tmp = (Integer)cache.get("/foo/mynode", "scalar");
                       tmp++;
                       cache.put ( "/foo/mynode" , "scalar" , tmp );
                      
                       tx.commit();
                       }
                       catch ( Exception e )
                       {
                       e.printStackTrace();
                       throw new RuntimeException ();
                       }
                       }
                      
                       @Override
                       public int getScalar()
                       {
                       try
                       {
                       UserTransaction tx = getTx ();
                       tx.begin();
                       int ret = (Integer)cache.get("/foo/mynode", "scalar");
                       tx.commit();
                       return ret;
                       }
                       catch ( Exception e )
                       {
                       e.printStackTrace();
                       throw new RuntimeException ();
                       }
                       }
                      
                       @Override
                       public void initScalar()
                       {
                       try
                       {
                       UserTransaction tx = getTx ();
                       tx.begin();
                       cache.put ( "/foo/mynode" , "scalar" , 0 );
                       tx.commit();
                       }
                       catch ( Exception e )
                       {
                       e.printStackTrace();
                       throw new RuntimeException ( e ); // keep the original cause
                       }
                       }
                      
                       @Override
                       public void setCache ( Cache cache )
                       {
                       this.cache = cache;
                       }
                      
                       @Override
                       public void unsetCache ()
                       {
                       this.cache = null;
                       }
                      
                       private UserTransaction getTx()
                       {
                       return new com.atomikos.icatch.jta.UserTransactionImp();
                       }
                      }
                      


                      The replSync-service:
                      <?xml version="1.0" encoding="UTF-8"?>
                      <server>
                       <mbean code="org.jboss.cache.jmx.CacheJmxWrapper" name="jboss.cache:service=TreeCache">
                      
                       <!--<attribute name="NodeLockingScheme">PESSIMISTIC</attribute>
                       <attribute name="IsolationLevel">SERIALIZABLE</attribute>-->
                       <attribute name="CacheMode">LOCAL</attribute>
                       <attribute name="UseReplQueue">false</attribute>
                       <attribute name="ReplQueueInterval">0</attribute>
                       <attribute name="ReplQueueMaxElements">0</attribute>
                       <attribute name="ClusterName">JBossCache-Cluster</attribute>
                      
                       <attribute name="ClusterConfig">
                       <config>
                       <UDP mcast_addr="228.10.10.10"
                       mcast_port="45588"
                       tos="8"
                       ucast_recv_buf_size="20000000"
                       ucast_send_buf_size="640000"
                       mcast_recv_buf_size="25000000"
                       mcast_send_buf_size="640000"
                       loopback="false"
                       discard_incompatible_packets="true"
                       max_bundle_size="64000"
                       max_bundle_timeout="30"
                       use_incoming_packet_handler="true"
                       ip_ttl="2"
                       enable_bundling="false"
                       enable_diagnostics="true"
                      
                       use_concurrent_stack="true"
                      
                       thread_naming_pattern="pl"
                      
                       thread_pool.enabled="true"
                       thread_pool.min_threads="1"
                       thread_pool.max_threads="25"
                       thread_pool.keep_alive_time="30000"
                       thread_pool.queue_enabled="true"
                       thread_pool.queue_max_size="10"
                       thread_pool.rejection_policy="Run"
                      
                       oob_thread_pool.enabled="true"
                       oob_thread_pool.min_threads="1"
                       oob_thread_pool.max_threads="4"
                       oob_thread_pool.keep_alive_time="10000"
                       oob_thread_pool.queue_enabled="true"
                       oob_thread_pool.queue_max_size="10"
                       oob_thread_pool.rejection_policy="Run"/>
                      
                       <PING timeout="2000" num_initial_members="3"/>
                       <MERGE2 max_interval="30000" min_interval="10000"/>
                       <FD_SOCK/>
                       <FD timeout="10000" max_tries="5" shun="true"/>
                       <VERIFY_SUSPECT timeout="1500"/>
                       <pbcast.NAKACK max_xmit_size="60000"
                       use_mcast_xmit="false" gc_lag="0"
                       retransmit_timeout="300,600,1200,2400,4800"
                       discard_delivered_msgs="true"/>
                       <UNICAST timeout="300,600,1200,2400,3600"/>
                       <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
                       max_bytes="400000"/>
                       <pbcast.GMS print_local_addr="true" join_timeout="5000"
                       join_retry_timeout="2000" shun="false"
                       view_bundling="true" view_ack_collection_timeout="5000"/>
                       <FRAG2 frag_size="60000"/>
                       <pbcast.STREAMING_STATE_TRANSFER use_reading_thread="true"/>
                       <!-- <pbcast.STATE_TRANSFER/> -->
                       <pbcast.FLUSH timeout="0"/>
                       </config>
                       </attribute>
                      
                      
                       <attribute name="FetchInMemoryState">true</attribute>
                       <attribute name="StateRetrievalTimeout">15000</attribute>
                       <attribute name="SyncReplTimeout">15000</attribute>
                       <attribute name="LockAcquisitionTimeout">10000</attribute>
                       <attribute name="UseRegionBasedMarshalling">true</attribute>
                       </mbean>
                      </server>
                      


                      With this configuration the outcome is identical: the test passes with pessimistic locking and fails otherwise (incorrect scalar), even when using setForceWriteLock and/or actually writing to the node.

                      If I add the option
                      cache.getConfiguration().setWriteSkewCheck(true);
                      the write fails with
                      org.jboss.cache.optimistic.DataVersioningException: Detected write skew on Fqn [/foo/mynode]. Another process has changed the node since we last read it!
                      like it did in the test that uses Spring.
                      JBC detects a write skew, so the writes were not serialized.
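
                      To make the term concrete, here is a minimal plain-Java sketch of the kind of check a write-skew detector performs: remember the version seen at read time and reject the commit if the version has moved on. VersionedNode, Snapshot and WriteSkewDemo are hypothetical names invented for illustration; this is not JBoss Cache's implementation.

```java
// Hypothetical stand-in for a versioned cache node; not a JBoss Cache class.
class VersionedNode {
    private int value;
    private long version;

    // What a "transaction" remembers from its first read of the node.
    static final class Snapshot {
        final int value;
        final long version;

        Snapshot(int value, long version) {
            this.value = value;
            this.version = version;
        }
    }

    synchronized Snapshot read() {
        return new Snapshot(value, version);
    }

    // Commits newValue only if nobody has committed since the snapshot was taken.
    synchronized void commit(Snapshot readAt, int newValue) {
        if (readAt.version != version) {
            throw new IllegalStateException("write skew: node changed since it was read");
        }
        value = newValue;
        version++;
    }
}

class WriteSkewDemo {
    // Replays the problematic interleaving: both transactions read first,
    // then both try to commit "value + 1". Returns true if the second commit is rejected.
    static boolean secondCommitRejected() {
        VersionedNode node = new VersionedNode();
        VersionedNode.Snapshot a = node.read();
        VersionedNode.Snapshot b = node.read();
        node.commit(a, a.value + 1);
        try {
            node.commit(b, b.value + 1);
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second commit rejected: " + secondCommitRejected());
    }
}
```

                      Without the version check, the second commit would silently overwrite the first and one increment would be lost, which matches the incorrect scalar seen in the test.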



                      • 23. Re: Transactions: atomicity OK, isolation KO

                        I can't edit my post, but the Spring import in the test is only there for the private, unused ApplicationContext attribute.

                        • 24. Re: Transactions: atomicity OK, isolation KO

                          No ideas? Is there no way to serialize the writes in MVCC?

                          • 25. Re: Transactions: atomicity OK, isolation KO
                            jason.greene

                             

                            private UserTransaction getTx()
                             {
                             return new com.atomikos.icatch.jta.UserTransactionImp();
                             }
                            


                            You have to use the same transaction manager that JBoss Cache is configured to use. This is probably your problem.
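
                            As a rough illustration of why this matters in general: a transactional resource typically asks its configured manager which transaction the calling thread is in. The toy below uses made-up classes (ToyTransactionManager, ToyCache), not JTA, Atomikos or JBoss Cache types, and assumes each manager keeps its own per-thread state. It says nothing about whether two particular Atomikos classes share state.

```java
// Hypothetical toy classes; none of these are JTA, Atomikos, or JBoss Cache types.
class ToyTransactionManager {
    // Assumption for this sketch: each manager instance tracks its own transaction per thread.
    private final ThreadLocal<String> current = new ThreadLocal<>();

    void begin(String txName) {
        current.set(txName);
    }

    void commit() {
        current.remove();
    }

    String getTransaction() {
        return current.get();
    }
}

class ToyCache {
    private final ToyTransactionManager tm;

    ToyCache(ToyTransactionManager tm) {
        this.tm = tm;
    }

    // The resource asks its configured manager about the calling thread's transaction.
    boolean seesTransaction() {
        return tm.getTransaction() != null;
    }
}

class SameManagerDemo {
    public static void main(String[] args) {
        ToyTransactionManager configured = new ToyTransactionManager();
        ToyTransactionManager other = new ToyTransactionManager();
        ToyCache cache = new ToyCache(configured);

        other.begin("tx-1");                          // begun on a manager the cache does not know
        System.out.println(cache.seesTransaction());  // false
        other.commit();

        configured.begin("tx-2");                     // begun on the cache's own manager
        System.out.println(cache.seesTransaction());  // true
        configured.commit();
    }
}
```

                            In the first case the cache would treat each call as if no transaction were active, so begin() and commit() in the application would have no effect on isolation.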

                            • 26. Re: Transactions: atomicity OK, isolation KO

                              Am I not already using "the same TM that JBC is using"?

                              I create the cache with the following:

                              private Cache initPojoCache()
                               {
                               CacheFactory factory = new DefaultCacheFactory();
                               Cache cache = factory.createCache("resources/META-INF/replSync-service2.xml", false);
                              
                               TransactionManager tm = new com.atomikos.icatch.jta.UserTransactionManager ();
                               cache.getConfiguration().getRuntimeConfig().setTransactionManager(tm);
                              
                               cache.getConfiguration().setWriteSkewCheck(true);
                              
                               cache.create();
                               return cache;
                               }


                              And I create a new transaction with:
                              return new com.atomikos.icatch.jta.UserTransactionImp();


                              So it looks to me like I'm using the same TM.
                              But indeed, a good test might be to use JBC's default TM. How do I do that (specifically, creating a new UserTransaction)? I can't see anything to that effect in the docs.

                              By the way, I now use the new JBC config format (here with everything at its default):
                              <?xml version="1.0" encoding="UTF-8"?>
                              
                              <jbosscache xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="urn:jboss:jbosscache-core:config:3.2">
                              
                               <!--
                               isolation levels supported: READ_COMMITTED and REPEATABLE_READ
                               nodeLockingSchemes: mvcc, pessimistic (deprecated), optimistic (deprecated)
                               -->
                               <!--<locking
                               isolationLevel="SERIALIZABLE"
                               lockParentForChildInsertRemove="false"
                               lockAcquisitionTimeout="20000"
                               nodeLockingScheme="pessimistic"
                               writeSkewCheck="false"
                               concurrencyLevel="500"/>-->
                              
                               <!-- By not specifying the 'clustering' element, the cache runs in LOCAL mode. -->
                              
                              </jbosscache>
                              

                              and the behaviour is identical.

                              • 27. Re: Transactions: atomicity OK, isolation KO

                                Seriously, how do I instantiate a new UserTransaction with JBC's local transactions? What is the equivalent of

                                return new com.atomikos.icatch.jta.UserTransactionImp();

                                I tried
                                new DummyUserTransaction (new TransactionManager())
                                but it looks suspicious.

                                So far, with everything I've tried, I'm still not convinced that it is possible to serialize operations with the MVCC locking scheme.

                                • 28. Re: Transactions: atomicity OK, isolation KO
                                  manik

                                  You need to look at how Atomikos binds a user transaction to JNDI. And based on this, you retrieve the UserTransaction and call begin().

                                  Have a look at Sun's JavaEE tutorial on JTA: http://java.sun.com/j2ee/tutorial/1_3-fcs/doc/Transaction4.html

                                  • 29. Re: Transactions: atomicity OK, isolation KO

                                    I don't get it: why would I need to use JNDI? My test passes without JNDI, using Atomikos and the pessimistic locking scheme.
                                    I don't need JNDI to start and commit a transaction in the same method, like I do:

                                     UserTransaction tx = getTx ();
                                     tx.begin();
                                    
                                     cache.put("/foo/mynode", "_lockthisplease_", "_lockthisplease_");
                                     cache.getInvocationContext().getOptionOverrides().setForceWriteLock(true);
                                     int tmp = (Integer)cache.get("/foo/mynode", "scalar");
                                     tmp++;
                                     cache.put ( "/foo/mynode" , "scalar" , tmp );
                                    
                                     tx.commit();
                                     }
                                    
                                     [...]
                                    
                                     private UserTransaction getTx()
                                     {
                                     return new com.atomikos.icatch.jta.UserTransactionImp();
                                     //return new DummyUserTransaction ( new DummyTransactionManager () );
                                     }
                                    }
                                    


                                    I also set JBC's TM at startup:
                                    TransactionManager tm = new com.atomikos.icatch.jta.UserTransactionManager ();
                                     cache.getConfiguration().getRuntimeConfig().setTransactionManager(tm);



                                    The idea was to test whether I get the same failure using JBC's default transactions.
                                    If that's meaningless (by analogy with JDBC local transactions, I should call commit on a JBC resource), then what else can I test?

                                    This test is pretty simple; it's just a concurrent write. There should be a way to check whether it works with MVCC, right?
                                    Can you confirm whether it's really possible to make the test pass with MVCC? I'm starting to think it's impossible.
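
                                    For comparison, here is the shape of the test reduced to plain Java, with no cache or transaction manager involved. It is only a sketch under the assumption that a conflicting write is detected and the whole read-modify-write is retried; an AtomicInteger with compareAndSet stands in for the versioned "scalar" attribute. RetryingCounterDemo is a hypothetical name, and nothing here shows how JBC itself behaves.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RetryingCounterDemo {
    // Runs `threads` workers that each increment a shared scalar `incrementsPerThread` times.
    static int run(int threads, int incrementsPerThread) {
        AtomicInteger scalar = new AtomicInteger(0); // stand-in for the "scalar" attribute
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    // Read, compute, then commit only if the value is unchanged; otherwise retry.
                    int seen;
                    do {
                        seen = scalar.get();
                    } while (!scalar.compareAndSet(seen, seen + 1));
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return scalar.get();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 1000)); // 4000
    }
}
```

                                    The count comes out right here because a failed commit is never dropped: the loop re-reads and tries again. A transactional equivalent would have to roll back and re-run the whole transaction when the conflict is reported.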