8 Replies Latest reply on Oct 6, 2016 8:11 AM by vdzhuvinov

    clear invalidation cache cluster wide

    bill.burke

      I've got a clustered invalidation cache. How do I clear every cache instance on every node? I tried cache.clear(), but it only clears the current node. In the example below, before the clear, node1 has "key" inserted into it and node2 has "key2" inserted into it. After calling node1Cache.clear(), "key2" still exists in node2. Again, this is an invalidation cache.

       

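      // Imports assume Infinispan 8.x, JUnit 4, and Keycloak's InfinispanConnectionProvider.
      import org.infinispan.Cache;
      import org.infinispan.configuration.cache.CacheMode;
      import org.infinispan.configuration.cache.Configuration;
      import org.infinispan.configuration.cache.ConfigurationBuilder;
      import org.infinispan.configuration.global.GlobalConfigurationBuilder;
      import org.infinispan.manager.DefaultCacheManager;
      import org.infinispan.manager.EmbeddedCacheManager;
      import org.infinispan.notifications.Listener;
      import org.infinispan.notifications.cachelistener.annotation.CacheEntriesEvicted;
      import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
      import org.infinispan.notifications.cachelistener.annotation.CacheEntryInvalidated;
      import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
      import org.infinispan.notifications.cachelistener.event.CacheEntriesEvictedEvent;
      import org.infinispan.notifications.cachelistener.event.CacheEntryCreatedEvent;
      import org.infinispan.notifications.cachelistener.event.CacheEntryInvalidatedEvent;
      import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
      import org.junit.Test;
      import org.keycloak.connections.infinispan.InfinispanConnectionProvider;

      public class ClusteredCacheBehaviorTest {

          // Builds one clustered cache manager; calling this twice in the same JVM yields two "nodes".
          public EmbeddedCacheManager createManager() {
              System.setProperty("java.net.preferIPv4Stack", "true");
              System.setProperty("jgroups.tcp.port", "53715");
              GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();

              boolean clustered = true;
              boolean async = false;
              boolean allowDuplicateJMXDomains = true;

              if (clustered) {
                  gcb = gcb.clusteredDefault();
                  gcb.transport().clusterName("test-clustering");
              }
              gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);

              EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());

              // Define the realm cache as an invalidation cache (synchronous unless async is set).
              ConfigurationBuilder invalidationConfigBuilder = new ConfigurationBuilder();
              if (clustered) {
                  invalidationConfigBuilder.clustering().cacheMode(async ? CacheMode.INVALIDATION_ASYNC : CacheMode.INVALIDATION_SYNC);
              }
              Configuration invalidationCacheConfiguration = invalidationConfigBuilder.build();

              cacheManager.defineConfiguration(InfinispanConnectionProvider.REALM_CACHE_NAME, invalidationCacheConfiguration);
              return cacheManager;
          }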

       

          // Prints every cache event it receives, tagged with the node name.
          @Listener
          public static class CacheListener {
              String name;

              public CacheListener(String name) {
                  this.name = name;
              }

              @CacheEntryCreated
              public void created(CacheEntryCreatedEvent<String, Object> event) {
                  System.out.println("Listener '" + name + "' entry created " + event.getKey() + " isPre: " + event.isPre());
              }

              @CacheEntryRemoved
              public void removed(CacheEntryRemovedEvent<String, Object> event) {
                  System.out.println("Listener '" + name + "' entry removed isPre: " + event.isPre());
              }

              @CacheEntryInvalidated
              public void invalidated(CacheEntryInvalidatedEvent<String, Object> event) {
                  System.out.println("Listener '" + name + "' entry invalidated: isPre: " + event.isPre());
              }

              @CacheEntriesEvicted
              public void evicted(CacheEntriesEvictedEvent<String, Object> event) {
                  System.out.println("Listener '" + name + "' entry evicted isPre: " + event.isPre());
              }
          }

       

          @Test
          public void testListener() throws Exception {
              EmbeddedCacheManager node1 = createManager();
              EmbeddedCacheManager node2 = createManager();
              Cache<String, Object> node1Cache = node1.getCache(InfinispanConnectionProvider.REALM_CACHE_NAME);
              node1Cache.addListener(new CacheListener("node1"));
              Cache<String, Object> node2Cache = node2.getCache(InfinispanConnectionProvider.REALM_CACHE_NAME);
              node2Cache.addListener(new CacheListener("node2"));

              System.out.println("node1 create entry");
              node1Cache.put("key", "node1");

              // Second put on the same key overwrites the existing entry.
              System.out.println("node1 update entry");
              node1Cache.put("key", "node111");

              System.out.println("node2 create entry");
              node2Cache.put("key", "node2");

              System.out.println("node1 remove entry");
              node1Cache.remove("key");

              System.out.println("node2 remove entry");
              node2Cache.remove("key");

              System.out.println("node2 put entry");
              node2Cache.put("key", "node2");
              System.out.println("node2 evict entry");
              node2Cache.evict("key");
              System.out.println("node1/node2 putExternal entry");
              node1Cache.putForExternalRead("key", "common");
              node2Cache.putForExternalRead("key", "common");
              System.out.println("node2 remove entry");
              node2Cache.remove("key");
              System.out.println("node1 remove entry");
              node1Cache.remove("key");

              // test remove non-existing node 2, existing node 1
              System.out.println("Test non existent remove");
              System.out.println("node1 create entry");
              node1Cache.put("key", "value");
              System.out.println("node2 remove non-existent entry");
              System.out.println("exists?: " + node2Cache.containsKey("key"));
              node2Cache.remove("key");

              // test clear: "key" is cached on both nodes, "key2" only on node2
              System.out.println("Test clear cache");
              System.out.println("add key to node 1, key2 to node2");
              node1Cache.putForExternalRead("key", "value");
              node2Cache.putForExternalRead("key", "value");
              node2Cache.putForExternalRead("key2", "value");
              System.out.println("Clear from node1");
              node1Cache.clear();
              // If the clear is propagated cluster wide, both checks print false.
              System.out.println("node 2 exists key2?: " + node2Cache.containsKey("key2"));
              System.out.println("node 2 exists key?: " + node2Cache.containsKey("key"));
          }

      }

        • 1. Re: clear invalidation cache cluster wide
          vdzhuvinov

          Try cache.stream() which should operate cluster wide and also include persisted entries (if you have any).

          • 2. Re: clear invalidation cache cluster wide
            bill.burke

            There is no cache.stream() method in Infinispan 8.1.0.  Are you talking about cache.entrySet().stream()?  Please explain how that would work for an invalidation cache if Node 1 and Node 2 do not have the same keys. 

            • 3. Re: clear invalidation cache cluster wide
              vdzhuvinov

              You're right, Bill. I meant Cache.entrySet().stream() or, alternatively, Cache.keySet().stream(). But this may only work if you have a cache store configured for the cache. We recently started experimenting with a setup using invalidation, and noticed that keySet() skips evicted entries that have been persisted to a DB. In replication mode, keySet() does not skip them.

               

              One thing you could try is to pass a message to each node, to trigger clear() locally. You could do this for example by declaring a special replicated cache with a listener, and use that as the messaging mechanism.
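              The signalling idea above can be pictured with a plain-Java toy model. This is a minimal sketch, not Infinispan code: `ControlChannel` is a hypothetical stand-in for the special replicated cache, and each `Node` registers a listener that clears only its own local map when the signal arrives.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy model only: not Infinispan code. All names here are hypothetical. */
public class ClearSignalModel {

    /** Stands in for the special replicated cache: every write reaches all listeners. */
    public static class ControlChannel {
        private final List<Consumer<String>> listeners = new ArrayList<>();

        public void addListener(Consumer<String> listener) {
            listeners.add(listener);
        }

        public void publish(String message) {
            for (Consumer<String> listener : listeners) {
                listener.accept(message);
            }
        }
    }

    /** A node whose data cache only ever holds local entries. */
    public static class Node {
        public final Map<String, Object> local = new HashMap<>();

        public Node(ControlChannel channel) {
            // React to the clear signal by clearing local contents only.
            channel.addListener(message -> {
                if ("clear".equals(message)) {
                    local.clear();
                }
            });
        }
    }

    /** Returns the total number of entries left across both nodes after the signal. */
    public static int entriesLeftAfterSignal() {
        ControlChannel channel = new ControlChannel();
        Node node1 = new Node(channel);
        Node node2 = new Node(channel);
        node1.local.put("key", "value");
        node2.local.put("key2", "value");
        channel.publish("clear");
        return node1.local.size() + node2.local.size();
    }

    public static void main(String[] args) {
        System.out.println("entries left: " + entriesLeftAfterSignal());
    }
}
```

              In the model, one published signal empties both nodes even though they never held the same keys, which is the property the original question asks for.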

              • 4. Re: clear invalidation cache cluster wide
                rvansa

                Invalidation mode replicates the ClearCommand. The test works in recent versions, and there have been few changes to this logic. Can you reproduce this on Infinispan 8.2.4.Final? Any chance you set async=true? (In that case it really could fail.)

                 

                As for the "messaging mechanism" in general: rather than abusing listeners, use the Distributed Execution Framework.
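                The async=true caveat mentioned in this reply can be illustrated with a small plain-Java model. It is only a sketch under stated assumptions, not Infinispan code: the hypothetical `Cluster.clear` sends a clear command to every other node, and in the async case returns before the remote node has applied it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Toy model only: not Infinispan code. All names here are hypothetical. */
public class InvalidationClearModel {

    /** One cluster member holding only its own local entries. */
    public static class Node {
        public final Map<String, Object> local = new HashMap<>();
        public final Queue<Runnable> inbox = new ArrayDeque<>(); // pending remote commands

        /** Apply every command that has arrived but not yet been processed. */
        public void drainInbox() {
            Runnable command;
            while ((command = inbox.poll()) != null) {
                command.run();
            }
        }
    }

    public static class Cluster {
        private final List<Node> nodes = new ArrayList<>();
        private final boolean async;

        public Cluster(boolean async) {
            this.async = async;
        }

        public Node join() {
            Node node = new Node();
            nodes.add(node);
            return node;
        }

        /** Clear the origin locally and send a clear command to every other node. */
        public void clear(Node origin) {
            origin.local.clear();
            for (Node other : nodes) {
                if (other == origin) {
                    continue;
                }
                other.inbox.add(other.local::clear);
                if (!async) {
                    other.drainInbox(); // sync: return only after the remote node applied it
                }
            }
        }
    }

    /** Returns whether node2 still sees "key2" immediately after node1 clears. */
    public static boolean key2VisibleAfterClear(boolean async) {
        Cluster cluster = new Cluster(async);
        Node node1 = cluster.join();
        Node node2 = cluster.join();
        node1.local.put("key", "value");
        node2.local.put("key2", "value");
        cluster.clear(node1);
        return node2.local.containsKey("key2");
    }

    public static void main(String[] args) {
        System.out.println("sync:  key2 visible on node2? " + key2VisibleAfterClear(false));
        System.out.println("async: key2 visible on node2? " + key2VisibleAfterClear(true));
    }
}
```

                In this model the synchronous run reports that "key2" is gone as soon as clear returns, while the asynchronous run can still observe it until the queued command is processed. That is one way a containsKey check right after clear() could fail with async=true.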

                • 5. Re: clear invalidation cache cluster wide
                  vdzhuvinov

                  Thanks for the Dist Exec framework pointer, that's good to know!

                   

                  Why does cache.keySet() behave differently in local / invalidation modes than in replicated / distributed modes?

                   

                  (in regard to including evicted entries that have been persisted)

                  • 6. Re: clear invalidation cache cluster wide
                    william.burns

                    That is how it works, Vladimir; check out [1]. Basically, local and invalidation modes only use local contents. Invalidation is only used to remove entries on other nodes when they are changed (otherwise there would be inconsistent values between nodes). I will be updating this section to cover this in more detail.

                     

                    [1] Infinispan 9.0 User Guide

                    • 7. Re: clear invalidation cache cluster wide
                      rvansa

                      The reason is that, conceptually, an invalidation cache does not hold all the data. It therefore does not make sense to go to remote nodes and ask whether they have any other entries: you still would not have the full set of data, which exists only in the persistent storage. Moreover, we would have to deal with the fact that some entries are present on both the local node and a remote one (they don't have well-defined owners), yet each entry should be used only once.
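                      Both points in this reply can be seen in a plain-Java toy model. This is a sketch under stated assumptions rather than Infinispan code: the hypothetical `Node` caches entries loaded from a shared store, `keySet()` reports local contents only (as described in the thread), and `naiveClusterKeys` shows what simply concatenating every node's keys would return.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Toy model only: not Infinispan code. All names here are hypothetical. */
public class LocalKeySetModel {

    /** A node that caches entries in memory in front of a shared persistent store. */
    public static class Node {
        private final Map<String, String> memory = new HashMap<>();
        private final Map<String, String> store;

        public Node(Map<String, String> sharedStore) {
            this.store = sharedStore;
        }

        /** Cache miss path: copy the entry from the store into local memory. */
        public String load(String key) {
            String value = store.get(key);
            if (value != null) {
                memory.put(key, value);
            }
            return value;
        }

        /** Eviction drops only the in-memory copy; the persisted copy survives. */
        public void evict(String key) {
            memory.remove(key);
        }

        /** Models the behaviour described in the thread: local contents only. */
        public Set<String> keySet() {
            return new TreeSet<>(memory.keySet());
        }
    }

    /** Naively concatenates every node's local keys, duplicates included. */
    public static List<String> naiveClusterKeys(List<Node> nodes) {
        List<String> all = new ArrayList<>();
        for (Node node : nodes) {
            all.addAll(node.keySet());
        }
        return all;
    }

    public static void main(String[] args) {
        Map<String, String> store = new HashMap<>();
        store.put("a", "1");
        store.put("b", "2");
        store.put("c", "3");
        Node node1 = new Node(store);
        Node node2 = new Node(store);

        node1.load("a");
        node1.load("b");
        node1.evict("b"); // "b" now lives only in the store
        node2.load("a");  // both nodes cache "a": no single owner
        node2.load("c");

        System.out.println("node1 keys:  " + node1.keySet());
        System.out.println("node2 keys:  " + node2.keySet());
        System.out.println("naive union: " + naiveClusterKeys(Arrays.asList(node1, node2)));
        System.out.println("store keys:  " + new TreeSet<>(store.keySet()));
    }
}
```

                      In the model, no combination of node key sets recovers "b", which exists only in the store, and the naive union lists "a" twice because neither node owns it.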

                      • 8. Re: clear invalidation cache cluster wide
                        vdzhuvinov

                        Thanks guys, this makes perfect sense.