How to properly use Infinispan's distributed execution support?
yanduc Nov 3, 2014 5:18 PM
Hi all,
given the docs and what I could find online, I could not come up with a better solution than the one below, which does not work...
Here's what I need to do: I want to distribute tasks over a data grid (composed of Infinispan distributed cache nodes) and have these tasks executed "local" to the cached data at each node in the grid. I do not want to do map-reduce, only simple aggregation: each distributed task instance computes the prime numbers (out of the numbers kept in cache) and returns that set of prime numbers to the caller, which aggregates all these sets into a single final set.
In order to perform that little experiment, I've implemented a DistributedCallable class, which I dispatch using the DistributedExecutorService. Note in the code below that upon execution, the task acquires the values from the cache (expecting them to correspond to the values at that node, as mentioned in the javadoc).
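import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;

import org.infinispan.Cache;
import org.infinispan.distexec.DistributedCallable;

public class ComputePrimeTask implements DistributedCallable<String, Integer, Set<Integer>>, Serializable {

    static final long serialVersionUID = 1L;

    // Injected on the executing node through setEnvironment(); not serialized with the task.
    private transient Cache<String, Integer> cache;
    private transient Set<String> keys;

    @Override
    public void setEnvironment(Cache<String, Integer> cache, Set<String> keys) {
        this.cache = cache;
        this.keys = keys;
    }

    @Override
    public Set<Integer> call() throws Exception {
        System.out.println("Executing ComputePrimeTask - number of elements in cache: " + cache.size());
        Set<Integer> toReturn = new HashSet<Integer>();
        // The ClassCastException reported below is thrown while iterating over cache.values().
        for (Integer v : cache.values()) {
            if (isPrime(v)) {
                toReturn.add(v);
            }
        }
        System.out.println(String.format("Returning %s prime numbers", toReturn.size()));
        return toReturn;
    }

    // Trial division by odd candidates up to sqrt(n).
    private boolean isPrime(int n) {
        if (n < 2) {
            return false; // 0, 1 and negative numbers are not prime
        }
        if (n % 2 == 0) {
            return n == 2; // 2 is the only even prime
        }
        // Use a long counter so that i * i cannot overflow for large n.
        for (long i = 3; i * i <= n; i += 2) {
            if (n % i == 0) {
                return false;
            }
        }
        return true;
    }
}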
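For illustration, the caller-side aggregation described above could look roughly like the sketch below. It is not Infinispan code: the grid is replaced by a plain java.util.concurrent thread pool so that the example runs on its own, and fakeNodeTask and runDemo are hypothetical helpers standing in for the per-node tasks. Since DistributedExecutorService extends ExecutorService, merging a list of Future results should work the same way once the futures come from the grid.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PrimeAggregationSketch {

    // Merges the per-task result sets into a single final set.
    static Set<Integer> aggregate(List<Future<Set<Integer>>> futures) {
        Set<Integer> all = new HashSet<Integer>();
        for (Future<Set<Integer>> f : futures) {
            try {
                all.addAll(f.get()); // blocks until that task has finished
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for a task", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("A task failed", e.getCause());
            }
        }
        return all;
    }

    // Hypothetical stand-in for one node's task: returns a fixed set of primes.
    static Callable<Set<Integer>> fakeNodeTask(final Integer... primes) {
        return new Callable<Set<Integer>>() {
            public Set<Integer> call() {
                return new HashSet<Integer>(Arrays.asList(primes));
            }
        };
    }

    // Runs three fake "node" tasks on a local pool and merges their results.
    static Set<Integer> runDemo() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Future<Set<Integer>>> futures = new ArrayList<Future<Set<Integer>>>();
            futures.add(pool.submit(fakeNodeTask(2, 3)));
            futures.add(pool.submit(fakeNodeTask(5, 7)));
            futures.add(pool.submit(fakeNodeTask(7, 11))); // overlap is absorbed by the set
            return aggregate(futures);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Using a Set for the merged result means that a value reported by more than one task is only counted once.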
I'm getting the following exception, which hints at a marshalling error when calling cache.values():
java.lang.ClassCastException: [B cannot be cast to java.lang.Integer
at com.experiment.grid.infinispan.ComputePrimeTask.call(IsComputePrimeTask.java:27)
at com.experiment.grid.infinispan.ComputePrimeTask.call(IsComputePrimeTask.java:1)
at org.infinispan.commands.read.DistributedExecuteCommand.perform(DistributedExecuteCommand.java:99)
at org.infinispan.remoting.InboundInvocationHandlerImpl.handleInternal(InboundInvocationHandlerImpl.java:95)
at org.infinispan.remoting.InboundInvocationHandlerImpl.access$000(InboundInvocationHandlerImpl.java:50)
at org.infinispan.remoting.InboundInvocationHandlerImpl$2.run(InboundInvocationHandlerImpl.java:172)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
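As a side note on reading the first line of the trace: `[B` is the JVM's runtime name for the byte[] type, so the failing cast is from a byte array to Integer. That is consistent with the values being handed back in some serialized form, although the trace alone does not show where the bytes come from. The small, Infinispan-free sketch below (class and method names are made up for illustration) reproduces the naming and the same kind of cast failure.

```java
class ByteArrayNameSketch {

    // Returns the JVM runtime class name of any object.
    static String runtimeName(Object o) {
        return o.getClass().getName();
    }

    // Tries to treat a byte[] as an Integer, the way the for-each loop in call() does implicitly.
    static boolean castToIntegerFails(Object stored) {
        try {
            Integer v = (Integer) stored;
            return v == null; // only reached if the cast succeeded
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Object stored = new byte[] {0, 0, 0, 7}; // pretend this is a marshalled value
        System.out.println(runtimeName(stored));
        System.out.println(castToIntegerFails(stored));
    }
}
```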
I'm really not sure, with the information out there, how I should proceed:
- Given the stacktrace I'm seeing, I don't think I'm handling my use-case properly.
- Also, all the methods giving access to the "local" content of the cache (such as values(), keySet(), entrySet(), etc.) have the following note in their javadoc - Cache (Infinispan Distribution 6.0.2.Final API): "This method should only be used for testing or debugging purposes such as to verify that the cache contains all the values entered. Any other use involving execution of this method on a production system is not recommended." At the same time, I see no other way of accessing cache content to get the "local" data...
- In another post that I've seen but can't find a reference to anymore, it was mentioned that I could iterate through the keys using keySet(), and then use the CacheManager to determine whether the node actually owns each key before using it "locally". I was a bit worried by this, since it would imply the cache internally pulling keys from the other nodes in order to determine whether it owns them... Given my troubles, I preferred to overlook that last post, since it did not make sense to me anyway.
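To make the idea from that other post concrete, here is a rough sketch of "iterate over the keys, keep only the ones owned locally". It deliberately avoids Infinispan: the cache is a plain Map, and OwnershipCheck is a hypothetical interface standing in for whatever ownership lookup the grid provides, which is not verified here.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

class LocalKeyFilterSketch {

    // Hypothetical ownership test; a real grid would have to supply the answer.
    interface OwnershipCheck {
        boolean isOwnedLocally(String key);
    }

    // Collects the values whose keys pass the ownership test.
    static Set<Integer> localValues(Map<String, Integer> cache, OwnershipCheck check) {
        Set<Integer> result = new HashSet<Integer>();
        for (String key : cache.keySet()) {
            if (check.isOwnedLocally(key)) {
                result.add(cache.get(key));
            }
        }
        return result;
    }

    // Demo with three entries, pretending that key "b" lives on another node.
    static Set<Integer> runDemo() {
        Map<String, Integer> cache = new LinkedHashMap<String, Integer>();
        cache.put("a", 7);
        cache.put("b", 8);
        cache.put("c", 11);
        OwnershipCheck notB = new OwnershipCheck() {
            public boolean isOwnedLocally(String key) {
                return !key.equals("b");
            }
        };
        return localValues(cache, notB);
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Whether keySet() on a distributed cache only touches local entries is exactly the open question; the sketch only shows the shape of the filtering, not its cost.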
I'd appreciate being pointed to the right approach, if one exists for my use-case (which seems pretty standard and common for this kind of infrastructure, so I'm convinced it can be done).
Thanks!