10 Replies Latest reply on May 29, 2012 12:29 PM by c_lohmn

    balanced state with data from stores

    marque

      Hi,

       

      I have the following scenario:

       

      Distributed data on n nodes, where each node has only its own data (numOwners 1). The data is directly written in store and only the read is from the cache. I use a distributed execution to sort and filter on this data and get some values back from all nodes. This works fine.

       

      But all nodes write to the same data store and if I shutdown and restart I have no idea how to distribute the data on the nodes again. If I only restart with my configuration, all nodes read all data. I would expect at least, that the first node reads all data and if some nodes join a rebalancing will occur. (But what I really need is a solution where each node only reads the sum/nodecount data)

       

      Do I miss something in the configuration for this scenario?

       

      Thanks

        • 1. Re: balanced state with data from stores
          manik

          I presume you are using a shared data store?  Then all nodes will have access to all data.

          • 2. Re: balanced state with data from stores
            marque

            Hi Manik,

             

            yes - I use a jdbc shared data store. I have the following example:

             

            3 nodes, and each node creates 4 entries - in the shared JDBC Store are 12 entries now. Then I shutdown all nodes. If I restart the first node (store preload is activiated) it reads all 12 entries.

             

            The second node starts and reads the 12 entries from db. Then the rehashing occurs and moves 8 entries from node 1 to node 2.

             

            After this

                node 1 = 4 entries

                node 2 = 12 entries

             

            I use keySet() to get the sizes, but I assume, that this is okay, because I only want the entries on the node itself!?

             

            1) I do not understand why there are "double" entries on the two nodes, now. All 4 entries from node 1 are on node 2, too.

             

            2) What is the dedicated way to get values from a shared data store balanced to all nodes?

             

            2) What is the dedicated way to iterate all entries on one node for the distributed execution? In the moment I use values()/keySet().

             

            Thanks

            • 3. Re: balanced state with data from stores
              galder.zamarreno

              Do you have L1 enabled? Could you attach your config?

              • 4. Re: balanced state with data from stores
                marque

                L1 cache is disabled. I got the following xml with getConfigurationAsXmlString()

                 

                <tns:namedCacheConfiguration name="TaskData" xmlns:tns="urn:infinispan:config:5.1" xmlns:xs="http://www.w3.org/2001/XMLSchema">

                    <tns:locking concurrencyLevel="32" isolationLevel="READ_COMMITTED" lockAcquisitionTimeout="30000" useLockStriping="false" writeSkewCheck="false"/>

                    <tns:loaders passivation="false" preload="true" shared="true">

                        <tns:loader fetchPersistentState="false" ignoreModifications="false" purgeOnStartup="false" purgeSynchronously="false" purgerThreads="1" class="org.infinispan.loaders.jdbc.stringbased.JdbcStringBasedCacheStore">

                            <tns:async enabled="false" flushLockTimeout="1" modificationQueueSize="1024" shutdownTimeout="25000" threadPoolSize="1"/>

                            <tns:singletonStore pushStateTimeout="10000" pushStateWhenCoordinator="true" enabled="false"/>

                            <tns:properties>

                                <tns:property name="dataColumnName" value="DATA"/>

                                <tns:property name="connectionUrl" value="jdbc:oracle:thin:@XXX:1521:devtest"/>

                                <tns:property name="idColumnType" value="VARCHAR(2000)"/>

                                <tns:property name="connectionFactoryClass" value="org.infinispan.loaders.jdbc.connectionfactory.PooledConnectionFactory"/>

                                <tns:property name="timestampColumnType" value="NUMERIC"/>

                                <tns:property name="key2StringMapperClass" value="com.inubit.ibis.server.cache.ispn.ISPNKey2StringMapper"/>

                                <tns:property name="idColumnName" value="ID"/>

                                <tns:property name="driverClass" value="oracle.jdbc.OracleDriver"/>

                                <tns:property name="dataColumnType" value="BLOB"/>

                                <tns:property name="stringsTableNamePrefix" value="ISPN"/>

                                <tns:property name="userName" value="mp"/>

                                <tns:property name="password" value="hdgte12"/>

                                <tns:property name="timestampColumnName" value="TIMESTAMP"/>

                                <tns:property name="dropTableOnExit" value="false"/>

                                <tns:property name="createTableOnStart" value="true"/>

                            </tns:properties>

                        </tns:loader>

                    </tns:loaders>

                    <tns:transaction autoCommit="true" cacheStopTimeout="30000" eagerLockSingleNode="false" lockingMode="PESSIMISTIC" syncCommitPhase="true" syncRollbackPhase="true" transactionMode="NON_TRANSACTIONAL" use1PcForAutoCommitTransactions="false" useEagerLocking="false" useSynchronization="false">

                        <tns:recovery enabled="false" recoveryInfoCacheName="__recoveryInfoCacheName__"/>

                    </tns:transaction>

                    <tns:customInterceptors/>

                    <tns:dataContainer class="org.infinispan.container.DefaultDataContainer">

                        <tns:properties/>

                    </tns:dataContainer>

                    <tns:eviction maxEntries="-1" strategy="NONE" threadPolicy="DEFAULT" wakeUpInterval="-9223372036854775808"/>

                    <tns:expiration reaperEnabled="true" lifespan="-1" maxIdle="-1" wakeUpInterval="15000"/>

                    <tns:unsafe unreliableReturnValues="false"/>

                    <tns:clustering>

                        <tns:sync replTimeout="60000"/>

                        <tns:stateRetrieval alwaysProvideInMemoryState="false" fetchInMemoryState="true" initialRetryWaitTime="500" logFlushTimeout="60000" maxNonProgressingLogWrites="100" numRetries="5" retryWaitTimeIncreaseFactor="2" timeout="180000">

                            <tns:chunkSize>10000</tns:chunkSize>

                        </tns:stateRetrieval>

                        <tns:l1 enabled="false" invalidationThreshold="0" lifespan="600000" onRehash="false"/>

                        <tns:async asyncMarshalling="false" replQueueClass="org.infinispan.remoting.ReplicationQueueImpl" replQueueInterval="5000" replQueueMaxElements="1000" useReplQueue="false"/>

                        <tns:hash class="org.infinispan.distribution.ch.DefaultConsistentHash" hashFunctionClass="org.infinispan.commons.hash.MurmurHash3" numOwners="1" numVirtualNodes="48" rehashEnabled="true" rehashRpcTimeout="180000" rehashWait="180000">

                            <tns:groups enabled="false"/>

                        </tns:hash>

                    </tns:clustering>

                    <tns:jmxStatistics enabled="false"/>

                    <tns:storeAsBinary storeKeysAsBinary="true" storeValuesAsBinary="true" enabled="false"/>

                    <tns:lazyDeserialization enabled="false"/>

                    <tns:deadlockDetection enabled="true" spinDuration="20000"/>

                    <tns:indexing enabled="false" indexLocalOnly="false">

                        <tns:properties/>

                    </tns:indexing>

                    <tns:versioning enabled="false" versioningScheme="NONE"/>

                    <tns:invocationBatching enabled="false"/>

                </tns:namedCacheConfiguration>

                • 5. Re: balanced state with data from stores
                  galder.zamarreno

                  Hmmmm, just in case, can you attach the original XML config file?

                   

                  If you're configuring the cache programmatically, please show the cache configuration code used.

                   

                  values()/keySet() only returns what's available locally. To iterate through all nodes in all nodes in the cluster with DIST, use map/reduce

                  • 6. Re: balanced state with data from stores
                    marque

                    I want to use the distribution framework to iterate the values on each node.

                     

                    But the problem remains, that I could not do it with stored data. If I do not use "preload" I could not iterate. If i use "preload" there to much keys in the keyset (doubled keys on the nodes of my distributed cache). Here is my test configuration and a small test. First I add the three values. Then I run the testRead() a few times. After this there are on each node some keys. Sometimes 3, sometimes 1 or 2. I would expect, that it is full distributed, so there is no key doubled!?

                     

                    <infinispan

                      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

                      xsi:schemaLocation="urn:infinispan:config:5.1 http://www.infinispan.org/schemas/infinispan-config-5.1.xsd"

                      xmlns="urn:infinispan:config:5.1">

                      <global>

                        <transport/>

                      </global>

                      <namedCache name="CacheStore">

                        <loaders passivation="false" shared="true" preload="true">

                          <loader class="org.infinispan.loaders.jdbc.stringbased.JdbcStringBasedCacheStore"

                                  fetchPersistentState="true" ignoreModifications="false" purgeOnStartup="false">

                            <properties>

                              <property name="databaseType" value="h2"/>

                              <property name="stringsTableNamePrefix" value="ISPN"/>

                              <property name="idColumnName" value="ID_COLUMN"/>

                              <property name="dataColumnName" value="DATA_COLUMN"/>

                              <property name="timestampColumnName" value="TIMESTAMP_COLUMN"/>

                              <property name="timestampColumnType" value="BIGINT"/>

                              <property name="connectionFactoryClass" value="org.infinispan.loaders.jdbc.connectionfactory.PooledConnectionFactory"/>

                              <property name="connectionUrl" value="jdbc:h2:/tmp/ispn2/caches;AUTO_SERVER=TRUE"/>

                              <property name="userName" value="sa"/>

                              <property name="driverClass" value="org.h2.Driver"/>

                              <property name="idColumnType" value="VARCHAR(2000)"/>

                              <property name="dataColumnType" value="BINARY"/>

                              <property name="dropTableOnExit" value="false"/>

                              <property name="createTableOnStart" value="true"/>

                            </properties>

                          </loader>

                        </loaders>

                        <clustering mode="distribution">

                          <l1 enabled="false"/>

                          <sync/>

                          <hash numOwners="1" numVirtualNodes="1" rehashEnabled="true">

                            <groups enabled="true"/>

                          </hash>

                        </clustering>

                      </namedCache>

                    </infinispan>

                     

                    public class InfinispanBackendTest {

                     

                      private static EmbeddedCacheManager cacheManager = null;

                      private static Cache<String, String> cache;

                     

                      static {

                        Logger.getRootLogger().setLevel(Level.DEBUG);

                      }

                     

                      @BeforeClass

                      public static void setUp() throws Exception {

                        cacheManager = new DefaultCacheManager(InfinispanBackend.class.getResourceAsStream("InfinispanConfiguration.xml"));

                        cache = cacheManager.getCache("CacheStore");

                      }

                     

                      @AfterClass

                      public void tearDown() {

                        cache.stop();

                        cacheManager.stop();

                      }

                     

                      @Test

                      public static void testInput() {

                        cache.put("1","one");

                        cache.put("2","two");

                        cache.put("3","three");

                      }

                     

                      @Test

                      public static void testRead() {

                        System.out.println("read all:");

                        for(String key : cache.keySet()) {

                          System.out.println("key: "+key+"  value: "+cache.get(key));

                        }

                        keepAlive();

                      }

                     

                      private static void keepAlive() {

                        for(int i=0; i<Long.MAX_VALUE; i++) {

                          try {

                            Thread.currentThread().sleep(5000);

                          } catch (InterruptedException e) { e.printStackTrace(); }

                          System.out.println("elements in keyset : "+cache.keySet().size());

                        }

                      }

                     

                    }

                    • 7. Re: balanced state with data from stores
                      galder.zamarreno

                      marque, can you open a jira in http://issues.jboss.org/browse/ISPN ? preloading shouldn't lead to an overload of data and should not exceed the num of owners set. please attach your config, test and any other extra info you might have. thx

                      • 8. Re: balanced state with data from stores
                        c_lohmn

                        I've attached a unit test to the corrsponding ticket ISPN-1964. It shows that even though numOwners is set to 1, data ends up on multiple nodes.

                         

                        What I also find somewhat confusing regarding this matter is the documentation of the 'preload' option:

                        Note that preloading is done in a local fashion, so any data loaded is only stored locally in the node. No replication or distribution of the preloaded data happens.

                        (https://docs.jboss.org/author/display/ISPN/CacheLoaders)

                        Does "in a local fashion" here refer to the L1? And in a scenario with L1 being disabled, the question would be at which point the distribution of data is supposed to be done after all.

                        • 9. Re: balanced state with data from stores
                          dan.berindei

                          Thanks for the unit test, Carsten. I'd like to add it to our test suite, could you sign the contributor agreement at https://cla.jboss.org/contributions/index.seam ?

                           

                          The paragraph you quoted is only relevant if you have a non-shared cache store. After a cluster restart, consistent hash segments assigned to each node will not match the segments assigned in the previous run. So it's very likely that each joiner will preload keys that it shouldn't own. Even in replicated mode, the last node to go down is not necessarily the first to restart, so when it joins it might have more (or newer) data then the nodes that are already running.

                           

                          We will not try to push any data from a joining node to the existing members, even when the joiner has keys that the existing members don't have. After all, it could be that a key was deleted on the existing members after the restart, and the joiner is wrong to have it (see ISPN-1586).

                          • 10. Re: balanced state with data from stores
                            c_lohmn

                            I've signed the agreement.

                             

                            And thanks for the clarification, makes sense - no replication or distribution of data happens, just cleanup will be performed (of entries that the node doesn't own).