6 replies; latest reply on Oct 31, 2013 9:57 AM by tremes

    JdbcBatchUpdate exception of JDBC cache store with H2

    dex80526

      I tried to use the JDBC cache store with an H2 database in cluster replication mode (I had other issues with the JDBM cache loader, posted earlier).

      My configuration is:

       

      <clustering mode="replication">
         <stateRetrieval
            timeout="240000"
            fetchInMemoryState="true"
            alwaysProvideInMemoryState="false"
         />

         <!-- Network calls are synchronous. -->
         <sync replTimeout="20000"/>
      </clustering>
      <loaders
         passivation="false"
         shared="false"
         preload="true">
         <loader
            class="org.infinispan.loaders.jdbc.stringbased.JdbcStringBasedCacheStore"
            fetchPersistentState="true"
            purgeOnStartup="false">
            <properties>
               <property name="stringsTableNamePrefix" value="ISPN_STRING_TABLE"/>
               <property name="idColumnName" value="ID_COLUMN"/>
               <property name="dataColumnName" value="DATA_COLUMN"/>
               <property name="timestampColumnName" value="TIMESTAMP_COLUMN"/>
               <property name="timestampColumnType" value="BIGINT"/>
               <property name="connectionFactoryClass" value="org.infinispan.loaders.jdbc.connectionfactory.PooledConnectionFactory"/>
               <property name="connectionUrl" value="jdbc:h2:file:/var/tmp/h2cachestore;DB_CLOSE_DELAY=-1"/>
               <property name="userName" value="sa"/>
               <property name="driverClass" value="org.h2.Driver"/>
               <property name="idColumnType" value="VARCHAR(255)"/>
               <property name="dataColumnType" value="BINARY"/>
               <property name="dropTableOnExit" value="false"/>
               <property name="createTableOnStart" value="true"/>
            </properties>
         </loader>
      </loaders>

       

      I noticed that when the second node starts to join the cluster, it tries to do batch inserts. This causes a JDBC exception (unique index or primary key violation), since the same data already exists in the local cache store (the H2 database table).

       

      The question is: why do we do an INSERT for each cache entry when merging the state, instead of just applying the difference?
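      For what it's worth, H2 itself has an idempotent upsert statement. Below is a minimal, untested sketch of the kind of write I would have expected the store to issue, assuming the table and column names from my configuration above; note this would only avoid the key violation, it would not remove stale entries.

        import java.sql.Connection;
        import java.sql.PreparedStatement;
        import java.sql.SQLException;

        public class UpsertSketch {
           // Untested sketch: H2's MERGE ... KEY(...) updates the row when the key
           // already exists and inserts it otherwise, so re-applying state over an
           // existing local store cannot violate the primary key.
           static void upsertEntry(Connection conn, String key, byte[] data, long timestamp)
                 throws SQLException {
              String sql = "MERGE INTO ISPN_STRING_TABLE____defaultcache "
                         + "(DATA_COLUMN, TIMESTAMP_COLUMN, ID_COLUMN) KEY(ID_COLUMN) VALUES (?, ?, ?)";
              PreparedStatement ps = conn.prepareStatement(sql);
              try {
                 ps.setBytes(1, data);        // marshalled entry bytes
                 ps.setLong(2, timestamp);    // expiry timestamp
                 ps.setString(3, key);        // string-mapped key (the primary key)
                 ps.executeUpdate();
              } finally {
                 ps.close();
              }
           }
        }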

       

      Here are the logs:

      Nov 29, 2011 10:40:47 AM org.infinispan.remoting.rpc.RpcManagerImpl retrieveState
      INFO: ISPN000074: Trying to fetch state from test.dex.com-33892
      Nov 29, 2011 10:40:48 AM org.infinispan.loaders.jdbc.DataManipulationHelper fromStreamSupport
      ERROR: ISPN008003: SQL failure while integrating state into store
      org.h2.jdbc.JdbcBatchUpdateException: Unique index or primary key violation: "PRIMARY_KEY_6 ON PUBLIC.ISPN_STRING_TABLE____DEFAULTCACHE(ID_COLUMN)"; SQL statement:
      INSERT INTO ISPN_STRING_TABLE____defaultcache (DATA_COLUMN, TIMESTAMP_COLUMN, ID_COLUMN) VALUES(?,?,?) [23505-158]
              at org.h2.jdbc.JdbcPreparedStatement.executeBatch(JdbcPreparedStatement.java:1107)
              at com.mchange.v2.c3p0.impl.NewProxyPreparedStatement.executeBatch(NewProxyPreparedStatement.java:1723)
              at org.infinispan.loaders.jdbc.DataManipulationHelper.fromStreamSupport(DataManipulationHelper.java:109)
              at org.infinispan.loaders.jdbc.stringbased.JdbcStringBasedCacheStore.fromStreamLockSafe(JdbcStringBasedCacheStore.java:263)
              at org.infinispan.loaders.LockSupportCacheStore.fromStream(LockSupportCacheStore.java:225)
              at org.infinispan.statetransfer.StateTransferManagerImpl.applyPersistentState(StateTransferManagerImpl.java:335)
              at org.infinispan.statetransfer.StateTransferManagerImpl.applyState(StateTransferManagerImpl.java:279)
              at org.infinispan.remoting.InboundInvocationHandlerImpl.applyState(InboundInvocationHandlerImpl.java:235)
              at org.infinispan.remoting.transport.jgroups.JGroupsTransport.setState(JGroupsTransport.java:607)
              at org.jgroups.blocks.MessageDispatcher$ProtocolAdapter.handleUpEvent(MessageDispatcher.java:711)
              at org.jgroups.blocks.MessageDispatcher$ProtocolAdapter.up(MessageDispatcher.java:771)
              at org.jgroups.JChannel.up(JChannel.java:1441)
              at org.jgroups.stack.ProtocolStack.up(ProtocolStack.java:1074)
              at org.jgroups.protocols.pbcast.STREAMING_STATE_TRANSFER.connectToStateProvider(STREAMING_STATE_TRANSFER.java:523)
              at org.jgroups.protocols.pbcast.STREAMING_STATE_TRANSFER.handleStateRsp(STREAMING_STATE_TRANSFER.java:462)
              at org.jgroups.protocols.pbcast.STREAMING_STATE_TRANSFER.up(STREAMING_STATE_TRANSFER.java:223)
              at org.jgroups.protocols.FRAG2.up(FRAG2.java:189)
              at org.jgroups.protocols.FlowControl.up(FlowControl.java:418)
              at org.jgroups.protocols.FlowControl.up(FlowControl.java:400)
              at org.jgroups.protocols.pbcast.GMS.up(GMS.java:908)
              at org.jgroups.protocols.pbcast.STABLE.up(STABLE.java:246)
              at org.jgroups.protocols.UNICAST.handleDataReceived(UNICAST.java:613)
              at org.jgroups.protocols.UNICAST.up(UNICAST.java:294)
              at org.jgroups.protocols.pbcast.NAKACK.up(NAKACK.java:703)
              at org.jgroups.protocols.VERIFY_SUSPECT.up(VERIFY_SUSPECT.java:133)
              at org.jgroups.protocols.FD.up(FD.java:275)
              at org.jgroups.protocols.FD_SOCK.up(FD_SOCK.java:275)
              at org.jgroups.protocols.MERGE2.up(MERGE2.java:209)
              at org.jgroups.protocols.Discovery.up(Discovery.java:293)
              at org.jgroups.protocols.TP.passMessageUp(TP.java:1109)
              at org.jgroups.protocols.TP$IncomingPacket.handleMyMessage(TP.java:1665)
              at org.jgroups.protocols.TP$IncomingPacket.run(TP.java:1647)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
              at java.lang.Thread.run(Thread.java:722)

       

      Am I missing something in the configuration?

      How should I configure things so that the data in the cache (and the cache store) is synced when a new node joins the cluster?

      Note: I cannot purge the cache store when a node starts, since I am using the cache store as the persistent store.

        • 1. Re: JdbcBatchUpdate exception of JDBC cache store with H2
          dex80526

          After taking a look at the source code of the cache loader, I see that it just does batch inserts without checking the local cache (see below):

          >>>

          public final void fromStreamSupport(ObjectInput objectInput) throws CacheLoaderException {
             Connection conn = null;
             PreparedStatement ps = null;
             try {
                conn = connectionFactory.getConnection();
                String sql = tableManipulation.getInsertRowSql();
                ps = conn.prepareStatement(sql);

                int readCount = 0;
                int batchSize = tableManipulation.getBatchSize();

                Object objFromStream = marshaller.objectFromObjectStream(objectInput);
                while (fromStreamProcess(objFromStream, ps, objectInput)) {
                   ps.addBatch();
                   readCount++;
                   if (readCount % batchSize == 0) {
                      ps.executeBatch();
                      if (log.isTraceEnabled()) {
                         log.tracef("Executing batch %s, batch size is %d", readCount / batchSize, batchSize);
                      }
                   }
                   objFromStream = marshaller.objectFromObjectStream(objectInput);
                }
                if (readCount % batchSize != 0) {
                   ps.executeBatch(); // flush the remaining batch
                }
                if (log.isTraceEnabled()) {
                   log.tracef("Successfully inserted %d buckets into the database, batch size is %d", readCount, batchSize);
                }
             } catch (IOException ex) {...}

           

          I think it (JdbcStringBasedCacheStore.java) should do something similar to what the storeLockSafe() method does, i.e. update an existing row instead of blindly inserting; see the sketch below.
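          Something along these lines per entry (just an untested sketch of the idea, assuming the table manipulation helper can hand out an update statement the same way getInsertRowSql() above hands out the insert; both would bind data, timestamp, key in that order):

            import java.sql.Connection;
            import java.sql.PreparedStatement;
            import java.sql.SQLException;

            public class InsertOrUpdateSketch {
               // Untested sketch: try the UPDATE first and fall back to INSERT when no
               // row was touched, roughly what storeLockSafe() does for normal writes.
               static void insertOrUpdate(Connection conn, String updateSql, String insertSql,
                                          byte[] data, long timestamp, String key) throws SQLException {
                  PreparedStatement ps = conn.prepareStatement(updateSql);
                  try {
                     ps.setBytes(1, data);
                     ps.setLong(2, timestamp);
                     ps.setString(3, key);
                     if (ps.executeUpdate() == 0) {   // no existing row for this key
                        ps.close();
                        ps = conn.prepareStatement(insertSql);
                        ps.setBytes(1, data);
                        ps.setLong(2, timestamp);
                        ps.setString(3, key);
                        ps.executeUpdate();
                     }
                  } finally {
                     ps.close();
                  }
               }
            }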

           

          Am I misreading this? Or should I open a JIRA issue against this?

          • 2. Re: JdbcBatchUpdate exception of JDBC cache store with H2
            galder.zamarreno

            Hmmmm, are both cluster nodes using the same H2 database, or does each node use a different one? If they both use the same one, you need to set shared="true" on the loaders element:

             

            <loaders passivation="false" shared="true"...

             

            If that doesn't help: our JDBC cache store uses H2 for unit testing: https://github.com/infinispan/infinispan/blob/master/cachestore/jdbc/pom.xml

             

            Maybe you can extend it and add a test case for your use case?
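            A rough shape for such a test (just a sketch, not an actual test from our suite; node1.xml and node2.xml stand for two copies of your configuration, each pointing at its own H2 file):

              import org.infinispan.Cache;
              import org.infinispan.manager.DefaultCacheManager;
              import org.infinispan.manager.EmbeddedCacheManager;

              public class JoinWithLocalStoreReproducer {
                 public static void main(String[] args) throws Exception {
                    EmbeddedCacheManager cm1 = new DefaultCacheManager("node1.xml");
                    Cache<String, String> c1 = cm1.getCache();
                    c1.put("k1", "v1");                        // lands in node1's H2 store

                    // The second manager joins the cluster and fetches state on startup,
                    // which is where the batch insert into its local store happens.
                    // Restarting it against the same H2 file (so its store is already
                    // populated) should reproduce the duplicate-key scenario.
                    EmbeddedCacheManager cm2 = new DefaultCacheManager("node2.xml");
                    Cache<String, String> c2 = cm2.getCache();
                    assert "v1".equals(c2.get("k1"));

                    cm2.stop();
                    cm1.stop();
                 }
              }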

            • 3. Re: JdbcBatchUpdate exception of JDBC cache store with H2
              dex80526

              No, each node has its own local cache store (H2 database), i.e. the loader is not shared.

               

              Please see my configuration above.

               

              I think the code I posted earlier does not handle my case, since it just does the inserts.

              Basically, the code does not drop the local data before doing the batch inserts.

               

              The current implementation will also end up with data out of sync among the nodes, as in my other post: if a cache entry was deleted on the first node while the second node was down, the deleted entry will still exist on the second node after it rejoins.

               

              I think the state-merging code should be able to determine the difference between the local data and the state from the coordinator, and then apply the changes to the local data. The difference includes the adds/modifications/deletes that occurred on the coordinator while the node was down.

               

              Any idea on how to track the changes? I am thinking of something like transaction logs; one possible shape is sketched below.
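              Very rough sketch of the transaction-log idea, purely hypothetical (none of this is existing Infinispan API): record each delete in a tombstone table next to the data table, so a node that was down can replay the deletes it missed when it rejoins.

                import java.sql.Connection;
                import java.sql.PreparedStatement;
                import java.sql.SQLException;

                public class TombstoneLogSketch {
                   // Assumed schema, for illustration only:
                   // CREATE TABLE ISPN_TOMBSTONES(ID_COLUMN VARCHAR(255) PRIMARY KEY, DELETED_AT BIGINT)
                   static void recordDelete(Connection conn, String key) throws SQLException {
                      PreparedStatement ps = conn.prepareStatement(
                            "MERGE INTO ISPN_TOMBSTONES (ID_COLUMN, DELETED_AT) KEY(ID_COLUMN) VALUES (?, ?)");
                      try {
                         ps.setString(1, key);
                         ps.setLong(2, System.currentTimeMillis()); // when the entry was removed
                         ps.executeUpdate();
                      } finally {
                         ps.close();
                      }
                   }
                }

              A joining node would then delete every local key that has a tombstone newer than the last time it was a cluster member.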

              • 4. Re: JdbcBatchUpdate exception of JDBC cache store with H2
                dex80526

                Some update:

                I re-ran my test with the 5.1 snapshot code and did not get the above exception. But I still see inconsistent data among the cluster nodes.

                 

                Basically, the current ISPN state-transfer implementation results in data inconsistency among nodes when running in replication mode with each node having its own local cache store.

                 

                I found that BaseStateTransferManagerImpl's applyState() does not remove stale data from the local cache store, which results in inconsistent data when a node joins the cluster. Here is a code snippet of applyState():

                public void applyState(Collection<InternalCacheEntry> state,
                                       Address sender, int viewId) throws InterruptedException {
                   .....

                   for (InternalCacheEntry e : state) {
                      InvocationContext ctx = icc.createInvocationContext(false, 1);
                      // locking not necessary as during rehashing we block all transactions
                      ctx.setFlags(CACHE_MODE_LOCAL, SKIP_CACHE_LOAD, SKIP_REMOTE_LOOKUP, SKIP_SHARED_CACHE_STORE, SKIP_LOCKING,
                                   SKIP_OWNERSHIP_CHECK);
                      try {
                         PutKeyValueCommand put = cf.buildPutKeyValueCommand(e.getKey(), e.getValue(), e.getLifespan(), e.getMaxIdle(), ctx.getFlags());
                         interceptorChain.invoke(ctx, put);
                      } catch (Exception ee) {
                         log.problemApplyingStateForKey(ee.getMessage(), e.getKey());
                      }
                   }

                ...
                }

                 

                As we can see, the code basically tries to add all the data entries received from the cluster (the other node). Hence it never learns that some entries still present in its local cache store were deleted from the cluster in the meantime. This is exactly my test case (my configuration is that each node has its own cache store, in replication mode).

                 

                To fix this, we need to delete from the local cache/cache store any entries which no longer exist in the new state.

                 

                I modified the above method by adding the following code before the put loop, and it fixed the problem in my configuration:

                // Remove entries which no longer exist in the new state from the local cache/cache store
                for (InternalCacheEntry ie : dataContainer.entrySet()) {
                   if (!state.contains(ie)) {
                      log.debug("Trying to delete local store entry that no longer exists in the new state: " + ie.getKey());
                      InvocationContext ctx = icc.createInvocationContext(false, 1);
                      // locking not necessary as during rehashing we block all transactions
                      ctx.setFlags(CACHE_MODE_LOCAL, SKIP_CACHE_LOAD, SKIP_REMOTE_LOOKUP, SKIP_SHARED_CACHE_STORE, SKIP_LOCKING,
                                   SKIP_OWNERSHIP_CHECK);
                      try {
                         RemoveCommand remove = cf.buildRemoveCommand(ie.getKey(), ie.getValue(), ctx.getFlags());
                         interceptorChain.invoke(ctx, remove);
                         dataContainer.remove(ie.getKey());
                      } catch (Exception ee) {
                         log.error("failed to delete local store entry", ee);
                      }
                   }
                }

                ...

                 

                Obviously, the above "fix" rests on the assumption/configuration that the dataContainer holds all local entries, i.e. preload=true, no eviction, replication mode.

                 

                The real fix, I think, is to delegate syncing the state (and clearing stale local data) to the cache store, where we can check the configuration and do the right thing. For example, in the cache store implementation we could calculate the changes based on the local data and the new state, and apply them there; something like the sketch below.
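                Hand-written sketch of the proposed delegation, not existing API (syncState() and the store interface here are things I am proposing, purely for illustration):

                  import java.util.HashSet;
                  import java.util.Map;
                  import java.util.Set;

                  public class SyncStateSketch {
                     // Hypothetical store interface, for illustration only.
                     interface KeyValueStore {
                        void delete(String key);
                        void write(String key, byte[] value);
                     }

                     // The store compares the incoming state with what it currently persists
                     // and applies only the difference, including deletes of entries that no
                     // longer exist on the coordinator.
                     static void syncState(Map<String, byte[]> newState,
                                           Map<String, byte[]> localState,
                                           KeyValueStore store) {
                        Set<String> stale = new HashSet<String>(localState.keySet());
                        stale.removeAll(newState.keySet());        // present locally, gone remotely
                        for (String key : stale) {
                           store.delete(key);                      // drop stale local entries
                        }
                        for (Map.Entry<String, byte[]> e : newState.entrySet()) {
                           store.write(e.getKey(), e.getValue());  // upsert added/changed entries
                        }
                     }
                  }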

                 

                I created a bug against this in JIRA.

                • 5. Re: Re: JdbcBatchUpdate exception of JDBC cache store with H2
                  tremes

                  I am still occasionally facing a similar or the same issue on the latest WildFly Beta2-SNAPSHOT (that is, Infinispan 6.0.0.CR1 and H2 1.3.173). I have two nodes running in my test, where failover is tested. I use the following cache-loader configuration:

                  <cache-container name="testDbPersistence" default-cache="jdbc-cache" module="org.wildfly.clustering.web.infinispan">
                       <transport lock-timeout="60000"/>
                       <replicated-cache name="jdbc-cache" mode="SYNC" batching="true">
                            <binary-keyed-jdbc-store datasource="java:jboss/datasources/ExampleDS" preload="true" passivation="false" purge="false" shared="true">
                                 <binary-keyed-table prefix="binarybased">
                                      <id-column name="id" type="VARCHAR(255)"/>
                                      <data-column name="datum" type="VARBINARY(10000)"/>
                                      <timestamp-column name="version" type="BIGINT"/>
                                 </binary-keyed-table>
                            </binary-keyed-jdbc-store>
                       </replicated-cache>
                  </cache-container>

                  I also use this connection URL: "jdbc:h2:file;AUTO_SERVER=true". I need to use file storage because of the server-shutdown failover. For the stack trace, see [WFLY-2409] Intermittent failures in SessionClusterDbPersistenceTestCase in the JBoss issue tracker. I'll appreciate any suggestions.

                  • 6. Re: JdbcBatchUpdate exception of JDBC cache store with H2
                    tremes