Version 14

    Introduction


     

    The following proposal outlines enhancements to DistributedExecutorService planned for the next major Infinispan release. Everyone is invited to contribute to this proposal either by commenting directly on this design document, emailing Infinispan dev list or even by forking proposal branch from github and issuing pull requests.

     

     

    Pluggable task failover policy

     

    One of the long standing requirements for distributed executors as well as map/reduce is the pluggable task fail over policy. Task failure can be both due to task itself throwing an exception and due to Infinispan execution node failure (node crashed/left cluster). Pluggable fail over policy should address both types of task failures and allow users to migrate the failed task to another node if required to do so. Fail over policy is set per instance of DistributedExecutorService and MapReduceTask. The following interface is a current working proposal along with a proposal for DistributedExecutionException.

     

     

    public interface DistributedTaskFailoverPolicy {
    
       
    /**
        * DistributedExecutorService used to resubmit a task 
        * 
        * @param service
        */
       void setDistributedExecutorService(DistributedExecutorService service);
    
    
    
       /**
        * Allows custom failure handling of a remotely executed distributed task.
        * 
        * <p>
        * Tasks can fail due to the task itself throwing an exception or it can be an Infinispan system
        * caused failure (e.g node failed or left cluster during task execution). Either way,
        * DistributedExecutionException captures the failure context, the relevant task and the
        * exception. Failure exception, the failed task along with the given list of available execution
        * candidates should be used for a possible fail-over and execution of a failed task on another
        * Infinispan node.
        * 
        * 
        * @param executionTargets
        *           a list of nodes available for execution of distributed task
        * @param cause
        *           the Exception capturing details of a failed task execution
        * @return result of re-attempted execution
        */
       public <T> T executionFailed(List<Address> executionTargets, DistributedExecutionException cause);
    }
    
    

     

     

    DistributedExecutionException captures the context of the task failure consisting of Address of the node where task was executed but failed; the distributed task itself is abstracted into DistributedFuture.


    public class DistributedExecutionException extends Exception{
    
       private Address addressOfExecution;
       private DistributedFuture<?> f;
    
       public DistributedExecutionException() {
       }
    
       public DistributedExecutionException(String message, 
         Throwable cause, Address addressOfExecution, 
         DistributedFuture<?> f) {
          super(message, cause);
          this.addressOfExecution = addressOfExecution;
          this.f = f;
       }
    
       public Address getExecutionLocation(){
          return addressOfExecution;
       }
    
       public DistributedFuture<?> getDistributedTask(){
          return f;
       }      
    }
    
    

     

     

    The most basic random task failover policy could be written as simply as

     

     

    public class RandomNodeTaskFailoverPolicy implements DistributedTaskFailoverPolicy {
    
       DistributedExecutorService service;
    
       @Override
       public void setDistributedExecutorService(DistributedExecutorService service) {
          this.service = service;
       }
    
    
       @Override
       public T executionFailed(List<Address> executionTargets, DistributedExecutionException cause) {
    
          DistributedFuture<?> distributedTask = cause.getDistributedTask();
          DistributedExecuteCommand<?> dec = distributedTask.getCommand();
          Future<?> future =null;
          if(dec.hasKeys()){
             future = service.submit(randomNode(executionTargets), dec.getCallable(), dec.getKeys());         
          } else {
             future = service.submit(randomNode(executionTargets), dec.getCallable());         
          }
          try {
             return (T) future.get();
          } catch (Exception e){
             return null;
          }
       }
    }
    

     

     

     

     

    Pluggable task mapping policy

     

    Pluggable task mapping policy allows users of distributed executors and map/reduce to effectively load balance distributed tasks, veto and override Infinispan nodes selected for task execution. For example, someone might want to execute tasks exclusively on a local network island instead of a backup remote island. Others might, for example, use only a dedicated subset of nodes in Infinispan cluster for task execution. Task mapping policy is set per instance of DistributedExecutorService and MapReduceTask. The following interface is a current working proposal.

     

     

    public interface DistributedTaskMappingPolicy {
    
    
       /**
        * Given a set of input keys for a distributed task an implementation should return an address to
        * list of keys mapping where each Address key is a node to execute a distributed task T and list
        * of keys value is a list of input keys for distributed task T.
        * 
        * @param <K>
        *           the type for input keys
        * @param input
        *           all input keys for a distributed task
        * @return an address to list of keys mapping
        * 
        */
       public <K> Map<Address, List<K>> mapKeysToExecutionNodes(K... input);
    
       /**
        * Implementations of DistributedTaskMappingPolicy are given an opportunity to override suggested
        * execution node with a another node.
        * <p>
        * 
        * 
        * @param executionTarget execution target suggested
        * @param candidates list of potential candidates to choose from
        * @return an address that is a selected execution target
        */
       public Address executionTargetSelected(Address executionTarget, List<Address> candidates);
    
    }
    
    

     

    Infinispan's DistributedExecutorService always invokes executionTargetSelected method while mapKeysToExecutionNodes is invoked only if input keys are given. If you are dealing with tasks that have input keys all you need to do is to implement mapKeysToExecution method, otherwise, if no keys are used, the second method executionTargetSelected needs to be implemented. Infinispan's DistributedExecutorService implementation, upon task submittal, first invokes mapKeysToExecutionNode method callback, if and only if input keys are given. After task mapping phase DistributedExecutorService invokes executionTargetSelected callback regardless if input keys are given or not.