-
1. Re: remote cache entrySet stream throws unsupported exception
william.burns Jun 6, 2017 12:00 PM (in response to sea_shankar)
Unfortunately, streams are only supported in this fashion in embedded caches. A remote cache has to use the older filter/converter methods and a local iteration to do the same thing.
Here is an example of how you could do that.
try (CloseableIterator<Entry<Object, Object>> iter = cache.retrieveEntries(null, 5000)) {
    StreamSupport.stream(Spliterators.spliterator(iter, Long.MAX_VALUE, Spliterator.CONCURRENT), false)
        .filter(e -> e.toString().contains("hello"))
        .forEach(e -> log.info(e.toString()));
}
Keep in mind, though, that this pulls every entry to the client before it can be processed (so you would have to call remove remotely for every entry). Entries are pulled in chunks of 5000, as dictated by the argument to retrieveEntries, so you should be able to avoid memory issues.
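The iterator-to-stream wrapping used above can be tried without a server. This is only a minimal sketch using the JDK: a plain Iterator stands in for the CloseableIterator returned by retrieveEntries, and the class name IteratorStreamSketch, the method matching, and the sample data are all made up for illustration. It uses Spliterators.spliteratorUnknownSize rather than passing Long.MAX_VALUE as the size, which is a swapped-in variation, not what the snippet above does.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

final class IteratorStreamSketch {

    // Wraps any Iterator in a sequential Stream and keeps the elements
    // whose string form contains the given text.
    static <T> List<String> matching(Iterator<T> iter, String text) {
        return StreamSupport
                .stream(Spliterators.spliteratorUnknownSize(iter, Spliterator.ORDERED), false)
                .map(Object::toString)
                .filter(s -> s.contains(text))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Stand-in data (assumption); with Hot Rod this would be the
        // iterator obtained from retrieveEntries.
        Iterator<String> iter = Arrays.asList("hello=1", "bye=2", "say hello=3").iterator();
        System.out.println(matching(iter, "hello")); // prints [hello=1, say hello=3]
    }
}
```

The filter still runs on the client here, which matches the caveat above: every element comes through the iterator before it is tested.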
You can also register a filter converter on the server beforehand, which can improve performance when filtering like this. Check out the Infinispan 9.0 User Guide section. In this case you don't need to apply the filter to the stream, as the iterator applies it for you on the server side. That reduces the number of entries the client has to pull down and further improves performance.
If you want extreme performance, however, you can register a server-side script or task and then invoke it from the client. It performs the removals directly on the server, so you avoid all of the hops between the client and the server.
-
2. Re: remote cache entrySet stream throws unsupported exception
sea_shankar Jun 6, 2017 11:52 AM (in response to william.burns)
Ahh that's unfortunate, thanks for the quick reply. Will give this a shot!
-
3. Re: remote cache entrySet stream throws unsupported exception
sea_shankar Jun 6, 2017 12:34 PM (in response to sea_shankar)
Btw, why do they allow you to do .entrySet() in the first place since it's not allowed?
-
4. Re: remote cache entrySet stream throws unsupported exception
william.burns Jun 6, 2017 12:55 PM (in response to sea_shankar)
This is what many of us would call a mistake that was made when designing the original API. Because RemoteCache extends the Map interface, we have to provide that method.
I actually talked a bit with nadirx and I created [ISPN-7900] Provide entrySet, values, keySet implementation for RemoteCache - JBoss Issue Tracker. You can keep an eye on that issue, which would support your original code, although it won't be as efficient as the other approaches I mentioned.
-
5. Re: remote cache entrySet stream throws unsupported exception
sea_shankar Jun 6, 2017 1:15 PM (in response to william.burns)
For method 2, where you register a filter converter on the server, the removal will still have to do a remote call for each entry, correct?
Can you tell me which section of the user guide talks about the server side script? When I click the link it goes to the section I was already viewing (not the section I think you wanted me to look at).
I have a use case where I have ([K1, K2], V1), ([K1, K3], V2), etc., where I need to remove all the entries with K1. Is this possible with a server side script? Will I be able to pass in K1 as a parameter when I invoke the script from the client? If there is an existing example that would be great!
-
6. Re: remote cache entrySet stream throws unsupported exception
william.burns Jun 6, 2017 1:37 PM (in response to sea_shankar)
Shankar Rajah wrote:
For method 2, where you register a filter converter on the server, the removal will still have to do a remote call for each entry, correct?
Yes, it only reduces how many entries are returned to the client itself.
Shankar Rajah wrote:
Can you tell me which section of the user guide talks about the server side script? When I click the link it goes to the section I was already viewing (not the section I think you wanted me to look at).
I have a use case where I have ([K1, K2], V1), ([K1, K3], V2), etc., where I need to remove all the entries with K1. Is this possible with a server side script? Will I be able to pass in K1 as a parameter when I invoke the script from the client? If there is an existing example that would be great!
There is a very simple example of how to register a script that does multiplication.
Remote Scripting Tutorial - Infinispan
You can change the script text to be your code that checks the cache. The URL I posted earlier about scripting should work fine. If not, you can just go to section 19.10 manually. It has information on what you can and can't do in scripts.
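For the ([K1, K2], V1) use case quoted above, the removal logic itself is small once it runs next to the data. The following is only a sketch under stated assumptions: a plain ConcurrentHashMap stands in for the server-side cache, a List stands in for the composite key, and the names PartialKeyRemovalSketch and removeByKeyPart are hypothetical. It is not an Infinispan script or task, just the filtering-and-removing step that one would contain.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class PartialKeyRemovalSketch {

    // Removes every entry whose composite key contains the given part
    // and returns how many entries were removed.
    static <P, V> int removeByKeyPart(Map<List<P>, V> cache, P part) {
        int before = cache.size();
        cache.keySet().removeIf(key -> key.contains(part));
        return before - cache.size();
    }

    public static void main(String[] args) {
        // Stand-in for the cache (assumption): composite keys modelled as lists.
        Map<List<String>, String> cache = new ConcurrentHashMap<>();
        cache.put(Arrays.asList("K1", "K2"), "V1");
        cache.put(Arrays.asList("K1", "K3"), "V2");
        cache.put(Arrays.asList("K4", "K5"), "V3");

        int removed = removeByKeyPart(cache, "K1");
        System.out.println(removed + " removed, " + cache.size() + " left"); // prints 2 removed, 1 left
    }
}
```

The count returned here is computed from the size before and after, so it is only exact when nothing else modifies the map at the same time.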
-
7. Re: remote cache entrySet stream throws unsupported exception
sea_shankar Jun 6, 2017 1:53 PM (in response to william.burns)
Also, with regard to running a task, I looked at the link and it has the following:
Cache<String, String> cache = (Cache<String, String>) taskContext.getCache().get();
return cache.entrySet().stream().map((Serializable & Function<Map.Entry<String, String>, String[]>) e -> e.getValue().split("\\s+"))...
This is using Cache instead of RemoteCache? Would I replace it with RemoteCache?
Do I need to register the Task with the Server? (How is this done?)
If there is documentation on Server Task in the User Guide (sorry did a search and couldn't find), could you point me to it? In your opinion, is it preferred to do Tasks or Scripting?
-
8. Re: remote cache entrySet stream throws unsupported exception
william.burns Jun 6, 2017 4:45 PM (in response to sea_shankar)
Unfortunately ServerTask doesn't have proper documentation at the moment. My best suggestion would be to look at some of the integration tests we have, like the one I linked to. Here is the one that invokes the task I linked: infinispan/LocalServerTaskIT.java at master · infinispan/infinispan · GitHub
Note that these are registered as service providers. To be honest, nadirx knows more about how this works.
A ServerTask is executed in the server, thus you use the normal Cache object.
-
9. Re: remote cache entrySet stream throws unsupported exception
sea_shankar Jun 6, 2017 5:45 PM (in response to william.burns)
Awesome, thanks for your responses and patience!
Hopefully nadirx can chime in. I have created a task to do the filtering and removing. I am just a bit confused as to how to deploy it. Do I have to create a jar and then put it under standalone/deployments? I am actually using domain.sh, so I'm not sure if I still have to put the jar there.
-
10. Re: remote cache entrySet stream throws unsupported exception
nadirx Jun 7, 2017 3:54 AM (in response to sea_shankar)
If you are using domain mode, you need to use the CLI as described here:
Application deployment - WildFly 10 - Project Documentation Editor
-
11. Re: remote cache entrySet stream throws unsupported exception
sea_shankar Jun 7, 2017 12:28 PM (in response to nadirx)
I am trying to do the scripting:
"// mode=local,language=javascript\n"
+ "cacheName*partialKey\n"
+ "var cache = cacheManager.getCache(cacheName);\n"
//+ "cache.remove(partialKey);";
+ "cache.entrySet().stream().filter(function(e){ return e.getKey().toString().contains(partialKey); });";
where the key is a Spring SimpleKey (which is serializable).
But I get the following error:
Error received from the server: java.util.concurrent.ExecutionException: org.infinispan.commons.CacheException: org.infinispan.commons.CacheException: java.io.NotSerializableException: org.infinispan.interceptors.distribution.DistributionBulkInterceptor$BackingEntrySet$1
org.infinispan.commons.CacheException: org.infinispan.commons.CacheException: java.io.NotSerializableException: org.infinispan.interceptors.distribution.DistributionBulkInterceptor$BackingEntrySet$1
org.infinispan.commons.CacheException: java.io.NotSerializableException: org.infinispan.interceptors.distribution.DistributionBulkInterceptor$BackingEntrySet$1
java.io.NotSerializableException: org.infinispan.interceptors.distribution.DistributionBulkInterceptor$BackingEntrySet$1
an exception which occurred:
in object org.infinispan.interceptors.distribution.DistributionBulkInterceptor$BackingEntrySet$1@5499e15b
Any idea what the issue can be?
-
12. Re: remote cache entrySet stream throws unsupported exception
william.burns Jun 7, 2017 1:12 PM (in response to sea_shankar)
Unfortunately you have run into [ISPN-6173] Provide DynaLink-based solution for serializing Nashorn lambdas - JBoss Issue Tracker
It seems you are passing an Entry as the argument, but that isn't the big issue. Unfortunately scripts have no way to make an object Serializable, as they will contain script objects internally in the compiled bytecode. If you want to use a distributed stream I would recommend using a ServerTask, unless you know some Nashorn magic. Everything should work fine with ServerTask.
-
13. Re: remote cache entrySet stream throws unsupported exception
sea_shankar Jun 7, 2017 1:31 PM (in response to william.burns)
Ahh I see, yes definitely want the stream capabilities, will continue with the ServerTask method then. Another minor question:
I have this task:
public Object call() throws Exception {
    Cache<Object, Object> cache = (Cache<Object, Object>) taskContext.getCache().get();
    cache.entrySet().stream()
        .filter(e -> e.getKey().toString().contains("sessionid"))
        .forEach(e -> cache.remove(e.getKey()));
    return null;
}
Cache<Object, Object> cache = (Cache<Object, Object>) taskContext.getCache().get();
Which cache does this get? For example, if I have Cache1 and Cache2, and I want to only remove the entries from Cache2, should the above call be getting only Cache2? Didn't find a call taskContext.getCache(cacheName).
-
14. Re: remote cache entrySet stream throws unsupported exception
sea_shankar Jun 7, 2017 2:33 PM (in response to william.burns)
I was able to get a task deployed on the server, but still getting the serialization error:
[Server:server-two] 14:31:16,743 ERROR [org.infinispan.remoting.rpc.RpcManagerImpl] (HotRod-ServerHandler-6-2) ISPN000073: Unexpected error while replicating: org.infinispan.commons.marshall.NotSerializableException: org.jboss.as.clustering.infinispan.DefaultCacheContainer$DelegatingCache
[Server:server-two] Caused by: an exception which occurred:
[Server:server-two] in field capturedArgs
[Server:server-two] in object com.pcf.cardinal.cache.config.ClearSessionTask$$Lambda$427/872895107@469b480a
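The trace above names the field capturedArgs on a lambda generated from ClearSessionTask, and the object that failed to serialize is the cache itself. One plausible reading (an interpretation of the log, not something confirmed in this thread) is that a lambda which refers to the cache variable captures it, so the cache gets serialized along with the lambda. The JDK-only sketch below reproduces that mechanism. The class CapturedArgsSketch, the non-serializable Handle class standing in for the cache, and the helper names are all hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

final class CapturedArgsSketch {

    // Stand-in (assumption) for a cache handle that is not Serializable.
    static final class Handle {
        void remove(String key) { /* no-op in this sketch */ }
    }

    // Returns true if Java serialization accepts the object,
    // false if it hits a non-serializable object along the way.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    static Consumer<String> capturing(Handle handle) {
        // The lambda body refers to 'handle', so the handle becomes a
        // captured argument and is serialized together with the lambda.
        return (Consumer<String> & Serializable) key -> handle.remove(key);
    }

    static BiConsumer<Handle, String> nonCapturing() {
        // The handle arrives as a parameter at call time; nothing is captured.
        return (BiConsumer<Handle, String> & Serializable) (h, key) -> h.remove(key);
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(capturing(new Handle()))); // prints false
        System.out.println(isSerializable(nonCapturing()));          // prints true
    }
}
```

If that reading is right, the thing to look for is a way to receive the cache as a parameter instead of capturing it. As far as I know CacheStream has a forEach variant that takes a BiConsumer whose first argument is the Cache, but that should be checked against the Javadoc for the Infinispan version in use.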