Is NotifyingFuture supposed to work with DefaultExecutorService?
defunkt May 24, 2013 9:45 AM
Hello,
I've been trying to use NotifyingFuture.attachListener() with the distributed execution framework. However, whenever I attach a listener to a future returned from DistributedExecutorService.submit(Address, (Distributed)Callable), the future.get() call inside futureDone() hangs, and after 15 seconds of waiting I receive a java.util.concurrent.TimeoutException (wrapped in an ExecutionException).
I've now run this same test about 25 times. The future.get() call returned in exactly one run, and that behaviour never repeated.
I've tried both a direct blocking future.get() and the listener. Calling future.get() directly works; using the listener does not (apart from that one seemingly random success).
Attached are the source code for the test I ran (NotifyingFutureTest), the Infinispan config file, and the JGroups config file. The callable was submitted to the local node's own address, and only one node was up in the cluster. This was run on Infinispan 5.2.1.Final with no additional modules (just the relevant core jars).
Example outputs:
Using listener:
2013-05-24 16:37:00,473 INFO [JGroupsTransport] (main) ISPN000078: Starting JGroups Channel
2013-05-24 16:37:06,761 INFO [JGroupsTransport] (main) ISPN000094: Received new cluster view: [node1-14501|0] [node1-14501]
2013-05-24 16:37:06,762 INFO [JGroupsTransport] (main) ISPN000079: Cache local address is node1-14501, physical addresses are [X]
LISTENER: submitting task to address node1-14501
in callable.call
LISTENER: getting future...
java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException
at org.infinispan.distexec.DefaultExecutorService$DistributedTaskPart.get(DefaultExecutorService.java:913)
at test.exec.NotifyingFutureTest$1.futureDone(NotifyingFutureTest.java:56)
at org.infinispan.distexec.DefaultExecutorService$DistributedTaskPart.notifyDone(DefaultExecutorService.java:1015)
at org.infinispan.distexec.DefaultExecutorService$DistributedTaskPart$2.doLocalInvoke(DefaultExecutorService.java:1106)
at org.infinispan.distexec.DefaultExecutorService$DistributedTaskPart$2.call(DefaultExecutorService.java:1086)
at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source)
at java.util.concurrent.FutureTask.get(Unknown Source)
at org.infinispan.distexec.DefaultExecutorService$DistributedTaskPart.innerGet(DefaultExecutorService.java:948)
at org.infinispan.distexec.DefaultExecutorService$DistributedTaskPart.get(DefaultExecutorService.java:911)
... 9 more
LISTENER: time in futureDone: 15004ms
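One possible reading of the trace above: futureDone() is invoked from inside the task's own call() (notifyDone <- doLocalInvoke <- call <- FutureTask.run), and the get() inside it then waits on a FutureTask that cannot complete until call() returns. That is only my assumption about the mechanism, not a confirmed description of DefaultExecutorService internals. Below is a minimal sketch using plain java.util.concurrent (no Infinispan) that produces the same shape of timeout; the class and method names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in, NOT Infinispan code: it only mimics the call shape in the trace.
class ListenerSelfWaitSketch {

    // The "listener" runs inside the task and calls get() on the task's own future.
    static String listenerInsideTask(long timeoutMs) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            final AtomicReference<FutureTask<String>> self =
                    new AtomicReference<FutureTask<String>>();
            final AtomicReference<String> seenByListener = new AtomicReference<String>();

            Callable<String> work = () -> {
                String result = "hello world";
                // Assumed equivalent of notifyDone(): fired before call() has returned,
                // so the future being waited on is still running on this very thread.
                try {
                    seenByListener.set(self.get().get(timeoutMs, TimeUnit.MILLISECONDS));
                } catch (TimeoutException e) {
                    seenByListener.set("TIMEOUT");
                }
                return result;
            };

            FutureTask<String> task = new FutureTask<String>(work);
            self.set(task);
            pool.execute(task);
            task.get(); // completes only after the listener has given up waiting
            return seenByListener.get();
        } finally {
            pool.shutdown();
        }
    }

    // The blocking path: get() is called from the submitting thread instead.
    static String blockingGetFromCaller(long timeoutMs) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> work = () -> "hello world";
            Future<String> future = pool.submit(work);
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("listener inside task: " + listenerInsideTask(500));
        System.out.println("blocking get from caller: " + blockingGetFromCaller(500));
    }
}
```

In this sketch the listener path always reports TIMEOUT while the blocking path returns "hello world". If the same thing is happening in my test, it would account for the 15-second wait, although it would not by itself explain the single run where get() returned.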
Using simple future.get():
2013-05-24 16:44:32,395 INFO [JGroupsTransport] (main) ISPN000078: Starting JGroups Channel
2013-05-24 16:44:39,249 INFO [JGroupsTransport] (main) ISPN000094: Received new cluster view: [node1-22452|0] [node1-22452]
2013-05-24 16:44:39,250 INFO [JGroupsTransport] (main) ISPN000079: Cache local address is node1-22452, physical addresses are [X]
SIMPLE: submitting task to address node1-22452
SIMPLE: getting future result in simpleSend
in callable.call
SIMPLE: hello world
SIMPLE: after getting future in simpleSend
SIMPLE: time waiting for future: 139ms
Thanks!
Attachments:
- NotifyingFutureTest.java.zip (1.0 KB)
- jgroups-udp.xml.zip (1.6 KB)
- infinispan-config.xml (1.9 KB)