6 Replies Latest reply on Nov 21, 2014 10:51 AM by bhoomiredy

    Infinispan Distributed Cache Issues

    vijayanarayana.reddy.bhoomireddy

      I am new to Infinispan. We have set up an Infinispan cluster so that we can make use of the distributed cache for our CPU- and memory-intensive task. We are using UDP as the communication medium and Infinispan MapReduce for distributed processing. The problem we are facing is with the throughput. When we run the program on a single-node machine, the whole job completes in around 11 minutes, i.e. processing a few lakh records and finally emitting around 4 lakh (~400,000) records.

      However, when we use a cluster for the same dataset, we see a throughput of only 200 records per second being updated between the nodes in the cluster, which impacts the overall processing time. We are not sure which configuration choices are hurting the throughput so badly, but I am sure it's something to do with the configuration of either JGroups or Infinispan.
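      For context, the distributed processing in each stage is driven by Infinispan's MapReduceTask over the distributed cache. The mapper and reducer below are only placeholders for illustration, not our actual stage logic, but the invocation is roughly what we do:

      import java.util.Iterator;
      import java.util.Map;
      import org.infinispan.Cache;
      import org.infinispan.distexec.mapreduce.Collector;
      import org.infinispan.distexec.mapreduce.MapReduceTask;
      import org.infinispan.distexec.mapreduce.Mapper;
      import org.infinispan.distexec.mapreduce.Reducer;

      public class StageMapReduce {

          // Placeholder mapper: emits a count of 1 per record value (stands in for our real stage logic)
          public static class CategoryMapper implements Mapper<String, String, String, Integer> {
              @Override
              public void map(String key, String value, Collector<String, Integer> collector) {
                  collector.emit(value, 1);
              }
          }

          // Placeholder reducer: sums the counts emitted for each key
          public static class CategoryReducer implements Reducer<String, Integer> {
              @Override
              public Integer reduce(String reducedKey, Iterator<Integer> iter) {
                  int sum = 0;
                  while (iter.hasNext()) {
                      sum += iter.next();
                  }
                  return sum;
              }
          }

          public static Map<String, Integer> runStage(Cache<String, String> cache) {
              // distributeReducePhase = true so the reduce phase also runs across the cluster
              MapReduceTask<String, String, String, Integer> task = new MapReduceTask<>(cache, true);
              return task.mappedWith(new CategoryMapper())
                         .reducedWith(new CategoryReducer())
                         .execute();
          }
      }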

      Can anyone please suggest what could be done better?

        • 1. Re: Infinispan Distributed Cache Issues
          wdfink

          Hello and welcome to Infinispan

           

          I don't understand your issue, as you compare 11 minutes with 200 rec/sec, which does not match for me.

          Also, could you please describe whether you use client/server (C/S) or embedded mode and what your configuration looks like? Maybe a code example can help too.

          • 2. Re: Infinispan Distributed Cache Issues
            bhoomiredy

            Thanks Wolf-Dieter Fink

             

            Maybe I was not clear in my earlier post. Please find the details below:

             

            Our requirement is to process huge source datasets with a focus on minimizing the processing time. Processing is done in multiple stages, similar to a data pipeline. For example, in the first stage, the source data is processed and intermediate data is generated. In the next stage, the output of the first stage is used as the input for the second stage, and so on. In the final stage, we generate the output dataset.

             

            As these datasets (source, intermediate) are huge and cannot fit into a single computer's memory, we have chosen to use Infinispan as our data grid, where the data is stored across the cluster in the distributed cache. The processing is spread across the nodes of the Infinispan cluster and works on the data stored in the distributed cache.

             

            For testing purposes, we have used a dataset which contains a few thousand records (~20k) and eventually produces ~4 lakh (~400,000) output records. When we run this processing on an Infinispan cluster with only one machine, i.e. a single-node cluster, it takes roughly 11 minutes to do the complete processing. However, when I run the same in a cluster with more machines, it takes longer. For the same dataset, on a cluster of 2 machines, it takes ~54 minutes to generate the output.

             

            We are using UDP as the communication medium in JGroups and Infinispan MapReduce for distributed processing. We are using Infinispan in embedded mode, i.e. in a P2P fashion. As it is in-memory processing with a clustered approach, we expected the processing time to come down from 11 minutes; instead, it has shot up drastically.

             

            We are not able to understand the root cause. Could it be because of an improper JGroups configuration? Please find our configuration details below.

             


            <config xmlns="urn:org:jgroups"
                    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                    xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.4.xsd">
               <!--
                Note that the buffer sizes here are very small, you'll definitely want higher values in production.
                But the Linux defaults are also very small, and using higher values here without changing the system
                settings would only result in startup warnings.
                 -->
                <UDP
                        ip_mcast="true"
                        mcast_addr="${jgroups.udp.mcast_addr:239.255.255.250}"
                        mcast_port="${jgroups.udp.mcast_port:46655}"
                        tos="8"
                        ucast_recv_buf_size="200M"
                        ucast_send_buf_size="200M"
                        mcast_recv_buf_size="200M"
                        mcast_send_buf_size="200M"
                        loopback="true"
                        max_bundle_size="64000"
                        max_bundle_timeout="30"
                        enable_batching="true"

                        ip_ttl="${jgroups.udp.ip_ttl:2}"
                        enable_bundling="true"
                        enable_diagnostics="false"
                        bundler_type="new"

                        thread_naming_pattern="pl"

                        thread_pool.enabled="true"
                        thread_pool.min_threads="2"
                        thread_pool.max_threads="30"
                        thread_pool.keep_alive_time="60000"
                        thread_pool.queue_enabled="true"
                        thread_pool.queue_max_size="100"
                        thread_pool.rejection_policy="Discard"

                        oob_thread_pool.enabled="true"
                        oob_thread_pool.min_threads="2"
                        oob_thread_pool.max_threads="30"
                        oob_thread_pool.keep_alive_time="60000"
                        oob_thread_pool.queue_enabled="false"
                        oob_thread_pool.queue_max_size="100"
                        oob_thread_pool.rejection_policy="Discard"
                        />

                <PING timeout="3000" num_initial_members="2"/>
                <MERGE3 max_interval="30000" min_interval="10000"/>
                <FD_SOCK/>
                <FD_ALL timeout="15000"/>
                <VERIFY_SUSPECT timeout="5000"/>

                <pbcast.NAKACK2
                        xmit_interval="1000"
                        xmit_table_num_rows="100"
                        xmit_table_msgs_per_row="10000"
                        xmit_table_max_compaction_time="10000"
                        max_msg_batch_size="100"/>

                <pbcast.STABLE
                   stability_delay="500"
                   desired_avg_gossip="5000"
                   max_bytes="1m"
                   send_stable_msgs_to_coord_only="true"/>

                <pbcast.GMS print_local_addr="true" join_timeout="3000" view_bundling="true"/>
                <UFC max_credits="200k" min_threshold="0.20"/>
                <MFC max_credits="200k" min_threshold="0.20"/>
                <FRAG2 frag_size="8000"/>
                <RSVP timeout="60000" resend_interval="500" ack_on_delivery="true"/>
            </config>
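
            For reference, we point the embedded cache manager at this JGroups file roughly as follows (the file name jgroups-udp.xml and the cluster name are just what we happened to use, so treat them as assumptions):

            import org.infinispan.configuration.global.GlobalConfiguration;
            import org.infinispan.configuration.global.GlobalConfigurationBuilder;
            import org.infinispan.manager.DefaultCacheManager;

            // Wire the JGroups UDP stack above into the embedded transport
            GlobalConfiguration global = new GlobalConfigurationBuilder()
                      .transport().defaultTransport()
                      .addProperty("configurationFile", "jgroups-udp.xml")
                      .clusterName("etl-cluster")
                      .build();

            DefaultCacheManager cacheManager = new DefaultCacheManager(global);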

             

             

            Regards

            Vijay

            • 3. Re: Infinispan Distributed Cache Issues
              wdfink

              Hmmm,

              one problem might be the network speed; how are your nodes connected?

              The other issue might be the cache and store configuration. Could you share how you configure the cache?

              numOwners, sync/async mode and persistence might have an influence.

              • 4. Re: Infinispan Distributed Cache Issues
                bhoomiredy

                Wolf-Dieter Fink,

                 

                Our configuration is managed programmatically. Here is the code snippet:

                 

                // Distributed, synchronous cache with a single owner per entry (no backup copies), batching enabled
                Configuration config = new ConfigurationBuilder()
                          .clustering()
                          .cacheMode(CacheMode.DIST_SYNC)
                          .hash().numOwners(1)
                          .invocationBatching().enable()
                          .build();
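
                That Configuration is then registered with the cache manager created from the JGroups file in my earlier post, roughly like this (the cache name and key/value types here are placeholders):

                import org.infinispan.Cache;

                // cacheManager is the DefaultCacheManager built with the JGroups-based GlobalConfiguration
                cacheManager.defineConfiguration("pipelineCache", config);
                Cache<String, String> cache = cacheManager.getCache("pipelineCache");
                cache.put("record-1", "some value");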

                 

                As this is being done as proof-of-concept work, we are using our corporate intranet to test it. As you mentioned, we need to investigate whether the network is the real culprit here, perhaps due to some network policy enforcement. We shall investigate that.

                 

                However, can you please tell us whether the configuration looks fine?

                 

                Regards

                Vijay

                • 5. Re: Infinispan Distributed Cache Issues
                  wdfink

                  numOwners(1) is a failover issue, as you have no copy.

                  On the other hand, this means the data is mostly accessed remotely and must be transferred anyway, which might be a performance issue here.

                   

                  So it might be an option to use an Infinispan server as a separate instance; the application is independent of the cache data and can be restarted without affecting the cache.

                  Also, you can add more resources to the cache if needed.

                  Maybe add replication for safety.

                  The Hot Rod client is able to locate the correct instance for a given key, so there should be no big overhead here.
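
                  A minimal Hot Rod client for that client/server setup could look roughly like this (host, port and cache name are only examples):

                  import org.infinispan.client.hotrod.RemoteCache;
                  import org.infinispan.client.hotrod.RemoteCacheManager;
                  import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;

                  // Connect to a standalone Infinispan server over Hot Rod (11222 is the default Hot Rod port)
                  ConfigurationBuilder builder = new ConfigurationBuilder();
                  builder.addServer().host("127.0.0.1").port(11222);
                  RemoteCacheManager remoteManager = new RemoteCacheManager(builder.build());

                  // The client hashes each key to the owning node, so most requests go directly to the right server
                  RemoteCache<String, String> remoteCache = remoteManager.getCache("etlCache");
                  remoteCache.put("record-1", "some value");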

                   

                  Another issue is how often a cache entry is read or changed; in the case of huge data that might be a problem.

                  • 6. Re: Infinispan Distributed Cache Issues
                    bhoomiredy

                    Thanks Wolf-Dieter Fink for your quick responses. We are building a Big Data ETL application on top of Infinispan. For this we load data from external files and bring it into the Infinispan cache (as quickly as possible), and then we need to run distributed processing on that cached data. We went with Infinispan since we could store and process the data in a distributed/parallel fashion. However, we could not find any support for running MapReduce or DistributedExecutors on a RemoteCache, which means we could not use Hot Rod, Memcached or REST for this. Hence we didn't go with client/server mode; the only option left was embedded cache mode.
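
                    For reference, the embedded distributed execution we are referring to looks roughly like this; the task below is only a placeholder for our real per-node processing:

                    import java.io.Serializable;
                    import java.util.List;
                    import java.util.Set;
                    import java.util.concurrent.Future;
                    import org.infinispan.Cache;
                    import org.infinispan.distexec.DefaultExecutorService;
                    import org.infinispan.distexec.DistributedCallable;
                    import org.infinispan.distexec.DistributedExecutorService;

                    // Placeholder task: each node reports how many entries it holds locally
                    public class LocalCountTask implements DistributedCallable<String, String, Integer>, Serializable {

                        private transient Cache<String, String> cache;

                        @Override
                        public void setEnvironment(Cache<String, String> cache, Set<String> inputKeys) {
                            this.cache = cache; // Infinispan injects the local cache before call()
                        }

                        @Override
                        public Integer call() {
                            // the data container holds only the entries stored on this node
                            return cache.getAdvancedCache().getDataContainer().size();
                        }
                    }

                    // Submitted from any node; runs on every member that holds cache data
                    DistributedExecutorService des = new DefaultExecutorService(cache);
                    List<Future<Integer>> futures = des.submitEverywhere(new LocalCountTask());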

                    For your previous question: in our use case, we populate the cache data once and read it only once. There are no updates to the cache at all.