11 Replies Latest reply on Apr 16, 2014 1:17 PM by Nicolas Filotto

    What is the best way to concurrently invalidate an entry in a replicated transactional cache?

    Nicolas Filotto Novice

      Hi everybody,

       

      I'm facing with ISPN 5.2.7.Final a well known use case that causes deadlocks when we use ISPN and/or JBC as replicated transactional cache.

      The use case is the following: We have 2 concurrent threads that try to modify the same entry in 2 different cluster node.

      In other words:

      • The thread #1 acquires the lock on the entry locally on the cluster node #1
      • The thread #2 acquires the lock on the same entry locally on the cluster node #2

      Then:

      • The "thread #1 tries to acquire remotely" the lock on the same entry on the cluster node #2
      • The "thread #2 tries to acquire remotely" the lock on the same entry on the cluster node #1

      => Deadlock as the entry has already been locked => TimeoutException that causes a revert on both nodes such that the data remains consistent over the cluster

       

      Most of the time this deadlock is a good thing as it ensures consistency over the cluster but unfortunately I have a specific use case where this could happen very often so I would like to know if there is a workaround allowing to avoid this deadlock even if it means adding the risk to get some consistency issues.

       

      In my use case, I need to invalidate an entry regularly, and I would like to be able to do it concurrently over the cluster with a limited risk of getting deadlocks.

      So far, I tried several things such as doing my invalidation:

      1. using cache.remove(key) outside my transaction in auto commit mode with and without use1PcForAutoCommitTransactions enabled
      2. using cache.removeAsync(key) inside and outside my transaction
      3. using cache.withFlags(Flag.FORCE_ASYNCHRONOUS).remove(key) inside and outside my transaction

       

      None of them worked, the only thing that works, is a very ugly workaround which is something like:

               

      // Remove the entry locally
      cache.withFlags(Flag.CACHE_MODE_LOCAL).remove(key);
      // Invalidate the entry on the other cluster nodes
      InvalidateCommand invCmd = cache.getComponentRegistry().getCommandsFactory().buildInvalidateCommand(Collections.<Flag>emptySet(), key);
      cache.getAdvancedCache().getRpcManager().broadcastRpcCommand(invCmd, true);
      

       

      Do you have a better approach? If not do you have any idea of what kind of issues I could face with my ugly workaround?

       

      Thank you in advance for your help,

      BR,

      Nicolas

        • 1. Re: What is the best way to concurrently invalidate an entry in a replicated transactional cache?
          Radim Vansa Master

          The locking mechanism has changed in 5.3 (the lock is always acquired only on the primary owner). Upgrade to 6.0, and no deadlocks should happen any more.

          1 of 1 people found this helpful
          • 2. Re: What is the best way to concurrently invalidate an entry in a replicated transactional cache?
            Nicolas Filotto Novice

            I've just checked and indeed I confirm that it works perfectly without any workaround on 5.3 and 6.0. Unfortunately I have to use the version shipped into EAP 6.3 which is ISPN 5.2.x :-(

             

            BTW what is the "primary owner" in case of a replicated cache? I understand the meaning in case of a distributed cache but it is not clear in case of a replicated cache

            • 3. Re: What is the best way to concurrently invalidate an entry in a replicated transactional cache?
              Radim Vansa Master

              It's the same as in distribution mode - the replicated consistent hash contains primary owner for each segment, the rest of nodes are backup owners. However, this has changed some time ago (not sure whether it was already in 5.2 or only in 5.3) as previously all locks were acquire only on the coordinator in replicated mode.

              • 4. Re: What is the best way to concurrently invalidate an entry in a replicated transactional cache?
                Mircea Markus Master

                @Radim - locking delegation has been implemented in Infinispan 5.2[1] and it has to do with non-transactional caches only. From what I understand from Nicolas, the issue he has is for transactional caches.

                @Nicolas, I'm quite surprised you have this issue in ISPN 5.2 as same-key deadlocks shouldn't be possible in this version. Can you please upload the Infinispan logs somewhere and I will take a look at what's going on? Also what's the exact configuration you're using?

                 

                [1] [ISPN-2552] Support concurrent updates for non-transactional caches - JBoss Issue Tracker

                • 5. Re: Re: What is the best way to concurrently invalidate an entry in a replicated transactional cache?
                  Nicolas Filotto Novice

                  To reproduce, I launch 2 JVM with -Djava.net.preferIPv4Stack=true -Djgroups.bind_addr=127.0.0.1 as System properties, they both execute the next class:

                   

                  import org.infinispan.AdvancedCache;
                  import org.infinispan.Cache;
                  import org.infinispan.configuration.cache.CacheMode;
                  import org.infinispan.configuration.cache.Configuration;
                  import org.infinispan.configuration.cache.ConfigurationBuilder;
                  import org.infinispan.configuration.global.GlobalConfigurationBuilder;
                  import org.infinispan.manager.DefaultCacheManager;
                  import org.infinispan.manager.EmbeddedCacheManager;
                  import org.infinispan.transaction.TransactionMode;
                  import org.infinispan.transaction.lookup.TransactionManagerLookup;
                  
                  import java.util.Random;
                  import java.util.concurrent.CountDownLatch;
                  import java.util.concurrent.atomic.AtomicBoolean;
                  import java.util.concurrent.atomic.AtomicInteger;
                  
                  import javax.transaction.TransactionManager;
                  
                  public class PutNRemoveTest
                  {
                    private static final int MAX_ITERATIONS = 20000;
                    private static final int TOTAL_THREAD = 20;
                    private static final String PARENT_ID = "$PARENT-ID";
                    private static final String ID = Long.toString(new Random().nextLong());
                    private static AtomicInteger COUNT = new AtomicInteger();
                  
                    private static void putNRemove(AtomicBoolean stop, AdvancedCache<String, String> cache) throws Exception
                    {
                        int iteration = COUNT.incrementAndGet();
                        if (iteration >= MAX_ITERATIONS)
                        {
                          stop.compareAndSet(false, true);
                          return;
                        }
                        final TransactionManager tm = cache.getAdvancedCache().getTransactionManager();
                        String id = ID + "-" + System.currentTimeMillis() + "-" + iteration;
                        try
                        {
                          tm.begin();
                          cache.put(id, "foo");
                          cache.remove(PARENT_ID);
                          tm.commit();
                          System.out.println("Entry '" + id + "' has been added");
                  
                          tm.begin();
                          cache.remove(id);
                          cache.remove(PARENT_ID);
                          tm.commit();
                          System.out.println("Entry '" + id + "' has been removed");
                        }
                        catch (Exception e)
                        {
                          System.err.println("Could not add/remove the entry '" + id + "' due to the error: " + e.getMessage());
                          e.printStackTrace(System.err);
                        }
                    }
                  
                    public static void main(String[] args) throws Exception
                    {
                        GlobalConfigurationBuilder configBuilder = new GlobalConfigurationBuilder();
                        configBuilder.transport().defaultTransport().addProperty("configurationFile", "udp.xml");
                        EmbeddedCacheManager manager = new DefaultCacheManager(configBuilder.build());
                        ConfigurationBuilder confBuilder = new ConfigurationBuilder();
                        confBuilder.clustering().cacheMode(CacheMode.REPL_SYNC).stateTransfer().fetchInMemoryState(true);
                        final TransactionManager tm = com.arjuna.ats.jta.TransactionManager.transactionManager();
                        TransactionManagerLookup tml = new TransactionManagerLookup()
                        {
                          public TransactionManager getTransactionManager() throws Exception
                          {
                              return tm;
                          }
                        };
                        confBuilder
                          .transaction()
                          .syncRollbackPhase(true)
                          .transactionManagerLookup(tml).transactionMode(TransactionMode.TRANSACTIONAL);
                      
                        Configuration conf = confBuilder.build();
                        manager.defineConfiguration("test", conf);
                        Cache<String, String> c = manager.getCache("test");
                        c.start();
                        final AdvancedCache<String, String> cache = c.getAdvancedCache();
                        final AtomicBoolean stop = new AtomicBoolean();
                        final CountDownLatch startSignal = new CountDownLatch(1);
                        final CountDownLatch endSignal = new CountDownLatch(TOTAL_THREAD);
                        Runnable task = new Runnable()
                        {
                          public void run()
                          {
                              try
                              {
                                startSignal.await();
                                while (!stop.get())
                                {
                                    putNRemove(stop, cache);
                                    Thread.sleep(20);
                                }
                              }
                              catch (Exception e)
                              {
                                System.err.println("Failed :" + e.getMessage());
                              }
                              finally
                              {
                                endSignal.countDown();
                              }
                          }
                        };
                        for (int i = 1; i <= TOTAL_THREAD; i++)
                        {
                          new Thread(task, "PutNRemoveTest-" + i).start();
                        }
                        System.out.println("First we wait 10 seconds to have enough time to launch other JVM");
                        Thread.sleep(10000);
                        System.out.println("Launching the threads with the ID=" + ID);
                        startSignal.countDown();
                        System.out.println("Waiting until we reach " + MAX_ITERATIONS + " iterations");
                        endSignal.await();
                        System.exit(0);
                    }
                  }
                  
                  

                   

                  You have 10 seconds to launch your JVM, then it will launch 20 threads that will do 20 K iterations in total, for each iteration it will put an entry and remove its "parent" then it will remove the entry and its "parent".

                   

                  With ISPN 5.2.7.Final, you should get error of type "org.infinispan.util.concurrent.TimeoutException: Unable to acquire lock after [10 seconds] on key [$PARENT-ID] for requestor [GlobalTransaction:<andromede-33120>:1717:local]! Lock held by [GlobalTransaction:<andromede-33120>:1714:local]"

                   

                  Note 1: you will need arjuna (aka jbossts) 4.6.1-GA to run this test.

                  Note 2: this test works with ISPN 5.3 and 6.0 as already mentioned before

                  • 6. Re: What is the best way to concurrently invalidate an entry in a replicated transactional cache?
                    Tristan Tarrant Master

                    Nicolas Filotto wrote:

                     

                    I've just checked and indeed I confirm that it works perfectly without any workaround on 5.3 and 6.0. Unfortunately I have to use the version shipped into EAP 6.3 which is ISPN 5.2.x :-(

                    Nicolas,

                    are you aware that the "Infinispan" you get in EAP is not covered by support when used directly in user applications ? I.e. it is an implementation detail of the internal server clustering and JPA 2-level cache. You should really use the modules we distribute (which go in their own slot) or embed the version you want in your deployment.

                     

                    Tristan

                    • 7. Re: What is the best way to concurrently invalidate an entry in a replicated transactional cache?
                      Nicolas Filotto Novice

                      Hi Tristan,

                       

                      Thank you for the info, but actually this is indirectly needed for http://www.jboss.org/gatein which is a supported project so we need to use the versions of the third party librairies shipped into EAP.

                       

                      Nicolas

                      • 8. Re: Re: What is the best way to concurrently invalidate an entry in a replicated transactional cache?
                        Pedro Ruivo Novice

                        Hi Nicolas,

                         

                        What is your thread_pool and oob_thread_pool configuration in udp.xml?

                         

                        Cheers,

                        Pedro

                        • 9. Re: Re: What is the best way to concurrently invalidate an entry in a replicated transactional cache?
                          Nicolas Filotto Novice

                          Good point! Increasing the pool size allowed me to avoid the TimeoutException without any workaround.

                           

                          Thank you Pedro, this solved my issue, it is strange that we don't face the issue with the default pool configuration with ISPN 5.3 and 6.0 but it is another story

                          • 10. Re: Re: What is the best way to concurrently invalidate an entry in a replicated transactional cache?
                            Pedro Ruivo Novice

                            Hi Nicolas,

                             

                            actually is not strange. In recent versions, we are moving the remote command to a infinispan thread pool. This way, we avoid the jgroups thread pools to be exhausted. When it happens, the node cannot process any remote command neither the receive the replies.

                             

                            Cheers,

                            Pedro

                            1 of 1 people found this helpful