3 Replies Latest reply on May 30, 2013 6:27 PM by Vladimir Blagojevic

    Is NotifyingFuture supposed to work with DefaultExecutorService?

    defunkt Newbie



      I've been trying to use NotifyingFuture.attachListener() with the distributed execution framework. Whenever I attach a listener this way to a future returned from DistributedExecutorService.submit(Address, (Distributed)Callable), the future.get() call inside futureDone() hangs, and I get a java.util.concurrent.TimeoutException after 15 seconds of waiting.


      I've now run this same test about 25 times. The future.get() call returned exactly once; on all other runs it hung as described.


      I've tried both calling the blocking future.get() directly and using the listener. Calling future.get() directly works; using the listener doesn't (except for that one seemingly random run).

      Attached are the source code for the test I ran (NotifyingFutureTest), the Infinispan config file and the JGroups config file. The callable was submitted to the local node (the same node that ran the test), and only one node was up in the cluster. This was run on Infinispan 5.2.1.Final with no additional modules (just the relevant core jars).


      Example outputs:

      Using listener:


      2013-05-24 16:37:00,473 INFO  [JGroupsTransport] (main) ISPN000078: Starting JGroups Channel

      2013-05-24 16:37:06,761 INFO  [JGroupsTransport] (main) ISPN000094: Received new cluster view: [node1-14501|0] [node1-14501]

      2013-05-24 16:37:06,762 INFO  [JGroupsTransport] (main) ISPN000079: Cache local address is node1-14501, physical addresses are [X]

      LISTENER: submitting task to address node1-14501

      in callable.call

      LISTENER: getting future...

      java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException



                at org.infinispan.distexec.DefaultExecutorService$DistributedTaskPart.get(DefaultExecutorService.java:913)

                at test.exec.NotifyingFutureTest$1.futureDone(NotifyingFutureTest.java:56)

                at org.infinispan.distexec.DefaultExecutorService$DistributedTaskPart.notifyDone(DefaultExecutorService.java:1015)

                at org.infinispan.distexec.DefaultExecutorService$DistributedTaskPart$2.doLocalInvoke(DefaultExecutorService.java:1106)

                at org.infinispan.distexec.DefaultExecutorService$DistributedTaskPart$2.call(DefaultExecutorService.java:1086)

                at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source)

                at java.util.concurrent.FutureTask.run(Unknown Source)

                at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)

                at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)

                at java.lang.Thread.run(Unknown Source)

      Caused by: java.util.concurrent.TimeoutException

                at java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source)

                at java.util.concurrent.FutureTask.get(Unknown Source)

                at org.infinispan.distexec.DefaultExecutorService$DistributedTaskPart.innerGet(DefaultExecutorService.java:948)

                at org.infinispan.distexec.DefaultExecutorService$DistributedTaskPart.get(DefaultExecutorService.java:911)

                ... 9 more

      LISTENER: time in futureDone: 15004ms
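
      One possible reading of the trace above: futureDone() is reached via notifyDone() from doLocalInvoke(), which itself runs inside FutureTask.run(). If that reading is right, the listener is called before the underlying FutureTask has been marked done, so a get() inside the listener waits on a task that cannot finish until the listener returns. The sketch below reproduces that pattern with plain java.util.concurrent only. It is an illustration under that assumption, not Infinispan's code: the class, the Listener interface and both method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

public class SelfWaitingListenerDemo {

    /** Hypothetical stand-in for a completion listener (not the Infinispan interface). */
    interface Listener<T> {
        void futureDone(Future<T> future);
    }

    /** Notifies the listener from INSIDE the task body, before the FutureTask completes. */
    static String runWithEarlyNotification(long timeoutMillis) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicReference<FutureTask<String>> self = new AtomicReference<>();
        AtomicReference<String> observed = new AtomicReference<>();

        Listener<String> listener = future -> {
            try {
                // The task is still running (we are inside it), so this can only time out.
                observed.set("result: " + future.get(timeoutMillis, TimeUnit.MILLISECONDS));
            } catch (TimeoutException e) {
                observed.set("timeout");
            } catch (Exception e) {
                observed.set("error: " + e);
            }
        };

        FutureTask<String> task = new FutureTask<>(() -> {
            String result = "hello world";
            listener.futureDone(self.get()); // notification happens before the result is published
            return result;
        });
        self.set(task);
        pool.execute(task);
        try {
            task.get(); // returns once the task body, including the listener call, has finished
            return observed.get();
        } finally {
            pool.shutdown();
        }
    }

    /** Notifies from FutureTask.done(), which runs AFTER the result has been set. */
    static String runWithLateNotification(long timeoutMillis) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicReference<String> observed = new AtomicReference<>();
        CountDownLatch notified = new CountDownLatch(1);

        FutureTask<String> task = new FutureTask<String>(() -> "hello world") {
            @Override
            protected void done() {
                try {
                    // The task is already complete here, so get() returns immediately.
                    observed.set("result: " + get(timeoutMillis, TimeUnit.MILLISECONDS));
                } catch (Exception e) {
                    observed.set("error: " + e);
                } finally {
                    notified.countDown();
                }
            }
        };
        pool.execute(task);
        try {
            notified.await();
            return observed.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("early notification: " + runWithEarlyNotification(500));
        System.out.println("late notification: " + runWithLateNotification(500));
    }
}
```

      In this sketch the early-notification variant always times out, whereas my real test did succeed once in about 25 runs, so the actual mechanism in DefaultExecutorService may differ in detail.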



      Using simple future.get():


      2013-05-24 16:44:32,395 INFO  [JGroupsTransport] (main) ISPN000078: Starting JGroups Channel

      2013-05-24 16:44:39,249 INFO  [JGroupsTransport] (main) ISPN000094: Received new cluster view: [node1-22452|0] [node1-22452]

      2013-05-24 16:44:39,250 INFO  [JGroupsTransport] (main) ISPN000079: Cache local address is node1-22452, physical addresses are [X]

      SIMPLE: submitting task to address node1-22452

      SIMPLE: getting future result in simpleSend

      in callable.call

      SIMPLE: hello world

      SIMPLE: after getting future in simpleSend

      SIMPLE: time waiting for future: 139ms