6 Replies Latest reply on Dec 23, 2009 2:08 PM by Brian Stansberry

    Current transaction can not see modifications done by committ

    Alexey Veklov Newbie

      The current transaction cannot see modifications made by committed transactions while the isolation level is READ_COMMITTED.

      Cache version is 3.2.1.GA. The problem can be reproduced when the cache mode is REPL_SYNC or INVALIDATION_SYNC (the other modes were not tested). Please see the test cases below for details.

      Note: The test cases use DummyTransactionManager, and the behavior of the second test case for REPL_SYNC mode depends on whether the calls are made in the context of a JTA transaction or not, so in this mode the issue may be caused by a bug in DummyTransactionManager.

      INVALIDATION_SYNC:

      import java.util.ArrayList;
      import java.util.Collections;
      import java.util.List;
      import java.util.concurrent.CyclicBarrier;
      import java.util.concurrent.TimeUnit;
      import javax.transaction.TransactionManager;
      
      import junit.framework.TestCase;
      import org.jboss.cache.Cache;
      import org.jboss.cache.CacheFactory;
      import org.jboss.cache.CacheSPI;
      import org.jboss.cache.DefaultCacheFactory;
      import org.jboss.cache.config.Configuration;
      import org.jboss.cache.lock.IsolationLevel;
      
      /**
       * Checks that a transaction which is still open on one cache sees a change committed
       * through another cache of the same cluster, using {@code INVALIDATION_SYNC} cache mode
       * and {@code READ_COMMITTED} isolation.
       *
       * <p>Each test starts two caches, stores a value through {@code cache1}, and then runs
       * two threads in lock step (coordinated by a {@link CyclicBarrier}): the reader opens a
       * transaction on {@code cache1} and reads the key, the writer commits a change to the
       * same node through {@code cache2}, and the reader then expects the key to be gone both
       * before and after it commits its own transaction.
       */
      public class TestInvalidationSyncReadCommited extends TestCase {

          private static final String NODE_FQN = "/node";
          private static final String KEY = "key";
          private static final String VALUE1 = "value1";
          private static final String VALUE2 = "value2";

          /**
           * Creates and starts a cache configured for INVALIDATION_SYNC / READ_COMMITTED that
           * uses the dummy transaction manager.
           */
          private static Cache<String, String> createCache() {
              final CacheFactory<String, String> cf = new DefaultCacheFactory<String, String>();
              final Configuration configuration = new Configuration();
              configuration.setCacheMode(Configuration.CacheMode.INVALIDATION_SYNC);
              configuration.setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
              configuration.setIsolationLevel(IsolationLevel.READ_COMMITTED);
              configuration.setLockAcquisitionTimeout(10000L);
              // Make the committing thread wait for the second phase of the two-phase commit;
              // otherwise commit() may return before the other cache has processed the change
              // and the reader could legitimately still see the old state.
              configuration.setSyncCommitPhase(true);
              configuration.setSyncRollbackPhase(true);
              return cf.createCache(configuration, true);
          }

          /** The writer overwrites the value with a transactional {@code put}. */
          public void testPut() throws Exception {
              runScenario(false);
          }

          /** The writer removes the whole node with a transactional {@code removeNode}. */
          public void testRemoveNode() throws Exception {
              runScenario(true);
          }

          /**
           * Runs the reader/writer scenario shared by both tests.
           *
           * @param removeNode {@code true} if the writer removes the node, {@code false} if it
           *                   puts {@code VALUE2} under the same key
           * @throws Exception if the calling thread is interrupted while joining the workers
           */
          private void runScenario(final boolean removeNode) throws Exception {
              final Cache<String, String> cache1 = createCache();
              try {
                  final Cache<String, String> cache2 = createCache();
                  try {
                      assertEquals("Members count", 2, cache1.getMembers().size());
                      assertEquals("Members count", 2, cache2.getMembers().size());

                      final CyclicBarrier barrier = new CyclicBarrier(2);
                      // Failures raised in the worker threads are collected here and reported
                      // from the test thread, because JUnit does not see them otherwise.
                      final List<Throwable> exceptions =
                              Collections.synchronizedList(new ArrayList<Throwable>());

                      cache1.put(NODE_FQN, KEY, VALUE1);

                      final Thread reader = new Thread() {
                          @Override
                          public void run() {
                              try {
                                  getTransactionManager(cache1).begin();
                                  assertEquals("Must be in cache", VALUE1, cache1.get(NODE_FQN, KEY));
                                  await(barrier); // let the writer start
                                  await(barrier); // wait until the writer has committed
                                  // Reported behaviour: this read still returns VALUE1.
                                  assertNull("Must be invalidated before commit", cache1.get(NODE_FQN, KEY));
                                  getTransactionManager(cache1).commit();
                                  assertNull("Must be invalidated", cache1.get(NODE_FQN, KEY));
                              } catch (Throwable e) {
                                  exceptions.add(e);
                              }
                          }
                      };
                      final Thread writer = new Thread() {
                          @Override
                          public void run() {
                              try {
                                  await(barrier);
                                  getTransactionManager(cache2).begin();
                                  if (removeNode) {
                                      cache2.removeNode(NODE_FQN);
                                  } else {
                                      cache2.put(NODE_FQN, KEY, VALUE2);
                                  }
                                  getTransactionManager(cache2).commit();
                                  await(barrier);
                              } catch (Throwable e) {
                                  exceptions.add(e);
                              }
                          }
                      };

                      reader.start();
                      writer.start();
                      reader.join();
                      writer.join();

                      if (!exceptions.isEmpty()) {
                          fail(exceptions.toString());
                      }
                  } finally {
                      // Always stop the caches, even when an assertion above fails, so that
                      // a failing test does not leave cluster members running.
                      cache2.stop();
                  }
              } finally {
                  cache1.stop();
              }
          }

          /** Returns the transaction manager the given cache was configured with. */
          private static TransactionManager getTransactionManager(final Cache<?, ?> cache) {
              return ((CacheSPI<?, ?>) cache).getTransactionManager();
          }

          /** Waits on the barrier for at most 20 seconds so a stuck peer cannot hang the test. */
          private static void await(final CyclicBarrier barrier) throws Exception {
              barrier.await(20, TimeUnit.SECONDS);
          }
      }
      


      REPL_SYNC (This case can be fixed by commenting out the begin/commit calls in thread2, i.e. the behavior depends on whether the calls are made in the context of a JTA transaction or not. Is that an issue with DummyTransactionManager or with the cache itself?):
      import java.util.ArrayList;
      import java.util.Collections;
      import java.util.List;
      import java.util.concurrent.BrokenBarrierException;
      import java.util.concurrent.CyclicBarrier;
      import java.util.concurrent.TimeUnit;
      import java.util.concurrent.TimeoutException;
      import javax.transaction.TransactionManager;
      
      import junit.framework.TestCase;
      import org.jboss.cache.Cache;
      import org.jboss.cache.CacheFactory;
      import org.jboss.cache.CacheSPI;
      import org.jboss.cache.DefaultCacheFactory;
      import org.jboss.cache.config.Configuration;
      import org.jboss.cache.lock.IsolationLevel;
      
      /**
       * Checks that a transaction which is still open on one cache sees changes committed
       * through another cache of the same cluster, using {@code REPL_SYNC} cache mode and
       * {@code READ_COMMITTED} isolation.
       *
       * <p>Two threads run in lock step (coordinated by a {@link CyclicBarrier}): the writer
       * commits an initial value through {@code cache2}; the reader opens a transaction on
       * {@code cache1} and reads it; the writer commits a second change; the reader, still
       * inside its transaction, expects to see that second change.
       */
      public class TestReplSyncReadCommitted2 extends TestCase {

          private static final String NODE_FQN = "/node";
          private static final String KEY = "key";
          private static final String VALUE1 = "value1";
          private static final String VALUE2 = "value2";

          /**
           * Creates and starts a cache configured for REPL_SYNC / READ_COMMITTED that uses the
           * dummy transaction manager.
           */
          private static Cache<String, String> createCache() {
              final CacheFactory<String, String> cf = new DefaultCacheFactory<String, String>();
              final Configuration configuration = new Configuration();
              configuration.setCacheMode(Configuration.CacheMode.REPL_SYNC);
              configuration.setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
              configuration.setIsolationLevel(IsolationLevel.READ_COMMITTED);
              configuration.setLockAcquisitionTimeout(10000L);
              // Without these the committing thread does not block waiting for responses to
              // the second message of the two-phase commit, so the reader could run its get()
              // before the write has been applied on its cache.
              configuration.setSyncCommitPhase(true);
              configuration.setSyncRollbackPhase(true);
              return cf.createCache(configuration, true);
          }

          /** The second committed change removes the node; the reader must then see no value. */
          public void testRemoveNodeTwoCaches() throws InterruptedException {
              runScenario(true);
          }

          /** The second committed change overwrites the value; the reader must see the new value. */
          public void testPutTwoCaches() throws InterruptedException {
              runScenario(false);
          }

          /**
           * Runs the reader/writer scenario shared by both tests.
           *
           * @param removeNode {@code true} if the writer's second transaction removes the node,
           *                   {@code false} if it puts {@code VALUE2} under the same key
           * @throws InterruptedException if the calling thread is interrupted while joining
           */
          private void runScenario(final boolean removeNode) throws InterruptedException {
              final Cache<String, String> cache1 = createCache();
              try {
                  final Cache<String, String> cache2 = createCache();
                  try {
                      assertEquals("Members count", 2, cache1.getMembers().size());
                      assertEquals("Members count", 2, cache2.getMembers().size());

                      final CyclicBarrier barrier = new CyclicBarrier(2);
                      // Failures raised in the worker threads are collected here and reported
                      // from the test thread, because JUnit does not see them otherwise.
                      final List<Throwable> exceptions =
                              Collections.synchronizedList(new ArrayList<Throwable>());

                      final Thread reader = new Thread() {
                          @Override
                          public void run() {
                              try {
                                  await(barrier); // let the writer start
                                  await(barrier); // initial value has been committed
                                  getTransactionManager(cache1).begin();
                                  assertEquals("Must be replicated", VALUE1, cache1.get(NODE_FQN, KEY));
                                  await(barrier); // let the writer make its second change
                                  await(barrier); // second change has been committed
                                  if (removeNode) {
                                      assertNull("Must be removed", cache1.get(NODE_FQN, KEY));
                                  } else {
                                      assertEquals("Must be replicated", VALUE2, cache1.get(NODE_FQN, KEY));
                                  }
                                  getTransactionManager(cache1).commit();
                              } catch (Throwable e) {
                                  exceptions.add(e);
                              }
                          }
                      };
                      final Thread writer = new Thread() {
                          @Override
                          public void run() {
                              try {
                                  await(barrier);
                                  getTransactionManager(cache2).begin();
                                  cache2.put(NODE_FQN, KEY, VALUE1);
                                  getTransactionManager(cache2).commit();
                                  await(barrier);
                                  await(barrier);
                                  getTransactionManager(cache2).begin();
                                  if (removeNode) {
                                      cache2.removeNode(NODE_FQN);
                                  } else {
                                      cache2.put(NODE_FQN, KEY, VALUE2);
                                  }
                                  getTransactionManager(cache2).commit();
                                  await(barrier);
                              } catch (Throwable e) {
                                  exceptions.add(e);
                              }
                          }
                      };

                      reader.start();
                      writer.start();
                      reader.join();
                      writer.join();

                      if (!exceptions.isEmpty()) {
                          fail(exceptions.toString());
                      }
                  } finally {
                      // Always stop the caches, even when an assertion above fails, so that
                      // a failing test does not leave cluster members running.
                      cache2.stop();
                  }
              } finally {
                  cache1.stop();
              }
          }

          /** Returns the transaction manager the given cache was configured with. */
          private static TransactionManager getTransactionManager(final Cache<?, ?> cache) {
              return ((CacheSPI<?, ?>) cache).getTransactionManager();
          }

          /** Waits on the barrier for at most 20 seconds so a stuck peer cannot hang the test. */
          private static void await(final CyclicBarrier barrier)
                  throws InterruptedException, BrokenBarrierException, TimeoutException {
              barrier.await(20, TimeUnit.SECONDS);
          }
      }
      


        • 1. Re: Current transaction can not see modifications done by com
          Alexey Veklov Newbie

          Could someone from JBoss team comment on this topic please?

          Is that correct behaviour in READ_COMMITTED mode?
          Is that incorrect cache usage?
          Is that an issue with the test (for example with DummyTransactionManager)?
          Is that an issue with the cache itself?

          Thanks!

          • 2. Re: Current transaction can not see modifications done by committ
            Alexey Veklov Newbie

            Hello,

             

            It looks like changes are not propagated to the other nodes of the cluster if transactions are managed explicitly (REPL_SYNC, READ_COMMITTED). Here is a very simple test case which shows this:

            {code:java}

            import java.util.ArrayList;

            import java.util.Collections;

            import java.util.List;

            import java.util.concurrent.BrokenBarrierException;

            import java.util.concurrent.CyclicBarrier;

            import java.util.concurrent.TimeUnit;

            import java.util.concurrent.TimeoutException;

            import javax.transaction.TransactionManager;

             

            import junit.framework.TestCase;

            import org.jboss.cache.Cache;

            import org.jboss.cache.CacheFactory;

            import org.jboss.cache.CacheSPI;

            import org.jboss.cache.DefaultCacheFactory;

            import org.jboss.cache.config.Configuration;

            import org.jboss.cache.lock.IsolationLevel;

             

            public class TestReplSyncReadCommitted3 extends TestCase {

             

                private static final String NODE_FQN = "/node";

                private static final String KEY = "key";

                private static final String VALUE1 = "value1";

             

                private Cache cache1;

                private Cache cache2;

                private CyclicBarrier barrier;

                private List exceptions;

                private Thread thread1;

                private Thread thread2;

             

                public void testWithTx() throws InterruptedException {

                    thread1 = new Thread() {

                        public void run() {

                            try {

                                await(barrier);

                                await(barrier);

                                assertEquals("Must be replicated", VALUE1, cache1.get(NODE_FQN, KEY));

                                await(barrier);

                            } catch (Throwable e) {

                                exceptions.add(e);

                            }

                        }

                    };

                    thread2 = new Thread() {

                        public void run() {

                            try {

                                await(barrier);

                                getTransactionManager(cache2).begin();

                                cache2.put(NODE_FQN, KEY, VALUE1);

                                getTransactionManager(cache2).commit();

                                await(barrier);

                                await(barrier);

                            } catch (Throwable e) {

                                exceptions.add(e);

                            }

                        }

                    };

                    runAndAssertNoExceptions();

                }

             

                public void testWithoutTx() throws InterruptedException {

                    thread1 = new Thread() {

                        public void run() {

                            try {

                                await(barrier);

                                await(barrier);

                                assertEquals("Must be replicated", VALUE1, cache1.get(NODE_FQN, KEY));

                                await(barrier);

                            } catch (Throwable e) {

                                exceptions.add(e);

                            }

                        }

                    };

                    thread2 = new Thread() {

                        public void run() {

                            try {

                                await(barrier);

                                cache2.put(NODE_FQN, KEY, VALUE1);

                                await(barrier);

                                await(barrier);

                            } catch (Throwable e) {

                                exceptions.add(e);

                            }

                        }

                    };

                    runAndAssertNoExceptions();

                }

             

                private void runAndAssertNoExceptions() throws InterruptedException {

                    thread1.start();

                    thread2.start();

                    thread1.join();

                    thread2.join();

             

                    if (!exceptions.isEmpty()) {

                        fail(exceptions.toString());

                    }

                }

             

                protected void setUp() {

                    cache1 = createCache();

                    cache2 = createCache();

                    barrier = new CyclicBarrier(2);

                    exceptions = Collections.synchronizedList(new ArrayList());

                }

             

                protected void tearDown() {

                    cache1.stop();

                    cache2.stop();

                }

             

                private static Cache createCache() {

                    final CacheFactory cf = new DefaultCacheFactory();

                    final Configuration configuration = new Configuration();

                    configuration.setCacheMode(Configuration.CacheMode.REPL_SYNC);

                    configuration.setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");

                    configuration.setIsolationLevel(IsolationLevel.READ_COMMITTED);

                    configuration.setLockAcquisitionTimeout(10000L);

                    return cf.createCache(configuration, true);

                }

             

                private static TransactionManager getTransactionManager(final Cache cache) {

                    return ((CacheSPI) cache).getTransactionManager();

                }


                private static void await(final CyclicBarrier barrier)

                        throws InterruptedException, BrokenBarrierException, TimeoutException {

                    barrier.await(20, TimeUnit.SECONDS);

                }

            }

            {code}

            • 3. Re: Current transaction can not see modifications done by committ
              Brian Stansberry Master

              Try adding to createCache():

               

              configuration.setSyncCommitPhase(true);
              configuration.setSyncRollbackPhase(true);

               

              Without that, the thread committing the tx doesn't block waiting for responses to the 2nd message in the 2-phase-commit protocol. The effect is that your thread1 could do the get before that message arrives at cache1 and is processed. Until it arrives and is processed, the write is not visible.

              • 4. Re: Current transaction can not see modifications done by committ
                Alexey Veklov Newbie

                Thank you very much for your answer! That really fixed two of my three test cases.

                Could you also take a look at the remaining test case?

                Both test methods fail with "junit.framework.AssertionFailedError: Must be invalidated before commit".

                {code:java}

                import java.util.ArrayList;

                import java.util.Collections;

                import java.util.List;

                import java.util.concurrent.CyclicBarrier;

                import java.util.concurrent.TimeUnit;

                import javax.transaction.TransactionManager;

                 

                import junit.framework.TestCase;

                import org.jboss.cache.Cache;

                import org.jboss.cache.CacheFactory;

                import org.jboss.cache.CacheSPI;

                import org.jboss.cache.DefaultCacheFactory;

                import org.jboss.cache.config.Configuration;

                import org.jboss.cache.lock.IsolationLevel;

                 

                public class TestInvalidationSyncReadCommitted extends TestCase {

                 

                    private static final String NODE_FQN = "/node";

                    private static final String KEY = "key";

                    private static final String VALUE1 = "value1";

                    private static final String VALUE2 = "value2";

                 

                    private Cache cache1;

                    private Cache cache2;

                    private CyclicBarrier barrier;

                    private List exceptions;

                 

                    protected void setUp() throws Exception {

                        cache1 = createCache();

                        cache2 = createCache();

                 

                        barrier = new CyclicBarrier(2);

                        exceptions = Collections.synchronizedList(new ArrayList());

                    }

                 

                    protected void tearDown() throws Exception {

                        cache1.stop();

                        cache2.stop();

                    }

                 

                    public void testPut() throws Exception {

                        assertEquals("Members count", 2, cache1.getMembers().size());

                        assertEquals("Members count", 2, cache2.getMembers().size());

                 

                        cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);

                        cache1.put(NODE_FQN, KEY, VALUE1);

                        cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);

                        cache2.put(NODE_FQN, KEY, VALUE1);

                 

                        final Thread thread1 = new Thread() {

                            public void run() {

                                try {

                                    getTransactionManager(cache1).begin();

                                    assertEquals("Must be in cache", VALUE1, cache1.get(NODE_FQN, KEY));

                                    await(barrier);

                                    await(barrier);

                //                    assertEquals("For some reason it is in cache", VALUE1, cache1.get(NODE_FQN, KEY));

                                    assertNull("Must be invalidated before commit", cache1.get(NODE_FQN, KEY));

                                    getTransactionManager(cache1).commit();

                                    assertNull("Must be invalidated", cache1.get(NODE_FQN, KEY));

                                } catch (Throwable e) {

                                    exceptions.add(e);

                                }

                            }

                        };

                        final Thread thread2 = new Thread() {

                            public void run() {

                                try {

                                    await(barrier);

                                    getTransactionManager(cache2).begin();

                                    cache2.put(NODE_FQN, KEY, VALUE2);

                                    getTransactionManager(cache2).commit();

                                    await(barrier);

                                } catch (Throwable e) {

                                    exceptions.add(e);

                                }

                            }

                        };

                 

                        runAndAssertResults(thread1, thread2);

                    }

                 

                    public void testRemoveNode() throws Exception {

                        assertEquals("Members count", 2, cache1.getMembers().size());

                        assertEquals("Members count", 2, cache2.getMembers().size());

                 

                        cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);

                        cache1.put(NODE_FQN, KEY, VALUE1);

                        cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);

                        cache2.put(NODE_FQN, KEY, VALUE1);

                 

                        final Thread thread1 = new Thread() {

                            public void run() {

                                try {

                                    getTransactionManager(cache1).begin();

                                    assertEquals("Must be in cache", VALUE1, cache1.get(NODE_FQN, KEY));

                                    await(barrier);

                                    await(barrier);

                //                    assertEquals("For some reason it is in cache", VALUE1, cache1.get(NODE_FQN, KEY));

                                    assertNull("Must be invalidated before commit", cache1.get(NODE_FQN, KEY));

                                    getTransactionManager(cache1).commit();

                                    assertNull("Must be invalidated", cache1.get(NODE_FQN, KEY));

                                } catch (Throwable e) {

                                    exceptions.add(e);

                                }

                            }

                        };

                        final Thread thread2 = new Thread() {

                            public void run() {

                                try {

                                    await(barrier);

                                    getTransactionManager(cache2).begin();

                                    cache2.removeNode(NODE_FQN);

                                    getTransactionManager(cache2).commit();

                                    await(barrier);

                                } catch (Throwable e) {

                                    exceptions.add(e);

                                }

                            }

                        };

                 

                        runAndAssertResults(thread1, thread2);

                    }

                 

                    private static Cache createCache() {

                        final CacheFactory cf = new DefaultCacheFactory();

                        final Configuration configuration = new Configuration();

                        configuration.setCacheMode(Configuration.CacheMode.INVALIDATION_SYNC);

                        configuration.setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");

                        configuration.setIsolationLevel(IsolationLevel.READ_COMMITTED);

                        configuration.setLockAcquisitionTimeout(10000L);

                        configuration.setSyncCommitPhase(true);

                        configuration.setSyncRollbackPhase(true);

                        return cf.createCache(configuration, true);

                    }

                 

                    private void runAndAssertResults(final Thread thread1, final Thread thread2) throws InterruptedException {

                        thread1.start();

                        thread2.start();

                        thread1.join();

                        thread2.join();

                 

                        if (!exceptions.isEmpty()) {

                            fail(exceptions.toString());

                        }

                    }

                 

                    private static TransactionManager getTransactionManager(final Cache cache) {

                        return ((CacheSPI) cache).getTransactionManager();

                    }

                 

                    private static void await(final CyclicBarrier barrier) throws Exception {

                        barrier.await(20, TimeUnit.SECONDS);

                    }

                }

                {code}

                • 5. Re: Current transaction can not see modifications done by committ
                  Brian Stansberry Master

                  I think I may know the reason; I'll take you through what my look at the code is telling me:

                   

                  The get() call is doing this in class GetKeyValueCommand:

                   

                     /**
                      * Reads the value stored under {@code key} on the node identified by {@code fqn}.
                      *
                      * @param ctx invocation context used to look the node up
                      * @return the value returned by {@code getDirect(key)}, or {@code null} when the
                      *         context yields no node or the node reports itself as deleted
                      */
                     public Object perform(InvocationContext ctx)
                     {
                        // The node is resolved through the invocation context, not read from the
                        // data container directly.
                        NodeSPI n = ctx.lookUpNode(fqn);
                        if (n == null)
                        {
                           if (trace) log.trace("Node not found");
                           return null;
                        }
                        // NOTE(review): only isDeleted() is consulted here; the node's valid flag
                        // is not checked in this method.
                        if (n.isDeleted())
                        {
                           if (trace) log.trace("Node has been deleted and is of type " + n.getClass().getSimpleName());
                           return null;
                        }
                        // When node events are enabled, a "visited" notification is sent before and
                        // after the read.
                        if (sendNodeEvent) notifier.notifyNodeVisited(fqn, true, ctx);
                        Object result = n.getDirect(key);
                        if (trace) log.trace("Found value " + result);
                        if (sendNodeEvent) notifier.notifyNodeVisited(fqn, false, ctx);
                        return result;
                     }
                  

                   

                  The ctx.lookUpNode() call is retrieving the node from a map associated with the tx. Since the node was previously read in the tx, this call will return the node even if another tx removed it from the cache's main data container. The n.isDeleted() guard is meant to protect against using such a node.

                   

                  But, what happens when an invalidation message comes in from another cluster member is this in InvalidateCommand:

                   

                     /**
                      * Invalidates the node identified by {@code fqn}: the node is evicted and then
                      * flagged as invalid.
                      *
                      * @param ctx invocation context passed on to the eviction notifications
                      * @return always {@code null}
                      */
                     public Object perform(InvocationContext ctx)
                     {
                        NodeSPI node = enforceNodeLoading();
                        if (trace) log.trace("Invalidating fqn:" + fqn);
                        // Nothing to invalidate if no node was obtained.
                        if (node == null)
                        {
                           return null;
                        }
                        // Evict first, then mark the node object obtained above as invalid.
                        evictNode(fqn, ctx);
                        invalidateNode(node);
                        return null;
                     }
                  
                     /**
                      * Evicts {@code fqn} from the data container, sending a node-invalidated
                      * notification before and after the eviction.
                      *
                      * @param fqn the node to evict
                      * @param ctx invocation context passed to the notifier
                      * @return the result of {@code dataContainer.evict(fqn)}
                      */
                     boolean evictNode(Fqn fqn, InvocationContext ctx)
                     {
                        // presumably the boolean marks the "before" (true) / "after" (false) phase — TODO confirm
                        notifier.notifyNodeInvalidated(fqn, true, ctx);
                        try
                        {
                           return dataContainer.evict(fqn);
                        }
                        finally
                        {
                           // Sent from finally so the second notification goes out even if evict() throws.
                           notifier.notifyNodeInvalidated(fqn, false, ctx);
                        }
                     }
                  
                     ....
                  
                  
                     /**
                      * Marks the node to be removed (and all its children) as invalid, so that anyone holding a direct
                      * reference to it will be aware that it is no longer valid.
                      *
                      * @param node the node to flag as invalid
                      */
                     protected void invalidateNode(NodeSPI node)
                     {
                        // Second argument true: applied recursively (contrast with the non-recursive call below).
                        node.setValid(false, true);
                        // root nodes can never be invalid
                        if (fqn.isRoot()) node.setValid(true, false); // non-recursive.
                     }
                  

                   

                  Following through the logic in the perform() method I don't see anywhere where the node is being marked as deleted. (You have to go into DataContainerImpl.evict(Fqn) to follow the trail, and it gets hairy so maybe I missed it.) The node is marked as being invalid, but GetKeyValueCommand.perform() is not checking n.isValid(), just n.isDeleted().

                   

                  Anyway, this seems like a bug, and you've written a nice test case, so I'll open a JIRA.