1 2 3 Previous Next 42 Replies Latest reply on Jul 6, 2017 4:30 AM by galder.zamarreno

    remote cache entrySet stream throws unsupported exception

    sea_shankar

      I am trying to use the entrySet().stream in order to do some filtering and removing following this StackOverflow thread:

      caching - Best way to remove cache entry based on predicate in infinispan? - Stack Overflow

       

      I have a RemoteCache<Object,Object> cache = cacheManager.get("cache");

      cache.put("hello","hello")

       

      Tried the following:

       

      Works:

      cache.keySet().stream().filter(e -> e.toString().contains("hello")).forEach(e -> log.info(e.toString()));

       

      Unsupported Exception:

      cache.entrySet().stream().filter(e -> e.toString().contains("hello")).forEach(e -> log.info(e.toString()));

       

      Get this exception: java.lang.UnsupportedOperationException: null

       

      Any idea?  The stack trace isn't very useful at all. 

        • 1. Re: remote cache entrySet stream throws unsupported exception
          william.burns

          Unfortunately streams are only supported in this fashion in embedded caches. Remote cache has to use the older filter/converter methods and a local iteration to do the same thing.

           

          Here is an example of how you could do that.

           

          try (CloseableIterator<Entry<Object, Object>> iter = cache.retrieveEntries(null, 5000)) {
             StreamSupport.stream(Spliterators.spliterator(iter, Long.MAX_VALUE, Spliterator.CONCURRENT), false).filter(e -> e.toString().contains("hello")).forEach(e -> log.info(e.toString()));
          }
          

           

          Keep in mind though that this requires pulling every entry to the client first before it can be processed (thus you would have to call remove for every entry remotely). Entries will be pulled in chunks of 5000 as dictated by the argument to retrieveEntries so you should be able to avoid having memory issues.

           

          You can also register a filter converter on the server before hand which can improve your performance when doing filtering such as this. Check out the Infinispan 9.0 User Guide section. In this case you don't need to apply the filter to the stream as it will be done automatically for you with the iterator on the server side, reducing the number of entries the client will have to pull down thus further improving performance.

           

          If you want extreme performance however you can register a server side script or task and then invoke it on the client which performs the removals directly on the server so you don't have to have all of the hops between the client and the server

          • 2. Re: remote cache entrySet stream throws unsupported exception
            sea_shankar

            Ahh that's unfortunate, thanks for the quick reply.  Will give this a shot!

            • 3. Re: remote cache entrySet stream throws unsupported exception
              sea_shankar

              Btw, why do they allow you to do .entrySet() in the first place since it's not allowed?

              • 4. Re: remote cache entrySet stream throws unsupported exception
                william.burns

                This was what many of us would call a mistake that was made when designing the original API. By extending the Map interface we have to provide that method.

                 

                I actually talked a bit with nadirx and I created [ISPN-7900] Provide entrySet, values, keySet implementation for RemoteCache - JBoss Issue Tracker . You should be able to look out for that which would support your original code, however it won't be as optimal as the others I mentioned.

                • 5. Re: remote cache entrySet stream throws unsupported exception
                  sea_shankar

                  For method 2 where you register a filter converter on the server, the removal will still have to do a remote call for each entry correct?

                   

                  Can you tell me which section in the user guide it talks about the server side script?  When I click the link it goes to the section I was already viewing (not the section I think you wanted me to look at).

                   

                  I have a use case where I have ([K1, K2], V1), ([K1,K3], V2), etc where I need to remove all the entries with K1. Is this possible with a server side script?  Will I be able to pass in K1 is a parameter when I invoke in the client for the script?  If there is an existing example that would be great!

                  • 6. Re: remote cache entrySet stream throws unsupported exception
                    william.burns

                    Shankar Rajah wrote:

                     

                    For method 2 where you register a filter converter on the server, the removal will still have to do a remote call for each entry correct?

                    Yes, it only reduces how many entries are returned to the client itself.

                     

                    Shankar Rajah wrote:

                     

                    Can you tell me which section in the user guide it talks about the server side script? When I click the link it goes to the section I was already viewing (not the section I think you wanted me to look at).

                     

                    I have a use case where I have ([K1, K2], V1), ([K1,K3], V2), etc where I need to remove all the entries with K1. Is this possible with a server side script? Will I be able to pass in K1 is a parameter when I invoke in the client for the script? If there is an existing example that would be great!

                    There is a very simple example of how to register a script that does multiplication.

                     

                    Remote Scripting Tutorial - Infinispan

                     

                    You can change the text to be your code to check the cache. The url I posted earlier about scripting should work fine. If not you can just go section 19.10 manually. It has information of what you can and can't do in scripts.

                    • 7. Re: remote cache entrySet stream throws unsupported exception
                      sea_shankar

                      Also with regards to running a task, looked at the link and it has the following:

                       

                      Cache<String, String> cache = (Cache<String, String>) taskContext.getCache().get();

                      return cache.entrySet().stream().map((Serializable & Function<Map.Entry<String, String>, String[]>) e -> e.getValue().split("\\s+"))...

                       

                      This is using Cache instead of RemoteCache?  Would I replace it with RemoteCache? 

                      Do I need to register the Task with the Server? (How is this done?) 

                       

                      If there is documentation on Server Task in the User Guide (sorry did a search and couldn't find), could you point me to it?  In your opinion, is it preferred to do Tasks or Scripting?

                       

                       

                      • 8. Re: remote cache entrySet stream throws unsupported exception
                        william.burns

                        Unfortunately ServerTask doesn't have proper documentation atm. My best suggestion would be to look at some Integration Tests we have like the one I linked to. Here is the one that invokes the given task I linked: infinispan/LocalServerTaskIT.java at master · infinispan/infinispan · GitHub

                         

                        Note these are registered as service providers. To be honest nadirx knows more about how this works

                         

                        A ServerTask is executed in the server, thus you use the normal Cache object.

                        • 9. Re: remote cache entrySet stream throws unsupported exception
                          sea_shankar

                          Awesome, thanks for your responses and patience! 

                           

                          Hopefully  nadirx  can chime in.  I have created a task to do the filtering and removing.  I am just a bit confused as to how to deploy it.  Have to create a jar and then put it under  standalone/deployments?  I am actually using domain.sh, so not sure if I still have to put the jar there?

                          • 10. Re: remote cache entrySet stream throws unsupported exception
                            nadirx

                            If you are using domain mode, you need to use the CLI as described here:

                             

                            Application deployment - WildFly 10 - Project Documentation Editor

                            • 11. Re: remote cache entrySet stream throws unsupported exception
                              sea_shankar

                              nadirx

                               

                              I am trying to do the scripting:

                               

                              "// mode=local,language=javascript\n"

                              + "cacheName*partialKey\n"

                              + "var cache = cacheManager.getCache(cacheName);\n"

                              //+ "cache.remove(partialKey);";

                              + "cache.entrySet().stream().filter(function(e){ return e.getKey().toString().contains(partialKey); });";

                               

                              where the key is a spring SimpleKey (which is serializable)

                               

                              But I get the following error:

                              Error received from the server: java.util.concurrent.ExecutionException: org.infinispan.commons.CacheException: org.infinispan.commons.CacheException: java.io.NotSerializableException: org.infinispan.interceptors.distribution.DistributionBulkInterceptor$BackingEntrySet$1

                              org.infinispan.commons.CacheException: org.infinispan.commons.CacheException: java.io.NotSerializableException: org.infinispan.interceptors.distribution.DistributionBulkInterceptor$BackingEntrySet$1

                              org.infinispan.commons.CacheException: java.io.NotSerializableException: org.infinispan.interceptors.distribution.DistributionBulkInterceptor$BackingEntrySet$1

                              java.io.NotSerializableException: org.infinispan.interceptors.distribution.DistributionBulkInterceptor$BackingEntrySet$1

                              an exception which occurred:

                                in object org.infinispan.interceptors.distribution.DistributionBulkInterceptor$BackingEntrySet$1@5499e15b

                               

                              Any idea what the issue can be?

                              • 12. Re: remote cache entrySet stream throws unsupported exception
                                william.burns

                                Unfortunately you have run into [ISPN-6173] Provide DynaLink-based solution for serializing Nashorn lambdas - JBoss Issue Tracker

                                 

                                It seems you are passing an Entry as the argument, but that isn't the big issue. Unfortunately scripts have no way to make an object Serializable as they will contain script objects internally in the compiled bytecode. If you want to use a distributed stream I would recommend using a ServerTask, unless you know some nashorn magic. Everything should work fine with ServerTask.

                                • 13. Re: remote cache entrySet stream throws unsupported exception
                                  sea_shankar

                                  Ahh I see, yes definitely want the stream capabilities, will continue with the ServerTask method then.  Another minor question:

                                   

                                  I have this task:

                                  public Object call() throws Exception {

                                          Cache<Object, Object> cache = (Cache<Object, Object>) taskContext.getCache().get();

                                          cache.entrySet().stream().filter(e -> e.getKey().toString().contains("sessionid")).forEach(e -> cache.remove(e.getKey()));

                                          return null;

                                  }

                                   

                                  Cache<Object, Object> cache = (Cache<Object, Object>) taskContext.getCache().get();

                                   

                                  Which cache does this get?  For example, if I have Cache1 and Cache2, and I want to only remove the entries from Cache2, should the above call be getting only Cache2?  Didn't find a call taskContext.getCache(cacheName). 

                                  • 14. Re: remote cache entrySet stream throws unsupported exception
                                    sea_shankar

                                    nadirx

                                     

                                    I was able to get a task deployed on the server, but still getting the serialization error:

                                     

                                    [Server:server-two] 14:31:16,743 ERROR [org.infinispan.remoting.rpc.RpcManagerImpl] (HotRod-ServerHandler-6-2) ISPN000073: Unexpected error while replicating: org.infinispan.commons.marshall.NotSerializableException: org.jboss.as.clustering.infinispan.DefaultCacheContainer$DelegatingCache

                                    [Server:server-two] Caused by: an exception which occurred:

                                    [Server:server-two]   in field capturedArgs

                                    [Server:server-two]   in object com.pcf.cardinal.cache.config.ClearSessionTask$$Lambda$427/872895107@469b480a

                                    1 2 3 Previous Next