map reduce question
matlach Mar 28, 2012 10:16 PM
Hello there,
I am relatively new to the map reduce concept, but after reading about the distributed execution framework and drilling down into the source code,
here is my understanding so far of what happens behind the scenes in Infinispan when performing a map reduce operation:
- for each node
  - perform map reduce
    - for each local key in map
      - mapper.map
        - if key/value is "interesting", collector.emit
    - return collected results
- aggregate collected results
- reducer.reduce collected results
At this point, is there anything I have misunderstood?
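To make the question concrete, here is the flow listed above written as a small plain-Java sketch. It does not use Infinispan at all: `Collector`, `Mapper` and `Reducer` are hypothetical stand-ins declared in the snippet itself, each "node" is just a local `HashMap`, and the word-count mapper and reducer are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the flow described above. None of these types are
// Infinispan classes; they are hypothetical stand-ins with similar names.
class MapReduceFlowSketch {

    interface Collector<K, V> { void emit(K key, V value); }

    interface Mapper<KIn, VIn, KOut, VOut> {
        void map(KIn key, VIn value, Collector<KOut, VOut> collector);
    }

    interface Reducer<K, V> { V reduce(K key, Iterator<V> values); }

    // Map phase on one simulated node: call the mapper for every local entry
    // and return whatever was emitted, grouped by output key.
    static <KIn, VIn, KOut, VOut> Map<KOut, List<VOut>> mapOnNode(
            Map<KIn, VIn> localEntries, Mapper<KIn, VIn, KOut, VOut> mapper) {
        Map<KOut, List<VOut>> collected = new HashMap<>();
        Collector<KOut, VOut> collector =
                (k, v) -> collected.computeIfAbsent(k, x -> new ArrayList<>()).add(v);
        for (Map.Entry<KIn, VIn> e : localEntries.entrySet()) {
            mapper.map(e.getKey(), e.getValue(), collector);
        }
        return collected;
    }

    // Full flow: map on every node, aggregate on the caller, then reduce.
    static <KIn, VIn, KOut, VOut> Map<KOut, VOut> execute(
            List<Map<KIn, VIn>> nodes,
            Mapper<KIn, VIn, KOut, VOut> mapper,
            Reducer<KOut, VOut> reducer) {
        // Aggregate the collected results of all nodes.
        Map<KOut, List<VOut>> reduceMap = new HashMap<>();
        for (Map<KIn, VIn> node : nodes) {
            mapOnNode(node, mapper).forEach(
                    (k, vs) -> reduceMap.computeIfAbsent(k, x -> new ArrayList<>()).addAll(vs));
        }
        // Reduce each key's list of values to a single value.
        Map<KOut, VOut> result = new HashMap<>();
        for (Map.Entry<KOut, List<VOut>> e : reduceMap.entrySet()) {
            result.put(e.getKey(), reducer.reduce(e.getKey(), e.getValue().iterator()));
        }
        return result;
    }

    // Made-up example: word count over two simulated nodes.
    static Map<String, Integer> wordCount() {
        Map<String, String> node1 = new HashMap<>();
        node1.put("k1", "hello world");
        Map<String, String> node2 = new HashMap<>();
        node2.put("k2", "hello infinispan");
        List<Map<String, String>> nodes = new ArrayList<>();
        nodes.add(node1);
        nodes.add(node2);

        Mapper<String, String, String, Integer> mapper = (key, value, c) -> {
            for (String word : value.split(" ")) {
                c.emit(word, 1);
            }
        };
        Reducer<String, Integer> reducer = (key, values) -> {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next();
            }
            return sum;
        };
        return execute(nodes, mapper, reducer);
    }

    public static void main(String[] args) {
        System.out.println(wordCount());
    }
}
```

If this matches what Infinispan does, then the only distributed part is the per-node map step, and both the aggregation and the reduce run on the node that started the task.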
Also, looking at:
org.infinispan.distexec.mapreduce.MapReduceTask
    public Map<KOut, VOut> execute() throws CacheException {
        // final reduce
        // TODO parallelize into Executor
        Map<KOut, VOut> result = new HashMap<KOut, VOut>();
        for (Entry<KOut, List<VOut>> entry : reduceMap.entrySet()) {
            VOut reduced = reducer.reduce(entry.getKey(), (entry.getValue()).iterator());
            result.put(entry.getKey(), reduced);
        }
        return result;
    }
I understand from the TODO that the reduce phase will eventually be optimized by parallelizing it.
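As a rough illustration of what that TODO in `execute()` might turn into, here is a sketch that submits one reduce task per key to an `ExecutorService`. This is only my guess, not Infinispan code: the `Reducer` interface is a hypothetical stand-in, and sizing the pool by `availableProcessors()` is an assumption.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ParallelReduceSketch {

    // Hypothetical stand-in with the same shape as the reducer call above.
    interface Reducer<K, V> {
        V reduce(K key, Iterator<V> values);
    }

    static <K, V> Map<K, V> reduceInParallel(Map<K, List<V>> reduceMap, Reducer<K, V> reducer) {
        // Assumption: one worker thread per available processor.
        ExecutorService executor =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            // One task per key: keys are reduced independently of each other.
            Map<K, Future<V>> pending = new HashMap<>();
            for (Map.Entry<K, List<V>> entry : reduceMap.entrySet()) {
                Callable<V> task =
                        () -> reducer.reduce(entry.getKey(), entry.getValue().iterator());
                pending.put(entry.getKey(), executor.submit(task));
            }
            // Results are gathered on the calling thread, so a plain HashMap is enough.
            Map<K, V> result = new HashMap<>();
            for (Map.Entry<K, Future<V>> p : pending.entrySet()) {
                result.put(p.getKey(), p.getValue().get());
            }
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while reducing", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a reduce task failed", e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    // Made-up input: two keys whose values are summed.
    static Map<String, Integer> demo() {
        Map<String, List<Integer>> reduceMap = new HashMap<>();
        reduceMap.put("hello", Arrays.asList(1, 1));
        reduceMap.put("world", Arrays.asList(1));
        Reducer<String, Integer> sum = (key, values) -> {
            int total = 0;
            while (values.hasNext()) {
                total += values.next();
            }
            return total;
        };
        return reduceInParallel(reduceMap, sum);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because each key is reduced in its own task and the results are only put into the map by the calling thread, the reducer itself does not need to change, as long as it keeps no shared mutable state.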
However, when looking at:
org.infinispan.distexec.mapreduce.MapReduceCommand
    public Object perform(InvocationContext context) throws Throwable {
        DefaultCollector<Object, Object> collector = new DefaultCollector<Object, Object>();
        for (Object key : keys) {
            GetKeyValueCommand command = commandsFactory.buildGetKeyValueCommand(key, ctx.getFlags());
            command.setReturnCacheEntry(false);
            Object value = invoker.invoke(ctx, command);
            mapper.map(key, value, collector);
        }
Shouldn't the map phase be parallelized as well? For example: make the collector thread-safe, split the keys into as many slices as there are processors, then map each slice on its own thread.
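Roughly what I have in mind, as a plain-Java sketch. It is not a patch against `MapReduceCommand`: `Collector`, `Mapper` and `ConcurrentCollector` are hypothetical types declared in the snippet, the local data is a plain `HashMap` instead of a cache, and using `availableProcessors()` for the number of slices is an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ParallelMapSketch {

    interface Collector<K, V> { void emit(K key, V value); }

    interface Mapper<KIn, VIn, KOut, VOut> {
        void map(KIn key, VIn value, Collector<KOut, VOut> collector);
    }

    // Hypothetical thread-safe collector: several mapper threads may emit at once.
    static class ConcurrentCollector<K, V> implements Collector<K, V> {
        private final ConcurrentHashMap<K, Queue<V>> store = new ConcurrentHashMap<>();

        @Override
        public void emit(K key, V value) {
            store.computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>()).add(value);
        }

        Map<K, Queue<V>> collected() {
            return store;
        }
    }

    static <KIn, VIn, KOut, VOut> Map<KOut, Queue<VOut>> mapInParallel(
            Map<KIn, VIn> localData, Mapper<KIn, VIn, KOut, VOut> mapper) {
        // Split the keys into at most one slice per processor.
        int slices = Runtime.getRuntime().availableProcessors();
        List<KIn> keys = new ArrayList<>(localData.keySet());
        int sliceSize = Math.max(1, (keys.size() + slices - 1) / slices);

        ConcurrentCollector<KOut, VOut> collector = new ConcurrentCollector<>();
        ExecutorService executor = Executors.newFixedThreadPool(slices);
        try {
            List<Callable<Void>> tasks = new ArrayList<>();
            for (int from = 0; from < keys.size(); from += sliceSize) {
                List<KIn> slice = keys.subList(from, Math.min(from + sliceSize, keys.size()));
                tasks.add(() -> {
                    // localData is only read here, never modified.
                    for (KIn key : slice) {
                        mapper.map(key, localData.get(key), collector);
                    }
                    return null;
                });
            }
            // invokeAll waits for every slice; get() rethrows a mapper failure.
            for (Future<Void> f : executor.invokeAll(tasks)) {
                f.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while mapping", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a map task failed", e.getCause());
        } finally {
            executor.shutdown();
        }
        return collector.collected();
    }

    // Made-up example: emit (word, 1) for every word of every local value.
    static Map<String, Queue<Integer>> demo() {
        Map<String, String> localData = new HashMap<>();
        localData.put("k1", "a b");
        localData.put("k2", "a c");
        localData.put("k3", "a");
        Mapper<String, String, String, Integer> wordMapper = (key, value, collector) -> {
            for (String word : value.split(" ")) {
                collector.emit(word, 1);
            }
        };
        return mapInParallel(localData, wordMapper);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

One consequence of this approach is that the mapper would be called from several threads at once, so it would have to be thread-safe too, and the order of emitted values per key would no longer be deterministic.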
It's just an idea, and I'm still new to the map reduce concept, so is there something I'm missing or getting wrong?
Again, big thanks,