5 Replies Latest reply on Feb 9, 2016 8:37 AM by william.burns

    How to calculate the average value of the items in a cache?

    mariusneo

      Hello,

       

      I was trying to use streams (instead of the now-deprecated map/reduce model) to calculate the average temperature for each country in the Infinispan weather app sample code:

       

      https://github.com/infinispan/infinispan-embedded-tutorial/compare/step-9...step-10

      To calculate the average, I want to use a double summarizer (to get both the sum and the count of the elements):

       

      Source code:

      {code}
      public void averageCountryTemperatures() {
        System.out.println("---------- Average countries temperature ---------");

        Map<String, WeatherDataStatistics> resultMap = cache.values().stream()
            .collect(CacheCollectors.serializableCollector(() -> groupingBy(LocationWeather::getCountry,
                collectingAndThen(
                    summarizingDouble(locationWeather -> (double) locationWeather.getTemperature()),
                    dss -> new WeatherDataStatistics(dss.getAverage(), dss.getSum())
                )))
            );

        System.out.printf("Avg of the countries temperatures is : %s \n", resultMap);
      }
      {code}

       

      (WeatherDataStatistics is serializable; DoubleSummaryStatistics, on the other hand, is not.)

      Unfortunately, the following stack trace appears, which is not very helpful to me as an Infinispan user:

       

      {code}
      ERROR: ISPN000073: Unexpected error while replicating
      org.infinispan.commons.marshall.NotSerializableException: java.util.DoubleSummaryStatistics
      Caused by: an exception which occurred:
        in object java.util.DoubleSummaryStatistics@590d97f5
        in object java.util.HashMap@8b788f28
        in object org.infinispan.stream.impl.StreamResponseCommand@5ffec2a6

      Feb 08, 2016 8:41:08 AM org.infinispan.remoting.inboundhandler.BasePerCacheInboundInvocationHandler exceptionHandlingCommand
      WARN: ISPN000071: Caught exception when handling command StreamRequestCommand{cacheName='___defaultcache'}
      org.infinispan.commons.marshall.NotSerializableException: java.util.DoubleSummaryStatistics
      Caused by: an exception which occurred:
        in object java.util.DoubleSummaryStatistics@590d97f5
        in object java.util.HashMap@8b788f28
        in object org.infinispan.stream.impl.StreamResponseCommand@5ffec2a6
      {code}

      Since streams can be used in a wide variety of cases, it would make a lot of sense to build a solid set of code samples showing how to use them.

       

      Two questions arise here:

      - How can the problem of calculating averages in the cache be solved with streams?

      - It seems to me (I'm new to Infinispan) that the map/reduce utilities still make sense in Infinispan, because from a developer's perspective it is much more straightforward to calculate an average with them. Isn't deprecating the map/reduce utilities unjustified?

      Thanks in advance for the support,

      Marius.

        • 1. Re: How to calculate the average value of the items in a cache?
          rvansa

          Distributed streams implement the standard Java Stream interface, so there are plenty of resources on how to use Java streams. It's true that there is little information in the docs (@wburns, have I missed something?); the best source at the moment is this blog post: Infinispan: Distributed Streams. The most notable difference is that Java lambdas are not serializable without additional effort, and the standard Collectors aren't serializable either.
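A minimal, JDK-only sketch of the "additional effort" for lambdas mentioned above: a lambda is only serializable when its target type is serializable, which can be forced with an intersection cast. The class name `SerializableLambdaDemo` and the Fahrenheit conversion are hypothetical and no Infinispan classes are used. As far as I understand, this is also why `CacheCollectors.serializableCollector` takes a (serializable) supplier rather than a collector: only the supplier lambda has to be shipped to the other nodes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

class SerializableLambdaDemo {

    // Returns true if standard Java serialization can write the object.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) { // java.io.NotSerializableException extends IOException
            return false;
        }
    }

    public static void main(String[] args) {
        // Ordinary lambda: the generated class does not implement Serializable.
        Function<Double, Double> plain = celsius -> celsius * 1.8 + 32;

        // Intersection cast: the compiler emits a serializable lambda.
        Function<Double, Double> serializable =
                (Function<Double, Double> & Serializable) celsius -> celsius * 1.8 + 32;

        System.out.println("plain: " + isSerializable(plain));
        System.out.println("intersection cast: " + isSerializable(serializable));
    }
}
```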

           

          To solve your problem, you need to make DoubleSummaryStatistics serializable/externalizable; I think the stack trace is quite self-explanatory. It may not be so clear how to do that, though; please consult the Marshalling chapter in the Infinispan User Guide to find out how to register an Externalizer or AdvancedExternalizer.

          • 2. Re: How to calculate the average value of the items in a cache?
            william.burns

            Unfortunately, due to personal reasons, I wasn't able to complete the documentation piece I was working on before I had to leave. I am back now, though, and it should be in before 8.2.0 Final is out.

             

            Also, you already answered your own concern about the "confusing" stack trace when you stated:

            Marius Grama wrote:

             

            (WeatherDataStatistics is serializable; DoubleSummaryStatistics, on the other hand, is not.)

            Unfortunately, DoubleSummaryStatistics is not Serializable by default. I looked at this a bit when I was first implementing distributed streams and didn't see a good way of handling it; using reflection in an Externalizer seems like the only realistic option. Instead, I was thinking of adding methods to CacheCollectors to replace the Collectors methods that use the SummaryStatistics classes, as well as the summaryStatistics methods of the DoubleStream, IntStream and LongStream implementations. What do you think?
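The root cause can be reproduced without Infinispan. In this sketch (the class `SummaryStatisticsCheck` is hypothetical) plain JDK serialization of a `DoubleSummaryStatistics` fails with `java.io.NotSerializableException`; Infinispan reports the same problem through its own `org.infinispan.commons.marshall.NotSerializableException`. A primitive `double[]` holding the same count and sum serializes without any extra work.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.DoubleSummaryStatistics;

class SummaryStatisticsCheck {

    // Tries to serialize the object and reports the outcome as a string.
    static String trySerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return "ok";
        } catch (NotSerializableException e) {
            return "NotSerializableException: " + e.getMessage();
        } catch (IOException e) {
            return "IOException: " + e;
        }
    }

    public static void main(String[] args) {
        DoubleSummaryStatistics dss = new DoubleSummaryStatistics();
        dss.accept(21.5);
        dss.accept(18.0);

        // DoubleSummaryStatistics implements DoubleConsumer, but not Serializable.
        System.out.println("Serializable? " + Serializable.class.isInstance(dss));
        System.out.println(trySerialize(dss));

        // The same information as a {count, sum} array serializes fine.
        System.out.println(trySerialize(new double[] {dss.getCount(), dss.getSum()}));
    }
}
```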

             

            The key thing to realize when using a distributed stream is that we attempt to distribute as much of the work as feasible across the cluster. So the confusing part may be figuring out which part is returned and performed in the final collect phase. In your case the split happens inside your collectingAndThen call: the downstream collector is distributed, and its results are sent back to the originating node, which does a final reduction using the finisher.
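To make the split above concrete, here is a local simulation built only on the standard `java.util.stream.Collector` contract (supplier, accumulator, combiner, finisher). It is not Infinispan's implementation; `SplitCollectDemo`, `collectAcrossNodes` and the per-node lists are hypothetical. Each "node" fills its own intermediate container, the containers are merged with the combiner, and the finisher runs once at the end. In the original code that per-node container is the `HashMap` of `DoubleSummaryStatistics` visible in the stack trace, which is why marshalling fails.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class SplitCollectDemo {

    // Every "node" builds its own intermediate container (supplier + accumulator);
    // the partial containers are merged with the combiner, and the finisher runs
    // exactly once on the merged result.
    static <T, A, R> R collectAcrossNodes(Collector<T, A, R> collector, List<List<T>> nodes) {
        BiConsumer<A, T> accumulator = collector.accumulator();
        A merged = collector.supplier().get(); // empty container acts as the identity
        for (List<T> localData : nodes) {
            A partial = collector.supplier().get(); // per-node intermediate container
            for (T element : localData) {
                accumulator.accept(partial, element);
            }
            // In a distributed setting, 'partial' is the object that would have
            // to be marshalled and sent to the node that started the stream.
            merged = collector.combiner().apply(merged, partial);
        }
        return collector.finisher().apply(merged);
    }

    public static void main(String[] args) {
        // Hypothetical temperatures held by two nodes.
        List<List<Double>> nodes = Arrays.asList(
                Arrays.asList(10.0, 20.0),
                Arrays.asList(30.0));
        Collector<Double, ?, Double> averaging = Collectors.averagingDouble(Double::doubleValue);
        System.out.println(collectAcrossNodes(averaging, nodes));
    }
}
```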

             

            The problem (for both streams and map/reduce) is that you need an intermediate structure that supports associative reduction and is also Serializable. Unfortunately, the JRE doesn't provide such a structure by default.

             

            The simplest way I can see for your case is to use a double[] of length 2 (one element holding the count and the other the running sum). You can then use the reducing method as the downstream collector of collectingAndThen. In the finisher Function (the "andThen" part) you can then calculate the average from those two numbers and have all the data you wanted available.
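A sketch of the double[] approach described above, run on a plain `java.util.stream` over a `List` so it can be tried without a cluster. `CountryAverageDemo` and its `LocationWeather` class are hypothetical stand-ins (with `country` and `temperature` fields mirroring the snippets in this thread). In Infinispan, the same `groupingBy(...)` would presumably be wrapped in `CacheCollectors.serializableCollector(() -> ...)` and run over `cache.values().stream()`; since a `double[]` is Serializable, the per-node partials should then be marshallable, but I have not verified that against a cluster.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class CountryAverageDemo {

    // Hypothetical stand-in for the tutorial's LocationWeather value class.
    static final class LocationWeather {
        final String country;
        final float temperature;

        LocationWeather(String country, float temperature) {
            this.country = country;
            this.temperature = temperature;
        }
    }

    // Average temperature per country: reducing() builds a {count, sum} array
    // (associative and Serializable), and the collectingAndThen finisher
    // turns it into sum / count.
    static Map<String, Double> averageByCountry(List<LocationWeather> values) {
        return values.stream().collect(Collectors.groupingBy(
                (LocationWeather lw) -> lw.country,
                Collectors.collectingAndThen(
                        Collectors.reducing(
                                new double[] {0, 0},                                  // identity: count 0, sum 0
                                (LocationWeather lw) -> new double[] {1, lw.temperature}, // one reading
                                (a, b) -> new double[] {a[0] + b[0], a[1] + b[1]}),   // merge partials
                        (double[] acc) -> acc[1] / acc[0])));                         // average
    }

    public static void main(String[] args) {
        List<LocationWeather> values = Arrays.asList(
                new LocationWeather("Italy", 20f),
                new LocationWeather("Italy", 24f),
                new LocationWeather("Norway", 5f));
        System.out.println(averageByCountry(values));
    }
}
```

Because the merge function always allocates a new array, sharing the single identity array across groups and partitions is safe. To keep the sum as well, the finisher could build a richer result, e.g. `new WeatherDataStatistics(acc[1] / acc[0], acc[1])` with the class from the question.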

             

            Marius Grama wrote:

             

            How can the problem of calculating averages in the cache be solved with streams?

             

            I answered this above, but I also want to clarify something else. Looking at your code, you are doing something different from the Map/Reduce tutorial: you are returning the sum and the average for each country, whereas the Map/Reduce version returns only the average. Returning multiple values like that is also more cumbersome with Map/Reduce (it would need the same kind of intermediate container).

             

            If you want a simple streams example that does exactly what the map/reduce code in the embedded tutorial does, this is all that is needed:

            {code}
            Map<String, Double> resultMap = cache.entrySet().stream()
                .collect(CacheCollectors.serializableCollector(() -> groupingBy(Map.Entry::getKey,
                    averagingDouble(e -> e.getValue().temperature))));
            {code}

            Marius Grama wrote:

             

            It seems to me (I'm new to Infinispan) that the map/reduce utilities still make sense in Infinispan, because from a developer's perspective it is much more straightforward to calculate an average with them. Isn't deprecating the map/reduce utilities unjustified?

            There are quite a few reasons for this actually.

             

            1. While the map/reduce API is very similar to Hadoop's and others', the new streams use the new Java APIs directly. These will gain more traction and usage as the years go on, while map/reduce-style APIs will most likely dwindle.
            2. In our testing, the new streams are faster for every use case we have run so far. Especially with larger data sets, they perform many orders of magnitude faster.
            3. The Map/Reduce implementation is not rehash aware, meaning that if a node leaves or joins while the computation is running, you will almost always get the wrong value. Streams are rehash aware, and this can even be disabled if needed.
            • 3. Re: How to calculate the average value of the items in a cache?
              william.burns

              I have since also created [ISPN-6191] Provide Externalizers for (Double|Long|Int)SummaryStatistics - JBoss Issue Tracker. This time around I convinced myself that this is the easiest way. It should be fixed in 8.2.0, so you will be able to use any of the collector methods that return summary statistics.

              • 4. Re: How to calculate the average value of the items in a cache?
                mariusneo

                Hi @wburns,

                First, thank you for taking the time to write this detailed answer. The solution you've provided for calculating the average does indeed work as expected.

                The simplest way I can see for your case is to use a double[] of length 2 (one element holding the count and the other the running sum). You can then use the reducing method as the downstream collector of collectingAndThen. In the finisher Function (the "andThen" part) you can then calculate the average from those two numbers and have all the data you wanted available.

                I was trying several ways of getting the average, and that is how I ended up calculating the double summary statistics, when a much simpler solution (like the one you mentioned) was at hand.

                 

                On the other hand, regarding the XSummaryStatistics serialization trick, I'm not sure it is really needed. Please make sure it won't introduce new problems down the road just to provide syntactic sugar for stream manipulation.

                I still think that an exhaustive and mature set of code samples (with the downside of having to maintain them) would be very helpful for Infinispan users.

                • 5. Re: How to calculate the average value of the items in a cache?
                  william.burns

                  Great, glad to hear you got it working.

                   

                  Marius Grama wrote:

                   

                   

                  I was trying several ways of getting the average, and that is how I ended up calculating the double summary statistics, when a much simpler solution (like the one you mentioned) was at hand.

                   

                  On the other hand, regarding the XSummaryStatistics serialization trick, I'm not sure it is really needed. Please make sure it won't introduce new problems down the road just to provide syntactic sugar for stream manipulation.

                  I still think that an exhaustive and mature set of code samples (with the downside of having to maintain them) would be very helpful for Infinispan users.

                   

                  Yes, the small code snippet works if you only need averages. But if you want to return more than that, say also the count or sum, you need the intermediate container. That is why I went ahead with providing serialization for the various SummaryStatistics classes in the JIRA I mentioned earlier.
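As an alternative to a raw `double[]`, the intermediate container mentioned above can be a small class of your own that is Serializable and merges associatively, wired up with the JDK's `Collector.of` factory. This is a plain-Java sketch; `TemperatureStats` is a hypothetical name, and using it inside a distributed stream (e.g. from the supplier passed to `CacheCollectors.serializableCollector`) is an assumption I have not tested against a cluster.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;

// Hypothetical intermediate container: mutable, merged associatively via
// combine(), and Serializable so a per-node partial result could be marshalled.
final class TemperatureStats implements Serializable {
    private static final long serialVersionUID = 1L;

    private long count;
    private double sum;

    void accept(double value) {
        count++;
        sum += value;
    }

    TemperatureStats combine(TemperatureStats other) {
        count += other.count;
        sum += other.sum;
        return this;
    }

    long getCount() { return count; }
    double getSum() { return sum; }
    double getAverage() { return count == 0 ? 0.0 : sum / count; }

    // Collector over Double readings; the container itself is the result.
    static Collector<Double, TemperatureStats, TemperatureStats> collector() {
        return Collector.of(
                TemperatureStats::new,
                (stats, temperature) -> stats.accept(temperature),
                TemperatureStats::combine);
    }

    public static void main(String[] args) {
        List<Double> temperatures = Arrays.asList(20.0, 24.0, 19.0);
        TemperatureStats stats = temperatures.stream().collect(collector());
        System.out.println(stats.getCount() + " readings, sum " + stats.getSum()
                + ", avg " + stats.getAverage());
    }
}
```

Unlike the `double[]`, the container gives the count, sum and average descriptive names, at the cost of one extra class to keep on every node's classpath.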

                   

                  I agree on the examples; the tutorial will be updated in the near future to use streams instead.

                   

                  Thanks for your input!