1 Reply Latest reply on Apr 10, 2012 10:18 AM by Vladimir Blagojevic

    map reduce question

    Mathieu Lachance Novice

      Hello there,

       

      I am relatively new to the map reduce concept, but after reading about the distributed execution framework and drilling down into the source code,

      here is my understanding so far of what happens behind the scenes in Infinispan when performing a map reduce operation:

      1. for each node
        1. perform map reduce
          1. for each local key in map
            1. mapper.map
              1. if key/value is "interesting", collector.emit
        2. return collected results
      2. aggregate collected results
      3. reducer.reduce collected results
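
      The steps above can be sketched in plain Java. This is only an illustration of the flow as described in this post, not Infinispan code: the Mapper, Reducer and Collector interfaces below are hypothetical stand-ins, and each "node" is modelled as an ordinary in-memory Map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class MapReduceFlowSketch {
    // Hypothetical stand-ins for the real interfaces (assumption, not the Infinispan API).
    interface Collector<K, V> { void emit(K key, V value); }
    interface Mapper<KIn, VIn, KOut, VOut> { void map(KIn key, VIn value, Collector<KOut, VOut> collector); }
    interface Reducer<K, V> { V reduce(K key, Iterator<V> values); }

    // Step 1: on one node, run the mapper over every local entry and return what was emitted.
    static <KIn, VIn, KOut, VOut> Map<KOut, List<VOut>> mapOnNode(
            Map<KIn, VIn> localEntries, Mapper<KIn, VIn, KOut, VOut> mapper) {
        Map<KOut, List<VOut>> collected = new HashMap<>();
        Collector<KOut, VOut> collector =
                (k, v) -> collected.computeIfAbsent(k, x -> new ArrayList<>()).add(v);
        for (Map.Entry<KIn, VIn> e : localEntries.entrySet()) {
            mapper.map(e.getKey(), e.getValue(), collector);
        }
        return collected;
    }

    static <KIn, VIn, KOut, VOut> Map<KOut, VOut> execute(
            List<Map<KIn, VIn>> nodes,
            Mapper<KIn, VIn, KOut, VOut> mapper,
            Reducer<KOut, VOut> reducer) {
        // Step 2: aggregate the per-node results by intermediate key.
        Map<KOut, List<VOut>> reduceMap = new HashMap<>();
        for (Map<KIn, VIn> node : nodes) {
            for (Map.Entry<KOut, List<VOut>> e : mapOnNode(node, mapper).entrySet()) {
                reduceMap.computeIfAbsent(e.getKey(), x -> new ArrayList<>()).addAll(e.getValue());
            }
        }
        // Step 3: reduce the aggregated values, one key at a time.
        Map<KOut, VOut> result = new HashMap<>();
        for (Map.Entry<KOut, List<VOut>> e : reduceMap.entrySet()) {
            result.put(e.getKey(), reducer.reduce(e.getKey(), e.getValue().iterator()));
        }
        return result;
    }

    // Example job: count words across all nodes.
    static Map<String, Integer> wordCount(List<Map<String, String>> nodes) {
        Mapper<String, String, String, Integer> mapper = (key, text, c) -> {
            for (String word : text.split("\\s+")) {
                if (!word.isEmpty()) c.emit(word, 1);
            }
        };
        Reducer<String, Integer> reducer = (word, counts) -> {
            int sum = 0;
            while (counts.hasNext()) sum += counts.next();
            return sum;
        };
        return execute(nodes, mapper, reducer);
    }
}
```

      Here the nodes are visited one after another in a loop; in a real cluster step 1 would run on each node, and only the collected results would travel back to the caller.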

       

      At this point, is there anything I have misunderstood?

       

      Also, looking at:

       

      org.infinispan.distexec.mapreduce.MapReduceTask

      public Map<KOut, VOut> execute() throws CacheException {
      
      
            // final reduce
            //TODO parallelize into Executor
            Map<KOut, VOut> result = new HashMap<KOut, VOut>();
            for (Entry<KOut, List<VOut>> entry : reduceMap.entrySet()) {
               VOut reduced = reducer.reduce(entry.getKey(), (entry.getValue()).iterator());
               result.put(entry.getKey(), reduced);
            }
            return result;
      

       

      I understand that the reduction phase will eventually be optimized by parallelizing it.
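
      As a rough idea of what the TODO might turn into, here is a minimal sketch that submits one reduce call per key to an ExecutorService. It is an assumption about one possible approach, not the planned implementation; the Reducer interface and the sumPerKey helper are hypothetical.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelReduceSketch {
    // Hypothetical stand-in for the real Reducer interface (assumption).
    interface Reducer<K, V> { V reduce(K key, Iterator<V> values); }

    static <K, V> Map<K, V> reduceInParallel(
            Map<K, List<V>> reduceMap, Reducer<K, V> reducer, int threads) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            // Submit one reduce task per intermediate key.
            Map<K, Future<V>> pending = new HashMap<>();
            for (Map.Entry<K, List<V>> entry : reduceMap.entrySet()) {
                final K key = entry.getKey();
                final List<V> values = entry.getValue();
                Callable<V> task = () -> reducer.reduce(key, values.iterator());
                pending.put(key, executor.submit(task));
            }
            // Wait for every task and gather the reduced values.
            Map<K, V> result = new HashMap<>();
            for (Map.Entry<K, Future<V>> e : pending.entrySet()) {
                result.put(e.getKey(), e.getValue().get());
            }
            return result;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("reduce phase failed", e);
        } finally {
            executor.shutdown();
        }
    }

    // Example use: sum the collected integers for each key.
    static Map<String, Integer> sumPerKey(Map<String, List<Integer>> reduceMap, int threads) {
        Reducer<String, Integer> sum = (key, values) -> {
            int total = 0;
            while (values.hasNext()) total += values.next();
            return total;
        };
        return reduceInParallel(reduceMap, sum, threads);
    }
}
```

      This only works if the reducer holds no shared mutable state, since reduce calls for different keys may run at the same time.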

       

      Though, when looking at:

       

      org.infinispan.distexec.mapreduce.MapReduceCommand

       

      public Object perform(InvocationContext context) throws Throwable {
            
            DefaultCollector<Object,Object> collector = new DefaultCollector<Object, Object>();
            for (Object key : keys) {
               GetKeyValueCommand command = commandsFactory.buildGetKeyValueCommand(key, ctx.getFlags());
               command.setReturnCacheEntry(false);
               Object value = invoker.invoke(ctx, command);
               mapper.map(key, value,collector);
            }
      

       

      Shouldn't the map phase be parallelized as well? For example: make the collector thread-safe, split the keys by the number of processors, then map each partition in parallel.

      It is just an idea, and I'm still new to this map reduce concept. Is there something I'm missing or have wrong?
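
      To make the idea concrete, here is a minimal sketch of what I have in mind. None of this is Infinispan code: the Mapper and Collector interfaces are hypothetical stand-ins, ConcurrentCollector is a made-up name, and the node's local entries are modelled as a plain Map that is only read during the map phase.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelMapSketch {
    // Hypothetical stand-ins for the real interfaces (assumption).
    interface Collector<K, V> { void emit(K key, V value); }
    interface Mapper<KIn, VIn, KOut, VOut> { void map(KIn key, VIn value, Collector<KOut, VOut> collector); }

    // Collector that can safely receive emit calls from several threads.
    static class ConcurrentCollector<K, V> implements Collector<K, V> {
        private final ConcurrentMap<K, Queue<V>> store = new ConcurrentHashMap<>();

        public void emit(K key, V value) {
            store.computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>()).add(value);
        }

        Map<K, List<V>> collectedValues() {
            Map<K, List<V>> copy = new HashMap<>();
            store.forEach((k, q) -> copy.put(k, new ArrayList<>(q)));
            return copy;
        }
    }

    static <KIn, VIn, KOut, VOut> Map<KOut, List<VOut>> mapInParallel(
            Map<KIn, VIn> localEntries, Mapper<KIn, VIn, KOut, VOut> mapper) {
        int partitions = Runtime.getRuntime().availableProcessors();
        List<KIn> keys = new ArrayList<>(localEntries.keySet());
        ConcurrentCollector<KOut, VOut> collector = new ConcurrentCollector<>();
        ExecutorService executor = Executors.newFixedThreadPool(partitions);
        try {
            // Split the keys into roughly equal slices, one per processor (ceiling division).
            int chunk = (keys.size() + partitions - 1) / partitions;
            List<Future<?>> pending = new ArrayList<>();
            for (int start = 0; start < keys.size(); start += chunk) {
                List<KIn> slice = keys.subList(start, Math.min(start + chunk, keys.size()));
                Runnable task = () -> {
                    for (KIn key : slice) {
                        mapper.map(key, localEntries.get(key), collector);
                    }
                };
                pending.add(executor.submit(task));
            }
            // Wait until every slice has been mapped.
            for (Future<?> f : pending) f.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("map phase failed", e);
        } finally {
            executor.shutdown();
        }
        return collector.collectedValues();
    }

    // Example use: classify the values 0..n-1 as "even" or "odd".
    static Map<String, List<Integer>> evenOddDemo(int n) {
        Map<Integer, Integer> data = new HashMap<>();
        for (int i = 0; i < n; i++) data.put(i, i);
        Mapper<Integer, Integer, String, Integer> mapper =
                (key, value, c) -> c.emit(value % 2 == 0 ? "even" : "odd", value);
        return mapInParallel(data, mapper);
    }
}
```

      One consequence is that the mapper itself would also have to be safe to call from several threads, and the order of values collected for a given key would no longer be deterministic.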

       

      Again, many thanks,

        • 1. Re: map reduce question
          Vladimir Blagojevic Master

          Mathieu,

           

          Thanks for your curiosity regarding the Infinispan map reduce implementation. Although it is fully functional, it is fair to say that our implementation is still experimental. To understand the concepts of map reduce, I suggest you go through Google's original paper. Some of the things you noticed are right on. We need to parallelize reduce execution across the Infinispan cluster. We also need to introduce the concept of a Combiner. Combining is currently done automatically, although, depending on the nature of the map function, this step sometimes cannot and should not be done! We are now in the design stage for this second push with map reduce, after having collected a lot of valuable feedback from users like yourself. Keep reading, digging and stay tuned for updates.
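
          To illustrate why combining cannot always be applied, here is a minimal sketch. It assumes a combiner that simply reuses the reduce function on each node's partial values before the final reduce; the class and method names are hypothetical and this is not how Infinispan implements the step. Summing gives the same answer either way, while averaging does not.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class CombinerSketch {
    static double sum(List<Double> values) {
        double total = 0;
        for (double v : values) total += v;
        return total;
    }

    static double average(List<Double> values) {
        return sum(values) / values.size();
    }

    // No combining: ship every value for the key to the caller and reduce once.
    static double reduceAll(List<List<Double>> perNode, Function<List<Double>, Double> reducer) {
        List<Double> all = new ArrayList<>();
        for (List<Double> nodeValues : perNode) all.addAll(nodeValues);
        return reducer.apply(all);
    }

    // Combining (assumed form): reduce on each node first, then reduce the partial results.
    static double combineThenReduce(List<List<Double>> perNode, Function<List<Double>, Double> reducer) {
        List<Double> partials = new ArrayList<>();
        for (List<Double> nodeValues : perNode) partials.add(reducer.apply(nodeValues));
        return reducer.apply(partials);
    }

    // Values emitted for one intermediate key on two nodes.
    static List<List<Double>> sampleValues() {
        return Arrays.asList(Arrays.asList(1.0, 2.0, 3.0), Arrays.asList(10.0));
    }

    public static void main(String[] args) {
        List<List<Double>> perNode = sampleValues();
        System.out.println("sum, no combining:     " + reduceAll(perNode, CombinerSketch::sum));
        System.out.println("sum, with combining:   " + combineThenReduce(perNode, CombinerSketch::sum));
        System.out.println("mean, no combining:    " + reduceAll(perNode, CombinerSketch::average));
        System.out.println("mean, with combining:  " + combineThenReduce(perNode, CombinerSketch::average));
    }
}
```

          With these sample values the two sums agree, but the mean of all four values is 4.0 while the mean of the per-node means is 6.0. Combining with the reduce function is only safe when the function is associative and commutative.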

           

          All the best,

          Vladimir