The following proposal outlines enhancements to the Infinispan Map/Reduce framework planned for the next major Infinispan release. Update (July 20th, 2012): These enhancements have already been integrated into master and will be included in the Infinispan 5.2 release.
Shortcomings of the current map reduce implementation
While our current map reduce implementation is more than a proof of concept, several drawbacks prevent it from being an industrial-grade map reduce solution. The main drawback is the inability of the current solution to deal with large-data (GB/TB) map reduce problems. This shortcoming is centered around our reduce phase execution. The reduce phase, as you might know, is currently done on a single Infinispan master task node; the size of the map reduce problems we can support is therefore limited by the working memory of a single Infinispan node.
The proposed solution distributes execution of reduce phase tasks across the cluster, thus achieving higher reduce task parallelization and at the same time removing the above-mentioned reduce phase restriction. By leveraging our consistent hashing solution even further, we can parallelize the reduce phase and elevate our map reduce solution to an industrial level. Here is how we can achieve that.
Upon creation, MapReduceTask first creates a unique task id. The task id is used to uniquely identify a task amongst all other concurrently executing map reduce tasks on the cluster. The unique task id is also used to create temporary caches on all Infinispan nodes for that task. The temporary caches will be used to store intermediate results of the map phase before the reduce phase is executed.
MapReduceTask, as it currently does, will hash task input keys and group them by the execution node N they are hashed to. For each node N and its grouped input KIn keys, MapReduceTask creates a MapCombineCommand which is migrated to the target execution node N. MapCombineCommand is similar to the current MapReduceCommand. MapCombineCommand takes an instance of a Mapper and an instance of a Reducer, which acts as a combiner.
Once loaded into the target execution node, MapCombineCommand takes each node-local KIn/VIn key/value pair and executes the Mapper method
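The grouping of input keys by execution node can be sketched roughly as follows. This is only an illustration, not the Infinispan implementation: KeyGroupingSketch, ownerOf and groupByNode are hypothetical names, nodes are represented by plain strings, and a simple hash-modulo stands in for the real consistent hash.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class KeyGroupingSketch {

    // Assumption: hash-modulo is a stand-in for consistent hashing.
    static <K> String ownerOf(K key, List<String> nodes) {
        return nodes.get(Math.floorMod(key.hashCode(), nodes.size()));
    }

    // Groups input keys by the node they hash to. In the proposal, one
    // MapCombineCommand would be created per entry of the returned map.
    static <K> Map<String, List<K>> groupByNode(Iterable<K> keys, List<String> nodes) {
        Map<String, List<K>> grouped = new LinkedHashMap<>();
        for (K key : keys) {
            grouped.computeIfAbsent(ownerOf(key, nodes), n -> new ArrayList<>()).add(key);
        }
        return grouped;
    }
}
```

Each entry of the returned map corresponds to one node N and the KIn keys a command for that node would process.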
void map(KIn key, VIn value, Collector<KOut, VOut> collector).
Results are collected into an Infinispan-supplied Collector<KOut, VOut> collector and the combine phase is initiated. A Combiner, if specified, takes KOut keys and immediately invokes the reduce phase on those keys. The result of the mapping phase executed on each node is a <KOut, VOut> map M. There will be one resulting map M per execution node N per launched map reduce task.
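A minimal sketch of the map and combine steps on a single node might look like the following. The Mapper and Reducer interfaces here are local stand-ins that mirror the method signatures quoted in this proposal; the Collector stand-in with its emit method, the class name MapCombineSketch and the method mapAndCombine are assumptions made for illustration. The sketch also assumes a combiner is always supplied.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class MapCombineSketch {

    // Local stand-ins, not the Infinispan types.
    interface Collector<KOut, VOut> { void emit(KOut key, VOut value); }

    interface Mapper<KIn, VIn, KOut, VOut> {
        void map(KIn key, VIn value, Collector<KOut, VOut> collector);
    }

    interface Reducer<KOut, VOut> {
        VOut reduce(KOut reducedKey, Iterator<VOut> iter);
    }

    // Runs the mapper over node-local entries, then combines the values
    // collected under each intermediate key into a single value (map M).
    static <KIn, VIn, KOut, VOut> Map<KOut, VOut> mapAndCombine(
            Map<KIn, VIn> localEntries,
            Mapper<KIn, VIn, KOut, VOut> mapper,
            Reducer<KOut, VOut> combiner) {
        Map<KOut, List<VOut>> collected = new HashMap<>();
        Collector<KOut, VOut> collector =
                (k, v) -> collected.computeIfAbsent(k, x -> new ArrayList<>()).add(v);
        for (Map.Entry<KIn, VIn> e : localEntries.entrySet()) {
            mapper.map(e.getKey(), e.getValue(), collector);
        }
        Map<KOut, VOut> m = new HashMap<>();
        for (Map.Entry<KOut, List<VOut>> e : collected.entrySet()) {
            m.put(e.getKey(), combiner.reduce(e.getKey(), e.getValue().iterator()));
        }
        return m;
    }
}
```

For a word count, the mapper would emit (word, 1) for every word in a value and the combiner would sum the counts per word, so each node produces one partial count per word it has seen.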
Intermediate keys/values grouping phase
In order to proceed with the reduce phase, all intermediate keys and values need to be grouped. More specifically, as map phases around the cluster can produce identical intermediate keys, all those identical intermediate keys and their values need to be collected before reduce is done on any particular intermediate key.
Therefore, at the end of the combine phase, instead of returning map M to the master task node (as we currently do), we hash each KOut in map M and group the KOut keys and their VOut values by the execution node N that the keys are hashed to. We achieve this using a temporary DIST cache and the underlying CH mechanism. Since we have to collect all values per intermediate key, we have to maintain a KOut ---> List<VOut> mapping, i.e., all KOut/VOut pairs from all executed MapCombineCommands will be collocated on the node N where KOut is hashed to; the value for KOut will be a list of all VOut values. Thus we effectively collect all VOut values under each KOut for all executed MapCombineCommands while at the same time spreading the KOut ---> List<VOut> mappings across the cluster. These KOut ---> List<VOut> mappings will be stored in a temporary DIST cache identified by the unique task id. The underlying collection of values per key is implemented using the DeltaAwareList class of MapCombineCommand.
At this point MapCombineCommand has finished its execution; the list of KOut keys is returned to the master node and its MapReduceTask. We do not return VOut values as we do not need them at the master task node. MapReduceTask is ready to start the reduce phase.
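The collection of values under each intermediate key can be illustrated with a small sketch. It is a simplification under stated assumptions: a nested in-memory map (node name to KOut ---> List<VOut>) stands in for the temporary DIST cache, a plain ArrayList stands in for DeltaAwareList, hash-modulo stands in for the CH mechanism, and the names IntermediateGroupingSketch, ownerOf and publish are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class IntermediateGroupingSketch {

    // Assumption: hash-modulo is a stand-in for consistent hashing.
    static <K> String ownerOf(K key, List<String> nodes) {
        return nodes.get(Math.floorMod(key.hashCode(), nodes.size()));
    }

    // Appends every KOut/VOut pair of one node's map M to the value list
    // kept on the node that owns KOut. Returns only the keys, mirroring
    // the fact that values are not sent back to the master node.
    static <K, V> List<K> publish(Map<K, V> m, List<String> nodes,
                                  Map<String, Map<K, List<V>>> tmpCache) {
        for (Map.Entry<K, V> e : m.entrySet()) {
            tmpCache.computeIfAbsent(ownerOf(e.getKey(), nodes), n -> new HashMap<>())
                    .computeIfAbsent(e.getKey(), k -> new ArrayList<>())
                    .add(e.getValue());
        }
        return new ArrayList<>(m.keySet());
    }
}
```

Calling publish once per map M means that identical intermediate keys produced on different nodes end up in the same list on the same owner node.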
MapReduceTask initializes ReduceCommand with a user-specified Reducer. The KOut keys collected from the map phase are grouped by the execution node N they are hashed to. For each node N and its grouped KOut keys, MapReduceTask creates a ReduceCommand and sends it to the node N where those KOut keys are hashed. Once it arrives on the target execution node, ReduceCommand looks up the temporary cache belonging to that task, and for each KOut key grabs the list of VOut values from the temporary cache and invokes:
VOut reduce(KOut reducedKey, Iterator<VOut> iter)
The result of a ReduceCommand is a map M where each key is a KOut and each value is a VOut. Each Infinispan execution node N returns one map M where each key KOut is hashed to N and each VOut is KOut's reduced value.
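The work done by a ReduceCommand on one node can be sketched as below. The Reducer interface is a local stand-in mirroring the reduce signature quoted above; ReduceSketch and reduceOnNode are hypothetical names, and a plain map stands in for the node-local part of the temporary cache.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class ReduceSketch {

    // Local stand-in, not the Infinispan type.
    interface Reducer<KOut, VOut> {
        VOut reduce(KOut reducedKey, Iterator<VOut> iter);
    }

    // For each key assigned to this node, looks up the collected values
    // and reduces them to a single value; the result is this node's map M.
    static <KOut, VOut> Map<KOut, VOut> reduceOnNode(
            Iterable<KOut> keysForThisNode,
            Map<KOut, List<VOut>> localTmpCache,
            Reducer<KOut, VOut> reducer) {
        Map<KOut, VOut> m = new HashMap<>();
        for (KOut key : keysForThisNode) {
            List<VOut> values = localTmpCache.get(key);
            if (values != null) { // skip keys with no collected values
                m.put(key, reducer.reduce(key, values.iterator()));
            }
        }
        return m;
    }
}
```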
When all ReduceCommands return to the calling node, MapReduceTask simply combines all these M maps and returns the final Map<KOut, VOut> as the result of MapReduceTask. As is currently allowed, the user is also given an opportunity to transform the Map<KOut, VOut> result using a Collator.
As the task completes, all temporary KOut ---> List<VOut> caches remaining on the Infinispan cluster are stopped and cleaned up.
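The final merge on the calling node, followed by an optional collation, might be sketched like this. The Collator interface is a local stand-in with an assumed collate method; MergeSketch, merge and mergeAndCollate are hypothetical names. Because each KOut is hashed to exactly one node, the per-node maps are assumed to have disjoint key sets, so a plain putAll is enough.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MergeSketch {

    // Local stand-in, not the Infinispan type.
    interface Collator<KOut, VOut, R> {
        R collate(Map<KOut, VOut> reducedResults);
    }

    // Combines the per-node M maps into the final Map<KOut, VOut>.
    static <KOut, VOut> Map<KOut, VOut> merge(List<Map<KOut, VOut>> perNode) {
        Map<KOut, VOut> result = new HashMap<>();
        for (Map<KOut, VOut> m : perNode) {
            result.putAll(m);
        }
        return result;
    }

    // Merges and then lets the user transform the merged result.
    static <KOut, VOut, R> R mergeAndCollate(List<Map<KOut, VOut>> perNode,
                                             Collator<KOut, VOut, R> collator) {
        return collator.collate(merge(perNode));
    }
}
```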