4 Replies Latest reply on Apr 1, 2014 11:35 AM by Christina Lin

    MapReduceTask, Is it possible to access other cache in Mapper?

    Christina Lin Newbie

      Hi,

       

      I have been working on a MapReduceTask. In the Mapper, I am trying to calculate, say, an employee's overtime pay for a weekly summary.

      But to get all the information I need, I have to access other caches.

       

      So I did this:

       

      package org.demo.mapreduce;

      import org.demo.model.ClockOut;
      import org.demo.model.Employee;
      import org.demo.model.Overtime;
      import org.infinispan.Cache;
      import org.infinispan.distexec.mapreduce.Collector;
      import org.infinispan.distexec.mapreduce.Mapper;

      public class ClockoutMapper implements Mapper<ClockOut, ClockOut, String, Integer> {

        private static final long serialVersionUID = -6264767987234523851L;

        // Cache instances are not Serializable, so holding them in fields of a Mapper
        // (which is serialized and sent to other nodes) fails at runtime.
        Cache<Integer, Integer> salaryRateCache;
        Cache<String, Employee> employeeCache;

        public ClockoutMapper(Cache<Integer, Integer> salaryRateCache, Cache<String, Employee> employeeCache) {
          this.salaryRateCache = salaryRateCache;
          this.employeeCache = employeeCache;
        }

        @Override
        public void map(ClockOut key, ClockOut value, Collector<String, Integer> collector) {
          try {
            Employee employee = employeeCache.get(key.getEmployeeID());
            collector.emit(key.getEmployeeID(),
                Overtime.calculteOverTime(salaryRateCache.get(employee.getEmployeeLevel()), key.getClockoutTime()));
          } catch (Exception e) {
            e.printStackTrace();
          }
        }
      }

       

      It turns out that this doesn't work: a Cache cannot be passed into the Mapper this way, and the task throws the following exceptions:

      Caused by: org.infinispan.commons.marshall.NotSerializableException: org.infinispan.CacheImpl

      Caused by: an exception which occurred:

        in field employeeCache

        in object org.demo.mapreduce.ClockoutMapper@6da390f4

       

      I think this is because the Mapper has to be serialized and sent to the other nodes.

      But what if I really need to access another cache in the Mapper?

      Is there any way to do it?
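The failure can be reproduced without Infinispan. The sketch below uses plain Java serialization (Infinispan 6 uses its own marshaller, so the exception type differs, but the rule appears to be the same): an object that implements Serializable still cannot be written if one of its non-transient fields refers to a non-serializable object. `FakeCache` and `HoldingMapper` are hypothetical stand-ins for `CacheImpl` and `ClockoutMapper`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializationDemo {

    // Hypothetical stand-in for org.infinispan.CacheImpl: deliberately NOT Serializable.
    static class FakeCache {
    }

    // Hypothetical stand-in for ClockoutMapper: Serializable, but holds a cache reference.
    static class HoldingMapper implements Serializable {
        private static final long serialVersionUID = 1L;
        private final FakeCache employeeCache;

        HoldingMapper(FakeCache employeeCache) {
            this.employeeCache = employeeCache;
        }
    }

    /** Returns true if the object can be serialized, false on NotSerializableException. */
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            // The exception message names the offending class (the FakeCache field).
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(new HoldingMapper(new FakeCache()))); // false
        System.out.println(canSerialize(new HoldingMapper(null)));            // true
    }
}
```

A null field serializes fine, which is why the problem only shows up once a real cache is passed in.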

        • 1. Re: MapReduceTask, Is it possible to access other cache in Mapper?
          Vladimir Blagojevic Master

          Christina,

           

          Good question. Yes, there is. Since CacheImpl is not Serializable, you can use the CDI mechanism to inject a cache instance into the Mapper [1] once the Mapper has been migrated to the executing node. The injected cache is the input cache used to create the MapReduceTask, and you need a different one. However, from the injected cache you can look up any other cache in the same cache manager: simply call Cache#getCacheManager() [2] to get the cache manager, and then look up the cache you want. Make sure you follow http://infinispan.org/docs/6.0.x/user_guide/user_guide.html#_cdi_support to turn on CDI support.
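The underlying idea can be sketched without Infinispan: ship only cache names with the task, keep any resolved reference `transient`, and resolve it by name after the task arrives. This swaps the CDI injection described above for a plain lookup; `NodeRegistry.getCache(name)` is a hypothetical stand-in for `EmbeddedCacheManager.getCache(name)`. Everything runs in one JVM and "shipping" is simulated with a serialization round trip, so this only illustrates the idea, not how Infinispan actually migrates tasks.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LookupByNameDemo {

    // Hypothetical per-node registry; plays the role of EmbeddedCacheManager.getCache(name).
    static final class NodeRegistry {
        private static final Map<String, Map<?, ?>> CACHES = new ConcurrentHashMap<>();

        @SuppressWarnings("unchecked")
        static <K, V> Map<K, V> getCache(String name) {
            return (Map<K, V>) CACHES.computeIfAbsent(name, n -> new ConcurrentHashMap<>());
        }
    }

    // Ships only cache names; the cache references themselves are never serialized.
    static final class OvertimeMapper implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String employeeCacheName;
        private final String rateCacheName;
        private transient Map<String, Integer> employeeLevels; // resolved on the executing side
        private transient Map<Integer, Integer> rates;

        OvertimeMapper(String employeeCacheName, String rateCacheName) {
            this.employeeCacheName = employeeCacheName;
            this.rateCacheName = rateCacheName;
        }

        int hourlyRate(String employeeId) {
            if (employeeLevels == null) { // first call after deserialization
                employeeLevels = NodeRegistry.getCache(employeeCacheName);
                rates = NodeRegistry.getCache(rateCacheName);
            }
            Integer level = employeeLevels.get(employeeId);
            Integer rate = level == null ? null : rates.get(level);
            if (rate == null) {
                throw new IllegalStateException("no rate for employee " + employeeId);
            }
            return rate;
        }
    }

    // Simulates sending a task to another node: serialize, then deserialize a copy.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T ship(T task) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(task);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        NodeRegistry.<String, Integer>getCache("employees").put("e1", 2);
        NodeRegistry.<Integer, Integer>getCache("salaryRates").put(2, 30);
        OvertimeMapper shipped = ship(new OvertimeMapper("employees", "salaryRates"));
        System.out.println(shipped.hourlyRate("e1")); // 30
    }
}
```

The cache names and the employee data are made up for the example; in a real cluster each node would have its own cache manager to resolve the names against.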

           

          Regards,

          Vladimir

           

           

          [1] http://infinispan.org/docs/6.0.x/user_guide/user_guide.html#_mapper_and_cdi

          [2] Cache (Infinispan Distribution 6.0.2.Final API)

          • 2. Re: Re: MapReduceTask, Is it possible to access other cache in Mapper?
            Christina Lin Newbie

            Thanks Vladimir, that was very helpful!

            I have now modified my program like this:

            package org.demo.mapreduce;

            import javax.inject.Inject;

            import org.demo.model.ClockOut;
            import org.demo.model.Employee;
            import org.demo.model.Overtime;
            import org.demo.model.SalaryLevel;
            import org.infinispan.Cache;
            import org.infinispan.cdi.Input;
            import org.infinispan.distexec.mapreduce.Collector;
            import org.infinispan.distexec.mapreduce.Mapper;
            import org.infinispan.manager.EmbeddedCacheManager;

            public class ClockoutMapper implements Mapper<ClockOut, ClockOut, String, Integer> {

              private static final long serialVersionUID = -6264767987234523851L;

              // Injected by infinispan-cdi when the Mapper runs on a node:
              // this is the input cache the MapReduceTask was created with.
              @Inject
              @Input
              private Cache<ClockOut, ClockOut> clockOutCache;

              @Override
              public void map(ClockOut key, ClockOut value, Collector<String, Integer> collector) {
                // Look up the other caches through the injected cache's manager and keep them
                // as local variables, so no Cache reference is stored in a field of the Mapper.
                EmbeddedCacheManager cacheManager = clockOutCache.getCacheManager();
                Cache<Integer, Integer> salaryRateCache = cacheManager.getCache(SalaryLevel.SalaryRate_Cache_Name);
                Cache<String, Employee> employeeCache = cacheManager.getCache(Employee.Employee_Cache_Name);

                try {
                  Employee employee = employeeCache.get(key.getEmployeeID());
                  collector.emit(key.getEmployeeID(),
                      Overtime.calculteOverTime(salaryRateCache.get(employee.getEmployeeLevel()), key.getClockoutTime()));
                } catch (Exception e) {
                  e.printStackTrace();
                }
              }
            }

            And added the following dependencies to my pom.xml:

             

             <dependency>
               <groupId>javax.enterprise</groupId>
               <artifactId>cdi-api</artifactId>
               <version>1.0</version>
               <scope>provided</scope>
             </dependency>
             <dependency>
               <groupId>org.infinispan</groupId>
               <artifactId>infinispan-cdi</artifactId>
               <version>${version.org.infinispan}</version>
             </dependency>
             <dependency>
               <groupId>javax.inject</groupId>
               <artifactId>javax.inject</artifactId>
               <version>1</version>
             </dependency>
            

            But it throws this exception:

            Caused by: java.lang.IllegalStateException: No org.infinispan.cdi.util.BeanManagerProvider in place! Please ensure that you configured the CDI implementation of your choice properly. If your setup is correct, please clear all caches and compiled artifacts.

              at org.infinispan.cdi.util.BeanManagerProvider.getInstance(BeanManagerProvider.java:118)

              at org.infinispan.cdi.CDIMapReduceTaskLifecycle.onPostExecute(CDIMapReduceTaskLifecycle.java:34)

              at org.infinispan.distexec.mapreduce.spi.MapReduceTaskLifecycleService.onPostExecute(MapReduceTaskLifecycleService.java:51)

              at org.infinispan.distexec.mapreduce.MapReduceManagerImpl.map(MapReduceManagerImpl.java:202)

              at org.infinispan.distexec.mapreduce.MapReduceManagerImpl.mapAndCombineForLocalReduction(MapReduceManagerImpl.java:89)

              at org.infinispan.distexec.mapreduce.MapReduceTask$MapTaskPart.invokeMapCombineLocallyForLocalReduction(MapReduceTask.java:955)

              at org.infinispan.distexec.mapreduce.MapReduceTask$MapTaskPart.access$300(MapReduceTask.java:894)

             

            Is there something I am missing?

            • 3. Re: Re: MapReduceTask, Is it possible to access other cache in Mapper?
              Vladimir Blagojevic Master

              Christina,

               

              There is something wrong with the setup of your CDI environment. Please read the references available online, adjust your pom.xml and environment accordingly, and I am confident you will figure this one out :-)

               

              Regards,

              Vladimir

              • 4. Re: MapReduceTask, Is it possible to access other cache in Mapper?
                Christina Lin Newbie

                I figured out the problem: I was running it from the standalone jar file that starts up the Hot Rod server.

                The BeanManagerProvider was never put in place when the server started up. It needs a Java EE container to do that.
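The error message is consistent with a common pattern in CDI integration code: a static holder that is filled in only when a CDI container boots, so a plain `java -jar` launcher that never starts one leaves it empty and the lookup throws. The sketch below models that pattern with hypothetical names (`ManagerHolder`, `FakeBeanManager`, the lifecycle hooks); it is not Infinispan's actual `BeanManagerProvider` code.

```java
import java.util.concurrent.atomic.AtomicReference;

public class ProviderHolderDemo {

    // Hypothetical stand-in for the CDI BeanManager.
    static final class FakeBeanManager {
    }

    // Hypothetical static holder: only the container's lifecycle hooks ever set it.
    static final class ManagerHolder {
        private static final AtomicReference<FakeBeanManager> INSTANCE = new AtomicReference<>();

        // Would be called by a CDI container once it has booted.
        static void onContainerStarted(FakeBeanManager bm) {
            INSTANCE.set(bm);
        }

        // Would be called by the container on shutdown.
        static void onContainerStopped() {
            INSTANCE.set(null);
        }

        static FakeBeanManager getInstance() {
            FakeBeanManager bm = INSTANCE.get();
            if (bm == null) {
                throw new IllegalStateException("No ManagerHolder in place! Was a CDI container started?");
            }
            return bm;
        }
    }

    public static void main(String[] args) {
        // Plain launcher: nothing booted a container, so the lookup fails.
        try {
            ManagerHolder.getInstance();
        } catch (IllegalStateException e) {
            System.out.println("lookup failed: " + e.getMessage());
        }
        // Inside a container, the startup hook runs first and the same lookup succeeds.
        ManagerHolder.onContainerStarted(new FakeBeanManager());
        System.out.println(ManagerHolder.getInstance() != null); // true
    }
}
```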

                I was stuck for a few days and didn't see the obvious problem: "CDI works with EE"... hahaha.

                Thanks for the help!