15 Replies Latest reply on Feb 15, 2017 12:12 PM by william.burns

    DistributedExecutionService failover is not working.

    seto

      A very simple test.

      It just fails, and no failover attempts are made.

       

       

      DefaultExecutorService des = new DefaultExecutorService(idCache);
      des.submit(des.createDistributedTaskBuilder(new TestCallable()).failoverPolicy(new SameNodeTaskFailoverPolicy(10)).build());
      

       

      private static class TestCallable implements Callable<Void>, Serializable {
      
         @Override
         public Void call() throws Exception {
            System.out.println("mama");
            throw new Exception("kaka");
         }
      }
      

       

      private static class SameNodeTaskFailoverPolicy implements DistributedTaskFailoverPolicy {
         private int maxFailoverAttempts;

         public SameNodeTaskFailoverPolicy(int maxFailoverAttempts) {
            this.maxFailoverAttempts = maxFailoverAttempts;
         }

         // Retry on the same node on which the execution failed.
         @Override
         public Address failover(FailoverContext fc) {
            System.out.println("failover");
            return fc.executionFailureLocation();
         }

         @Override
         public int maxFailoverAttempts() {
            return maxFailoverAttempts;
         }
      }
      
        • 1. Re: DistributedExecutionService failover is not working.
          seto

          It seems that there are two problems.

          1. UPDATED: I found that the problem is in submit. Even when no input keys are specified, it should use the same logic as when input keys are specified, because a DistributedTask is not only about selecting nodes by key affinity. It also carries things like the execution policy and the failover policy.

          @Override
          public <T, K> CompletableFuture<T> submit(DistributedTask<T> task, K... input) {
             if (task == null) throw new NullPointerException();

             if (inputKeysSpecified(input)) {
                Map<Address, List<K>> nodesKeysMap = keysToExecutionNodes(task.getTaskExecutionPolicy(), input);
                checkExecutionPolicy(task, nodesKeysMap, input);
                Address me = getAddress();
                DistributedExecuteCommand<T> c = factory.buildDistributedExecuteCommand(task.getCallable(), me, Arrays.asList(input));
                ArrayList<Address> nodes = new ArrayList<Address>(nodesKeysMap.keySet());
                DistributedTaskPart<T> part = createDistributedTaskPart(task, c, Arrays.asList(input), selectExecutionNode(nodes), 0);
                part.execute();
                return part;
             } else {
                // No keys given: only the Callable is passed on, so the task's policies are dropped.
                return submit(task.getCallable());
             }
          }
          

          2. UPDATED: Future.get() must be called for failover to happen. I think failover should be triggered even without get(), because some tasks only compute and modify data in the cache and do not return a result. There is a simple workaround: attach a future listener (attachListener) that calls get().

          However, the workaround only works for a DistributedTask submitted to other nodes. It does not work for a DistributedTask submitted to the local node.
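For comparison, here is a minimal JDK-only sketch of what "failover without calling get()" could look like. It is not Infinispan code: the class `EagerFailover` and the method `submitWithFailover` are hypothetical names invented for this illustration, and the retry simply re-runs the same supplier, much like a same-node failover policy would.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper, NOT part of Infinispan: retries a failing async task
// as soon as it fails, without requiring the caller to invoke get().
final class EagerFailover {

    // Runs the task; on failure, re-runs it until it succeeds or
    // maxFailoverAttempts retries have been used up.
    static <T> CompletableFuture<T> submitWithFailover(
            Supplier<CompletableFuture<T>> task, int maxFailoverAttempts) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(task, maxFailoverAttempts, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> task,
                                    int attemptsLeft, CompletableFuture<T> result) {
        task.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (attemptsLeft > 0) {
                attempt(task, attemptsLeft - 1, result); // fail over and try again
            } else {
                result.completeExceptionally(error);
            }
        });
    }

    // Demo: a task that fails twice and then succeeds. Returns the number of runs.
    static int demo() {
        AtomicInteger runs = new AtomicInteger();
        CompletableFuture<String> f = submitWithFailover(() ->
            CompletableFuture.supplyAsync(() -> {
                if (runs.incrementAndGet() < 3) {
                    throw new IllegalStateException("kaka");
                }
                return "ok";
            }), 10);
        f.join(); // only so that the demo can observe the final outcome
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs = " + demo());
    }
}
```

The retries are driven by the completion callback, so they happen whether or not anyone waits on the returned future. The `join()` in `demo()` is there only to let the demo read the run counter afterwards.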

           

          GitHub - SetoKaiba/test1

          Here is a project that reproduces all of the situations.

          • Test00: submit with attachListener: NOT WORKING(Because of 1st problem and 2nd problem)
          • Test01: submit with get: NOT WORKING(Because of 1st and 2nd problem)
          • Test10: submit to local address with attachListener: NOT WORKING(Because of 2nd problem)
          • Test11: submit to local address with get: WORKING
          • Empty run 1st, Test20 run 2nd: submit to other address instead of local with attachListener: Empty WORKING
          • Empty run 1st, Test21 run 2nd: submit to other address instead of local with get: Empty WORKING
          • Empty run 1st, Test30 run 2nd: submit everywhere instead of local with attachListener: Empty WORKING and Test30 NOT WORKING(Because of 2nd problem)
          • Empty run 1st, Test31 run 2nd: submit everywhere instead of local with get: Empty WORKING and Test31 WORKING
          • 2. Re: DistributedExecutionService failover is not working.
            william.burns

            Thanks, it looks like both of these are different bugs as you found out.

             

            1. We should be utilizing failover whenever a DistributedTask is used, regardless of whether keys are passed; that is an oversight.
            2. Unfortunately the distributed executor was mostly written to satisfy the old Future methods and we kinda tacked on the new functional APIs, however the underlying implementation wasn't updated to support everything.

             

            If you could log a couple JIRAs for them, that would be great.  If you wanted to try to fix them and submit a PR that would be even better!

            • 3. Re: DistributedExecutionService failover is not working.
              seto

              Ok. I'll create issues in the JIRA.

              If I have time, I'll try to fix it and submit a PR.

               

              But it seems that a PR waits a long time before it is verified.

              ISPN-7363 Fix Runnable inject for disexec. by SetoKaiba · Pull Request #4785 · infinispan/infinispan · GitHub

              This pull request has still not been verified, whether or not it has a bug.

               

              Also, I'm working on a legacy project with Hibernate OGM, which does not support Infinispan 9 yet, so I have to use Infinispan 8.x.

              Will the fix be applied to 8.x?

               

              What do the new functional APIs refer to? Can they run a task? If you mean the streaming API, that is not the same thing in my case.

              • 4. Re: DistributedExecutionService failover is not working.
                william.burns

                Seto Kaiba wrote:

                 

                Ok. I'll create issues in the JIRA.

                If I have time, I'll try to fix it and submit a PR.

                Great, thanks!

                 

                Seto Kaiba wrote:


                But it seems that a PR waits a long time before it is verified.

                ISPN-7363 Fix Runnable inject for disexec. by SetoKaiba · Pull Request #4785 · infinispan/infinispan · GitHub

                This pull request has still not been verified, whether or not it has a bug.

                I apologize that this has happened. Unfortunately, Sebastian is away for a bit on personal leave.  When he returns I am sure he would be glad to continue helping out.

                 

                Seto Kaiba wrote:

                 

                Also, I'm working on a legacy project with Hibernate OGM, which does not support Infinispan 9 yet, so I have to use Infinispan 8.x.

                Will the fix be applied to 8.x?

                These changes should apply somewhat cleanly.  The only change we made to Distributed Executor in Infinispan 9 is to have CompletableFuture return types.

                 

                Seto Kaiba wrote:

                 

                What do the new functional APIs refer to? Can they run a task? If you mean the streaming API, that is not the same thing in my case.

                I am referring to the various methods on CompletableFuture and our old Listener implementations that were used with Distributed Executor.

                 

                To be honest I would love to know how you are using Distributed Executor. To my knowledge ClusterExecutor and Distributed Streams should cover all use cases except some around failover, and we are more than willing to explore enhancing ClusterExecutor to provide those guarantees.

                • 5. Re: DistributedExecutionService failover is not working.
                  seto

                  I am referring to the various methods on CompletableFuture and our old Listener implementations that were used with Distributed Executor.

                   

                  To be honest I would love to know how you are using Distributed Executor. To my knowledge ClusterExecutor and Distributed Streams should cover all use cases except some around failover, and we are more than willing to explore enhancing ClusterExecutor to provide those guarantees.

                  I'm using it just the way ClusterExecutor is used.

                  To me, the problem is that ClusterExecutor doesn't support the failover, execution policy and timeout features provided by DistributedTask.

                  Also, ClusterExecutor doesn't support CDI. Maybe I can wrap the task and inject the fields myself before it actually runs.

                  Some logic may need to run with key affinity as well. Enhancing ClusterExecutor would be great.

                  • 6. Re: DistributedExecutionService failover is not working.
                    seto

                    When I first looked at distributed execution, it seemed strange that ClusterExecutor and DistributedExecutorService exist side by side.

                    But what finally made me use DES is the list of problems in the post above. Unifying DES and CE into one would be great.

                    • 7. Re: DistributedExecutionService failover is not working.
                      william.burns

                      Seto Kaiba wrote:

                       

                      I am referring to the various methods on CompletableFuture and our old Listener implementations that were used with Distributed Executor.

                       

                      To be honest I would love to know how you are using Distributed Executor. To my knowledge ClusterExecutor and Distributed Streams should cover all use cases except some around failover, and we are more than willing to explore enhancing ClusterExecutor to provide those guarantees.

                      I'm using it just the way ClusterExecutor is used.

                      To me, the problem is that ClusterExecutor doesn't support the failover, execution policy and timeout features provided by DistributedTask.

                      Also, ClusterExecutor doesn't support CDI. Maybe I can wrap the task and inject the fields myself before it actually runs.

                      Some logic may need to run with key affinity as well. Enhancing ClusterExecutor would be great.

                      ClusterExecutor supports timeout; see ClusterExecutor (Infinispan JavaDoc All 9.0.0.Beta2 API).

                       

                      So it sounds like you want execution policy (topology awareness), failover and CDI? These should be doable; unfortunately they would most likely have to wait until ISPN 10.

                       

                      What do you mean by key affinity? The new distributed streams should be preferred over distributed execution for anything regarding keys or data. Distributed streams always go to the primary owner of a key, and if the owner changes in the middle of execution the operation is rerouted automatically. Unfortunately distributed execution doesn't have that guarantee, so there is a chance you may miss data if it moves in the middle of an execution.

                      • 8. Re: DistributedExecutionService failover is not working.
                        william.burns

                        Seto Kaiba wrote:

                         

                        When I first looked at distributed execution, it seemed strange that ClusterExecutor and DistributedExecutorService exist side by side.

                        But what finally made me use DES is the list of problems in the post above. Unifying DES and CE into one would be great.

                        They both exist because they were implemented at different times. The DistributedExecutorService is from ISPN 5. The ClusterExecutor (ISPN 8.2) is meant to be a lighter weight implementation not tied to any given cache. We purposely left out some features (execution policy, failover) to see what we need to add back later.

                        • 9. Re: DistributedExecutionService failover is not working.
                          seto

                          William Burns wrote:

                           

                          Seto Kaiba wrote:

                           

                          When I first looked at distributed execution, it seemed strange that ClusterExecutor and DistributedExecutorService exist side by side.

                          But what finally made me use DES is the list of problems in the post above. Unifying DES and CE into one would be great.

                          They both exist because they were implemented at different times. The DistributedExecutorService is from ISPN 5. The ClusterExecutor (ISPN 8.2) is meant to be a lighter weight implementation not tied to any given cache. We purposely left out some features (execution policy, failover) to see what we need to add back later.

                          Should DES be fixed, or will the team add the features to CE?

                          Should I report the DES bug and try to fix it? If the team intends to unify DES and CE, it would be better not to fix DES and just wait for the features in CE.

                          • 10. Re: DistributedExecutionService failover is not working.
                            seto

                            William Burns wrote:

                             

                            ClusterExecutor supports timeout; see ClusterExecutor (Infinispan JavaDoc All 9.0.0.Beta2 API).

                             

                            So it sounds like you want execution policy (topology awareness), failover and CDI? These should be doable; unfortunately they would most likely have to wait until ISPN 10.

                            Thank you for clarifying this.

                             

                            William Burns wrote:

                             

                            What do you mean by key affinity? The new distributed streams should be preferred over distributed execution for anything regarding keys or data. Distributed streams always go to the primary owner of a key, and if the owner changes in the middle of execution the operation is rerouted automatically. Unfortunately distributed execution doesn't have that guarantee, so there is a chance you may miss data if it moves in the middle of an execution.

                            What do you mean by distributed streams: ClusterExecutor or the Java 8 streaming API? I think you're referring to the Java 8 streaming API. I'm thinking about the way DES works with specified keys.

                             

                            GitHub - SetoKaiba/test1

                            The reproducing project is updated. I have now found why Test31 was not working, and I committed a fix for it.

                            • Test00: submit with attachListener: NOT WORKING(Because of 1st problem and 2nd problem)
                            • Test01: submit with get: NOT WORKING(Because of 1st and 2nd problem)
                            • Test10: submit to local address with attachListener: NOT WORKING(Because of 2nd problem)
                            • Test11: submit to local address with get: WORKING
                            • Empty run 1st, Test20 run 2nd: submit to other address instead of local with attachListener: Empty WORKING
                            • Empty run 1st, Test21 run 2nd: submit to other address instead of local with get: Empty WORKING
                            • Empty run 1st, Test30 run 2nd: submit everywhere instead of local with attachListener: Empty WORKING and Test30 NOT WORKING(Because of 2nd problem)
                            • Empty run 1st, Test31 run 2nd: submit everywhere instead of local with get: Empty WORKING and Test31 WORKING

                             

                            In version 8.x:

                            For the 2nd problem: LocalDistributedTaskPart returns the NotifyingFutureImpl instead of itself, so the innerGet in DistributedTaskPart is never triggered. That's why it doesn't work.

                            RemoteDistributedTaskPart returns the RemoteFuture, which is the RemoteDistributedTaskPart itself, so the innerGet in DistributedTaskPart is triggered. That's why it works.
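The difference described above can be modelled with a small JDK-only sketch. This is not the Infinispan implementation: `TaskPartModel`, `executeReturningInner` and `TaskPartDemo` are hypothetical names, and the only assumption taken from the thread is that the failover logic sits in the task part's own `get()`, so a caller holding the inner future never reaches it.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model, NOT Infinispan code: the retry logic lives only in the
// wrapper's get(), so a caller holding the inner future never triggers it.
final class TaskPartModel<T> {
    private final Callable<T> task;
    private final int maxFailoverAttempts;
    private CompletableFuture<T> inner;

    TaskPartModel(Callable<T> task, int maxFailoverAttempts) {
        this.task = task;
        this.maxFailoverAttempts = maxFailoverAttempts;
    }

    // Runs the task once and records success or failure in a new future.
    private CompletableFuture<T> runOnce() {
        CompletableFuture<T> f = new CompletableFuture<>();
        try {
            f.complete(task.call());
        } catch (Exception e) {
            f.completeExceptionally(e);
        }
        return f;
    }

    // Models the "local" case: the caller gets the plain inner future.
    CompletableFuture<T> executeReturningInner() {
        inner = runOnce();
        return inner;
    }

    // Models the "remote" case: the caller holds the wrapper and calls this get().
    T get() throws InterruptedException, ExecutionException {
        if (inner == null) {
            inner = runOnce();
        }
        for (int attempt = 0; ; attempt++) {
            try {
                return inner.get();
            } catch (ExecutionException e) {
                if (attempt >= maxFailoverAttempts) {
                    throw e;
                }
                inner = runOnce(); // fail over: run the task again
            }
        }
    }
}

final class TaskPartDemo {
    // A task that fails on its first two runs and succeeds on the third.
    static Callable<String> flaky(AtomicInteger runs) {
        return () -> {
            if (runs.incrementAndGet() < 3) {
                throw new Exception("kaka");
            }
            return "ok";
        };
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger localRuns = new AtomicInteger();
        CompletableFuture<String> plain =
            new TaskPartModel<>(flaky(localRuns), 10).executeReturningInner();
        System.out.println("inner future failed: " + plain.isCompletedExceptionally()
            + ", runs: " + localRuns.get());

        AtomicInteger remoteRuns = new AtomicInteger();
        String result = new TaskPartModel<>(flaky(remoteRuns), 10).get();
        System.out.println("wrapper get(): " + result + ", runs: " + remoteRuns.get());
    }
}
```

In this model the inner future simply stays failed after a single run, while the wrapper's `get()` keeps retrying until the task succeeds or the attempt limit is reached.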

                             

                            In version 9.x:

                            For the 2nd problem: NotifyingFuture is deprecated and has been replaced by CompletableFuture, so I can't use the code below as a workaround.

                            .attachListener(future -> {
                               try {
                                  future.get();
                               } catch (InterruptedException e) {
                                  e.printStackTrace();
                               } catch (ExecutionException e) {
                                  e.printStackTrace();
                               }
                            });
                            

                            In 9.x there is no future listener like the one on NotifyingFuture, so there is no way to call get() automatically and trigger failover.

                            The code below triggers failover once, but not a second time: the failover execution times out.

                            I can't find the cause yet.

                            f = des.submit(idCache.getCacheManager().getAddress(), des.createDistributedTaskBuilder(new TestCallable()).failoverPolicy(new SameNodeTaskFailoverPolicy(10)).build());
                            f.whenComplete(((aVoid, throwable) -> {
                               try {
                                  f.get();
                               } catch (InterruptedException e) {
                                  e.printStackTrace();
                               } catch (ExecutionException e) {
                                  e.printStackTrace();
                               }
                            }));
                            

                            And will the CompletableFuture version of DES be merged into 8.x?

                            • 11. Re: DistributedExecutionService failover is not working.
                              seto

                              GitHub - SetoKaiba/test1

                              The reproducing project is updated.

                              There are two branches in it.

                               

                              master branch is for 8.2.6.Final:

                              • Test00: submit with attachListener: NOT WORKING(Because of 1st problem and 2nd problem)
                              • Test01: submit with get: NOT WORKING(Because of 1st and 2nd problem)
                              • Test10: submit to local address with attachListener: NOT WORKING(Because of 2nd problem)
                              • Test11: submit to local address with get: WORKING
                              • Empty run 1st, Test20 run 2nd: submit to other address instead of local with attachListener: Empty WORKING
                              • Empty run 1st, Test21 run 2nd: submit to other address instead of local with get: Empty WORKING
                              • Empty run 1st, Test30 run 2nd: submit everywhere instead of local with attachListener: Empty WORKING and Test30 NOT WORKING(Because of 2nd problem)
                              • Empty run 1st, Test31 run 2nd: submit everywhere instead of local with get: Empty WORKING and Test31 WORKING

                               

                              9.0 branch is for 9.0.0.Beta2:

                              • Test00: submit with attachListener: NOT WORKING(Because of 1st problem and 2nd problem)
                              • Test01: submit with get: NOT WORKING(Because of 1st and 2nd problem)
                              • Test10: submit to local address with whenComplete: I don't know why it times out (similar to the 2nd problem, but different: it fails because of a timeout exception)
                              • Test11: submit to local address with get: WORKING
                              • Empty run 1st, Test20 run 2nd: submit to other address instead of local with whenComplete: Empty WORKING(no timeout for RemoteDistributedTaskPart)
                              • Empty run 1st, Test21 run 2nd: submit to other address instead of local with get: Empty WORKING
                              • Empty run 1st, Test30 run 2nd: submit everywhere instead of local with attachListener: Empty WORKING and Test30 NOT WORKING (similar to the 2nd problem, but different: it fails because of a timeout exception)
                              • Empty run 1st, Test31 run 2nd: submit everywhere instead of local with get: Empty WORKING and Test31 WORKING

                               

                              william.burns Could you please have a look at the 9.0.0.Beta2 reproducing project?

                              Calling future.get() inside whenComplete causes a timeout for LocalDistributedTaskPart, so failover does not work. It does work for RemoteDistributedTaskPart.

                              I can't find out why.

                              • 12. Re: DistributedExecutionService failover is not working.
                                william.burns

                                Seto Kaiba wrote:

                                 

                                Should DES be fixed, or will the team add the features to CE?

                                Should I report the DES bug and try to fix it? If the team intends to unify DES and CE, it would be better not to fix DES and just wait for the features in CE.

                                To be honest I am not sure what the outcome of both will be.  I am guessing we will eventually dump DES and just use CE.

                                 

                                Seto Kaiba wrote:

                                 

                                Should I report the DES bug and try to fix it? If the team intends to unify DES and CE, it would be better not to fix DES and just wait for the features in CE.

                                Possibly.  These features wouldn't be in 8 though.  It would be in 9 or newer even.

                                • 13. Re: DistributedExecutionService failover is not working.
                                  william.burns

                                  Seto Kaiba wrote:

                                   

                                  What do you mean by distributed streams: ClusterExecutor or the Java 8 streaming API? I think you're referring to the Java 8 streaming API. I'm thinking about the way DES works with specified keys.

                                   

                                  I am talking about our Distributed Streams feature, which is built on the Java 8 Stream API, yes. It has a way to filter by key explicitly, which is done in a very efficient way [1].

                                   

                                  [1] CacheStream (Infinispan JavaDoc All 9.0.0.Beta2 API)

                                   

                                  Seto Kaiba wrote:

                                   

                                  And will the CompletableFuture version of DES be merged into 8.x?

                                  Nope, we can't change the API there.

                                  • 14. Re: DistributedExecutionService failover is not working.
                                    seto

                                    ISPN-7447 fix for DefaultExecutorService without keys specified. by SetoKaiba · Pull Request #4853 · infinispan/infinisp…

                                    Here is a pull request that fixes the submit method, which is the 1st problem.

                                    Please check it.

                                     

                                    The 2nd problem is fixed in 9.0 by the CompletableFuture implementation.

                                    It did not work before because the whenComplete callback has to run in another thread.

                                    The pool is a single-thread pool by default.

                                    It's not an issue for 9.0.
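The single-thread effect mentioned above can be reproduced with plain JDK executors. The sketch below is only an illustration and makes no claim about Infinispan's internal thread pools; the class `SingleThreadStarvation` and its methods are made up for the example. A job that blocks on follow-up work queued on the same single-thread pool times out, while the same job succeeds once the follow-up work runs on a different pool.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// JDK-only illustration, NOT Infinispan code.
final class SingleThreadStarvation {

    // Runs an outer job on outerPool that blocks on an inner job submitted to
    // innerPool. Returns true if the inner job finished within the timeout.
    static boolean innerCompletes(ExecutorService outerPool, ExecutorService innerPool) {
        Future<Boolean> outer = outerPool.submit(() -> {
            Future<String> inner = innerPool.submit(() -> "retried");
            try {
                inner.get(500, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                return false; // the inner job never got a thread to run on
            }
        });
        try {
            return outer.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns { result with one shared pool, result with two separate pools }.
    static boolean[] demo() {
        ExecutorService single = Executors.newSingleThreadExecutor();
        ExecutorService other = Executors.newSingleThreadExecutor();
        try {
            boolean sharedPool = innerCompletes(single, single);
            boolean separatePools = innerCompletes(single, other);
            return new boolean[] { sharedPool, separatePools };
        } finally {
            single.shutdown();
            other.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("shared pool: " + r[0] + ", separate pools: " + r[1]);
    }
}
```

With a shared single-thread pool the inner job cannot start until the outer job returns, so the blocking wait always runs into its timeout. This is the same shape as calling a blocking `get()` from a completion callback that occupies the pool's only thread.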

                                     

                                    But it is an issue for 8.x: for LocalDistributedTaskPart, 8.x returns the NotifyingFutureImpl instead of the task part itself.

                                    So innerGet can't be called, and I don't know how to fix it.

                                     

                                    I have now migrated my project to 9.x.
