9 Replies Latest reply on Mar 14, 2008 9:18 AM by Manik Surtani

    newbie question - ReplicationException

    Jim Slattery Newbie

      I constructed a test where two JVMs join the same PojoCache cluster.

      Each JVM starts one transaction. Here are the two timelines matched up:

       JVM-1                  JVM-2

       --Begin Tx1---
       Modify cachedObj1
                              --Begin Tx2---
       (sleeps...)            Modify cachedObj1
                              --Commit Tx2---
       Modify cachedObj1
       --Commit Tx1---
      


      Basically, two overlapping transactions try to modify the same cached object.

      I was surprised to find that both transactions fail during commit()--both throwing a ReplicationException() with a nested lock.TimeoutException().

      Am I doing something wrong? Shouldn't at least one transaction succeed, so that the other can try again? Otherwise, it seems that both have to try again and could keep colliding forever.

      Thanks in advance for your feedback.

        • 1. Re: newbie question - ReplicationException
          Adrian Sandor Newbie

          Yes, I found that this can happen. My solution is to have each side time out quickly, wait a short random amount of time, and try again.
          I don't know if there is a "proper" solution.

          Adrian
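For illustration, the "time out quickly, wait a short random time, retry" workaround described above might look roughly like the sketch below. The class `RandomBackoffRetry` and its `retry` method are hypothetical names made up for this example, not JBoss Cache API, and the simulated failure in `main` only stands in for a transaction that hits a lock timeout.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper (not part of JBoss Cache): retries an action,
// pausing for a short random time between attempts.
class RandomBackoffRetry {

    /**
     * Runs the action until it succeeds or maxAttempts is reached.
     * Between attempts the caller sleeps for a random time in [1, maxSleepMillis].
     * If every attempt fails, the last exception is rethrown.
     */
    static <T> T retry(Callable<T> action, int maxAttempts, long maxSleepMillis) throws Exception {
        if (maxAttempts < 1 || maxSleepMillis < 1) {
            throw new IllegalArgumentException("maxAttempts and maxSleepMillis must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    // Random pause so that two colliding callers are unlikely to retry in lockstep.
                    Thread.sleep(ThreadLocalRandom.current().nextLong(1, maxSleepMillis + 1));
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        final int[] calls = {0};
        // Stand-in for a transaction that fails twice before succeeding.
        String result = retry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("simulated lock timeout");
            }
            return "committed";
        }, 5, 20);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In a real test the `Callable` would wrap the begin/put/commit sequence and roll back before rethrowing; the random pause is what keeps the two sides from colliding again at the same instant.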

          • 2. Re: newbie question - ReplicationException
            Manik Surtani Master

            This can happen with pessimistic locking, and it was part of the motivation for developing an optimistic locking solution.

            I'd recommend trying optimistic locking.

            • 3. Re: newbie question - ReplicationException
              Adrian Sandor Newbie

              Hi, I decided to come back to this problem and find a better solution than the random sleeps and retries.
              We tried using optimistic locking, but it doesn't seem to help at all.
              Here's a test program, using 2 replicated cache instances and writing to the same node:

              package CacheRepl;

              import java.util.concurrent.atomic.AtomicInteger;

              import javax.transaction.UserTransaction;

              import org.apache.log4j.Logger;
              import org.jboss.cache.Cache;
              import org.jboss.cache.DefaultCacheFactory;
              import org.jboss.cache.Fqn;
              import org.jboss.cache.transaction.DummyTransactionManager;
              import org.jboss.cache.transaction.DummyUserTransaction;

              public class CacheThread extends Thread {
                  private static final Logger LOG = Logger.getLogger("test");
                  private static final int THREADS = 2;
                  private static final int REPEAT = 20;
                  private static final int SLEEP_TIME = 10;

                  protected int threadId;
                  protected final Cache<Object, Object> cache;

                  public CacheThread(final int threadId) {
                      this.threadId = threadId;
                      cache = DefaultCacheFactory.getInstance().createCache("replSync-service.xml", true);
                  }

                  private boolean doTransaction() {
                      final UserTransaction tx = new DummyUserTransaction(DummyTransactionManager.getInstance());
                      try {
                          tx.begin();
                          cache.put(new Fqn<Object>("node"), "key", "value" + threadId);
                          tx.commit();
                          return true;
                      } catch (Exception e) {
                          LOG.error("transaction failed", e);
                          try {
                              tx.rollback();
                          } catch (Exception e1) {
                              LOG.warn("rollback failed", e1);
                          }
                          return false;
                      }
                  }

                  @Override
                  public void run() {
                      int failed = 0;
                      for (int i = 0; i < REPEAT; ++i) {
                          try {
                              Thread.sleep(SLEEP_TIME);
                          } catch (Exception e) {
                              e.printStackTrace();
                          }
                          if (!doTransaction()) {
                              failed++;
                          }
                      }
                      System.out.println("Thread" + threadId + ": " + failed + " failures");
                  }

                  private void close() {
                      cache.stop();
                  }

                  public static void main(String[] args) throws InterruptedException {
                      final CacheThread[] threads = new CacheThread[THREADS];
                      for (int t = 0; t < THREADS; t++) {
                          threads[t] = new CacheThread(t);
                      }
                      for (int t = 0; t < THREADS; t++) {
                          threads[t].start();
                      }
                      System.out.println("started");
                      for (int t = 0; t < THREADS; t++) {
                          threads[t].join();
                      }
                      for (int t = 0; t < THREADS; t++) {
                          threads[t].close();
                      }
                  }
              }
              


              and the cache configuration:

              <server>
               <mbean code="org.jboss.cache.jmx.CacheJmxWrapper"
               name="jboss.cache:service=TreeCache">
              
               <depends>jboss:service=Naming</depends>
               <depends>jboss:service=TransactionManager</depends>
              
               <attribute name="TransactionManagerLookupClass">org.jboss.cache.transaction.DummyTransactionManagerLookup
               </attribute>
               <!--
               Node locking scheme:
               OPTIMISTIC
               PESSIMISTIC (default)
               -->
               <attribute name="NodeLockingScheme">OPTIMISTIC</attribute>
               <!--
               Isolation level : SERIALIZABLE
               REPEATABLE_READ (default)
               READ_COMMITTED
               READ_UNCOMMITTED
               NONE
               -->
               <attribute name="IsolationLevel">READ_COMMITTED</attribute>
              
               <attribute name="CacheMode">REPL_SYNC</attribute>
              
               <attribute name="ClusterName">Test cache</attribute>
              
               <attribute name="ClusterConfig">
               <config>
               <UDP bind_addr="10.0.0.226"
               mcast_addr="228.10.10.10"
               mcast_port="45588"
               tos="8"
               ucast_recv_buf_size="20000000"
               ucast_send_buf_size="640000"
               mcast_recv_buf_size="25000000"
               mcast_send_buf_size="640000"
               loopback="false"
               discard_incompatible_packets="true"
               max_bundle_size="64000"
               max_bundle_timeout="30"
               use_incoming_packet_handler="true"
               ip_ttl="2"
               enable_bundling="false"
               enable_diagnostics="true"
               use_concurrent_stack="true"
               thread_naming_pattern="pl"
               thread_pool.enabled="true"
               thread_pool.min_threads="1"
               thread_pool.max_threads="25"
               thread_pool.keep_alive_time="30000"
               thread_pool.queue_enabled="true"
               thread_pool.queue_max_size="10"
               thread_pool.rejection_policy="Run"
               oob_thread_pool.enabled="true"
               oob_thread_pool.min_threads="1"
               oob_thread_pool.max_threads="4"
               oob_thread_pool.keep_alive_time="10000"
               oob_thread_pool.queue_enabled="true"
               oob_thread_pool.queue_max_size="10"
               oob_thread_pool.rejection_policy="Run"/>
               <PING timeout="2000" num_initial_members="3"/>
               <MERGE2 max_interval="30000" min_interval="10000"/>
               <FD_SOCK/>
               <FD timeout="10000" max_tries="5" shun="true"/>
               <VERIFY_SUSPECT timeout="1500"/>
               <pbcast.NAKACK max_xmit_size="60000"
               use_mcast_xmit="false" gc_lag="0"
               retransmit_timeout="300,600,1200,2400,4800"
               discard_delivered_msgs="true"/>
               <UNICAST timeout="300,600,1200,2400,3600"/>
               <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
               max_bytes="400000"/>
               <pbcast.GMS print_local_addr="true" join_timeout="5000"
               join_retry_timeout="2000" shun="false"
               view_bundling="true" view_ack_collection_timeout="5000"/>
               <FRAG2 frag_size="60000"/>
               <pbcast.STREAMING_STATE_TRANSFER use_reading_thread="true"/>
               <!-- <pbcast.STATE_TRANSFER/> -->
               <pbcast.FLUSH timeout="0"/>
               </config>
               </attribute>
              
               <attribute name="FetchInMemoryState">true</attribute>
               <attribute name="StateRetrievalTimeout">15000</attribute>
               <attribute name="SyncReplTimeout">15000</attribute>
               <attribute name="LockAcquisitionTimeout">1000</attribute>
              
               <attribute name="UseRegionBasedMarshalling">true</attribute>
               <attribute name="TransactionTimeout">300</attribute>
               </mbean>
              
              </server>
              


              With both pessimistic and optimistic locking, there are failures every time.
              Here are some example exceptions:

              - optimistic locking:
              org.jboss.cache.lock.TimeoutException: failure acquiring lock: fqn=/, caller=GlobalTransaction:<10.0.0.226:32932>:1, lock=write owner=GlobalTransaction:<10.0.0.226:32931>:2 (org.jboss.cache.lock.LockStrategyReadCommitted@1fd6bea)
               at org.jboss.cache.lock.IdentityLock.acquire(IdentityLock.java:528)
               at org.jboss.cache.interceptors.OptimisticLockingInterceptor.lockNodes(OptimisticLockingInterceptor.java:119)
               at org.jboss.cache.interceptors.OptimisticLockingInterceptor.invoke(OptimisticLockingInterceptor.java:52)
               at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
               at org.jboss.cache.interceptors.OptimisticReplicationInterceptor.invoke(OptimisticReplicationInterceptor.java:73)
               at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
               at org.jboss.cache.interceptors.NotificationInterceptor.invoke(NotificationInterceptor.java:32)
               at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
               at org.jboss.cache.interceptors.TxInterceptor.handleOptimisticPrepare(TxInterceptor.java:374)
               at org.jboss.cache.interceptors.TxInterceptor.handleRemotePrepare(TxInterceptor.java:250)
               at org.jboss.cache.interceptors.TxInterceptor.invoke(TxInterceptor.java:100)
               at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
               at org.jboss.cache.interceptors.CacheMgmtInterceptor.invoke(CacheMgmtInterceptor.java:123)
               at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
               at org.jboss.cache.interceptors.InvocationContextInterceptor.invoke(InvocationContextInterceptor.java:62)
               at org.jboss.cache.CacheImpl.invokeMethod(CacheImpl.java:3939)
               at org.jboss.cache.CacheImpl._replicate(CacheImpl.java:2853)
               at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
               at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
               at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
               at java.lang.reflect.Method.invoke(Method.java:585)
               at org.jgroups.blocks.MethodCall.invoke(MethodCall.java:330)
               at org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher.handle(InactiveRegionAwareRpcDispatcher.java:77)
               at org.jgroups.blocks.RequestCorrelator.handleRequest(RequestCorrelator.java:624)
               at org.jgroups.blocks.RequestCorrelator.receiveMessage(RequestCorrelator.java:533)
               at org.jgroups.blocks.RequestCorrelator.receive(RequestCorrelator.java:365)
               at org.jgroups.blocks.MessageDispatcher$ProtocolAdapter.up(MessageDispatcher.java:736)
               at org.jgroups.JChannel.up(JChannel.java:1063)
               at org.jgroups.stack.ProtocolStack.up(ProtocolStack.java:325)
               at org.jgroups.protocols.pbcast.FLUSH.up(FLUSH.java:406)
               at org.jgroups.protocols.pbcast.STREAMING_STATE_TRANSFER.up(STREAMING_STATE_TRANSFER.java:255)
               at org.jgroups.protocols.FRAG2.up(FRAG2.java:197)
               at org.jgroups.protocols.pbcast.GMS.up(GMS.java:722)
               at org.jgroups.protocols.pbcast.STABLE.up(STABLE.java:234)
               at org.jgroups.protocols.UNICAST.up(UNICAST.java:263)
               at org.jgroups.protocols.pbcast.NAKACK.handleMessage(NAKACK.java:720)
               at org.jgroups.protocols.pbcast.NAKACK.up(NAKACK.java:546)
               at org.jgroups.protocols.VERIFY_SUSPECT.up(VERIFY_SUSPECT.java:167)
               at org.jgroups.protocols.FD.up(FD.java:322)
               at org.jgroups.protocols.FD_SOCK.up(FD_SOCK.java:298)
               at org.jgroups.protocols.MERGE2.up(MERGE2.java:145)
               at org.jgroups.protocols.Discovery.up(Discovery.java:220)
               at org.jgroups.protocols.TP$IncomingPacket.handleMyMessage(TP.java:1486)
               at org.jgroups.protocols.TP$IncomingPacket.run(TP.java:1440)
               at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
               at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
               at java.lang.Thread.run(Thread.java:595)
              Caused by: org.jboss.cache.lock.TimeoutException: write lock for / could not be acquired after 1000 ms. Locks: Read lock owners: []
              Write lock owner: GlobalTransaction:<10.0.0.226:32931>:2
               (caller=GlobalTransaction:<10.0.0.226:32932>:1, lock info: write owner=GlobalTransaction:<10.0.0.226:32931>:2 (org.jboss.cache.lock.LockStrategyReadCommitted@1fd6bea))
               at org.jboss.cache.lock.IdentityLock.acquireWriteLock0(IdentityLock.java:244)
               at org.jboss.cache.lock.IdentityLock.acquireWriteLock(IdentityLock.java:167)
               at org.jboss.cache.lock.IdentityLock.acquire(IdentityLock.java:497)
               ... 46 more
              
              javax.transaction.RollbackException: outcome is false status: 1
               at org.jboss.cache.transaction.DummyTransaction.commit(DummyTransaction.java:75)
               at org.jboss.cache.transaction.DummyBaseTransactionManager.commit(DummyBaseTransactionManager.java:78)
               at org.jboss.cache.transaction.DummyUserTransaction.commit(DummyUserTransaction.java:77)
               at CacheRepl.CacheThread.doTransaction(CacheThread.java:31)
               at CacheRepl.CacheThread.run(CacheThread.java:53)
              


              - pessimistic locking:
              org.jboss.cache.lock.TimeoutException: failure acquiring lock: fqn=/node, caller=GlobalTransaction:<10.0.0.226:32933>:2, lock=write owner=GlobalTransaction:<10.0.0.226:32934>:1 (org.jboss.cache.lock.LockStrategyReadCommitted@1977b9b)
               at org.jboss.cache.lock.IdentityLock.acquire(IdentityLock.java:528)
               at org.jboss.cache.interceptors.PessimisticLockInterceptor$LockManager.acquire(PessimisticLockInterceptor.java:579)
               at org.jboss.cache.interceptors.PessimisticLockInterceptor.acquireNodeLock(PessimisticLockInterceptor.java:393)
               at org.jboss.cache.interceptors.PessimisticLockInterceptor.lock(PessimisticLockInterceptor.java:329)
               at org.jboss.cache.interceptors.PessimisticLockInterceptor.invoke(PessimisticLockInterceptor.java:187)
               at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
               at org.jboss.cache.interceptors.ReplicationInterceptor.invoke(ReplicationInterceptor.java:34)
               at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
               at org.jboss.cache.interceptors.NotificationInterceptor.invoke(NotificationInterceptor.java:32)
               at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
               at org.jboss.cache.interceptors.TxInterceptor.replayModifications(TxInterceptor.java:511)
               at org.jboss.cache.interceptors.TxInterceptor.handlePessimisticPrepare(TxInterceptor.java:394)
               at org.jboss.cache.interceptors.TxInterceptor.handleRemotePrepare(TxInterceptor.java:254)
               at org.jboss.cache.interceptors.TxInterceptor.invoke(TxInterceptor.java:100)
               at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
               at org.jboss.cache.interceptors.CacheMgmtInterceptor.invoke(CacheMgmtInterceptor.java:123)
               at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
               at org.jboss.cache.interceptors.InvocationContextInterceptor.invoke(InvocationContextInterceptor.java:62)
               at org.jboss.cache.CacheImpl.invokeMethod(CacheImpl.java:3939)
               at org.jboss.cache.CacheImpl._replicate(CacheImpl.java:2853)
               at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
               at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
               at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
               at java.lang.reflect.Method.invoke(Method.java:585)
               at org.jgroups.blocks.MethodCall.invoke(MethodCall.java:330)
               at org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher.handle(InactiveRegionAwareRpcDispatcher.java:77)
               at org.jgroups.blocks.RequestCorrelator.handleRequest(RequestCorrelator.java:624)
               at org.jgroups.blocks.RequestCorrelator.receiveMessage(RequestCorrelator.java:533)
               at org.jgroups.blocks.RequestCorrelator.receive(RequestCorrelator.java:365)
               at org.jgroups.blocks.MessageDispatcher$ProtocolAdapter.up(MessageDispatcher.java:736)
               at org.jgroups.JChannel.up(JChannel.java:1063)
               at org.jgroups.stack.ProtocolStack.up(ProtocolStack.java:325)
               at org.jgroups.protocols.pbcast.FLUSH.up(FLUSH.java:406)
               at org.jgroups.protocols.pbcast.STREAMING_STATE_TRANSFER.up(STREAMING_STATE_TRANSFER.java:255)
               at org.jgroups.protocols.FRAG2.up(FRAG2.java:197)
               at org.jgroups.protocols.pbcast.GMS.up(GMS.java:722)
               at org.jgroups.protocols.pbcast.STABLE.up(STABLE.java:234)
               at org.jgroups.protocols.UNICAST.up(UNICAST.java:263)
               at org.jgroups.protocols.pbcast.NAKACK.handleMessage(NAKACK.java:720)
               at org.jgroups.protocols.pbcast.NAKACK.up(NAKACK.java:546)
               at org.jgroups.protocols.VERIFY_SUSPECT.up(VERIFY_SUSPECT.java:167)
               at org.jgroups.protocols.FD.up(FD.java:322)
               at org.jgroups.protocols.FD_SOCK.up(FD_SOCK.java:298)
               at org.jgroups.protocols.MERGE2.up(MERGE2.java:145)
               at org.jgroups.protocols.Discovery.up(Discovery.java:220)
               at org.jgroups.protocols.TP$IncomingPacket.handleMyMessage(TP.java:1486)
               at org.jgroups.protocols.TP$IncomingPacket.run(TP.java:1440)
               at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
               at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
               at java.lang.Thread.run(Thread.java:595)
              Caused by: org.jboss.cache.lock.TimeoutException: write lock for /node could not be acquired after 1000 ms. Locks: Read lock owners: []
              Write lock owner: GlobalTransaction:<10.0.0.226:32934>:1
               (caller=GlobalTransaction:<10.0.0.226:32933>:2, lock info: write owner=GlobalTransaction:<10.0.0.226:32934>:1 (org.jboss.cache.lock.LockStrategyReadCommitted@1977b9b))
               at org.jboss.cache.lock.IdentityLock.acquireWriteLock0(IdentityLock.java:244)
               at org.jboss.cache.lock.IdentityLock.acquireWriteLock(IdentityLock.java:167)
               at org.jboss.cache.lock.IdentityLock.acquire(IdentityLock.java:497)
               ... 49 more
              
              javax.transaction.RollbackException: outcome is false status: 1
               at org.jboss.cache.transaction.DummyTransaction.commit(DummyTransaction.java:75)
               at org.jboss.cache.transaction.DummyBaseTransactionManager.commit(DummyBaseTransactionManager.java:78)
               at org.jboss.cache.transaction.DummyUserTransaction.commit(DummyUserTransaction.java:77)
               at CacheRepl.CacheThread.doTransaction(CacheThread.java:31)
               at CacheRepl.CacheThread.run(CacheThread.java:53)
              


              I don't care about the configuration details, but this test program should NEVER, EVER, NOT EVER, fail under any circumstances, except nuclear attack or something like that.
              One cache should always get the lock first, write to the node, finish the transaction, then the other cache can do the same.
              Just like 2 threads setting the same AtomicInteger - that NEVER throws an exception.
              Is there any solution to this?

              Thanks
              Adrian
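To make the AtomicInteger comparison above concrete, here is a minimal single-JVM sketch. `AtomicSetDemo` and `raceOnce` are names invented for this example. It only illustrates the analogy inside one JVM; it says nothing about what a replicated cache can guarantee across JVMs.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustration of the analogy only: two threads set the same AtomicInteger.
class AtomicSetDemo {

    /** Lets two threads race to set a shared value and returns whichever write landed last. */
    static int raceOnce() throws InterruptedException {
        final AtomicInteger shared = new AtomicInteger(0);
        Thread a = new Thread(() -> shared.set(1));
        Thread b = new Thread(() -> shared.set(2));
        a.start();
        b.start();
        a.join();
        b.join();
        return shared.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Neither thread fails; the final value is 1 or 2 depending on scheduling.
        System.out.println("final value: " + raceOnce());
    }
}
```

Both writes succeed and the last one wins, which is the behaviour the test program above is expected to show for the two caches.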

              • 4. Re: newbie question - ReplicationException
                Manik Surtani Master

                Both locking schemes will have one node fail in the described scenario. The difference, using the original poster's example, is that with PL (pessimistic locking) the commit on JVM2 will fail, while with OL (optimistic locking) the commit on JVM1 will fail.

                Other differences are that with OL the failure on JVM1 is local, i.e., it fails before any remote calls are made and is hence more efficient. Also, with OL, if the tx on JVM1 hangs, other txs on other JVMs (or even the same JVM) can proceed.

                If you want proper atomicity, then you get into the realm of distributed locks (or fail-fast cooperative locks) but either way, we're talking about extremely non-scalable solutions, especially since in a cache, you're looking at a mostly-read use-case which should make such a failure rare and a retry acceptable.

                If you are looking at using the cache to store a heavily updated value, like a counter, for example, then a cache is almost certainly the wrong tool for the job.

                That said, enough folk have asked for distributed locking and it is on the roadmap (albeit low prio - JBCACHE-1098). It will almost certainly not be enabled by default for scalability reasons though.
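As a rough illustration of why, under optimistic locking, the later commit (Tx1 in the original poster's timeline) is the one that fails, and fails locally, here is a toy model of version-based validation. It is a simplified sketch and not JBoss Cache's implementation: `OptimisticToy`, `Node` and `Tx` are hypothetical names, and the version is snapshotted when the toy transaction is created, which is an assumption made to keep the example short.

```java
// Toy model of version-based optimistic validation; not JBoss Cache's implementation.
class OptimisticToy {

    /** A single cached value with a version counter. */
    static final class Node {
        private long version = 0;
        private String value = "initial";

        synchronized long version() { return version; }
        synchronized String value() { return value; }

        /** Applies the write only if nobody has committed since readVersion was taken. */
        synchronized boolean commit(long readVersion, String newValue) {
            if (version != readVersion) {
                return false; // validation failed: someone else committed first
            }
            value = newValue;
            version++;
            return true;
        }
    }

    /** A transaction that buffers its write and validates at commit time. */
    static final class Tx {
        private final Node node;
        private final long readVersion;
        private String pending;

        Tx(Node node) {
            this.node = node;
            this.readVersion = node.version(); // snapshot taken when the tx starts (simplification)
        }

        void write(String v) { pending = v; } // held privately until commit

        boolean commit() { return node.commit(readVersion, pending); }
    }

    public static void main(String[] args) {
        Node node = new Node();
        Tx tx1 = new Tx(node);          // --Begin Tx1---
        tx1.write("from-tx1");
        Tx tx2 = new Tx(node);          // --Begin Tx2---
        tx2.write("from-tx2");
        boolean tx2Ok = tx2.commit();   // --Commit Tx2---
        tx1.write("from-tx1-again");
        boolean tx1Ok = tx1.commit();   // --Commit Tx1---
        System.out.println("tx2 committed: " + tx2Ok); // true
        System.out.println("tx1 committed: " + tx1Ok); // false
        System.out.println("value: " + node.value());  // from-tx2
    }
}
```

Note that Tx1 never blocks Tx2 here: the conflict is only detected when Tx1 validates at commit, which mirrors the point that a hanging tx on JVM1 does not stop other txs from proceeding.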


                • 5. Re: newbie question - ReplicationException
                  Adrian Sandor Newbie

                  Hm, the forum system ate my reply, I'll try to write it again.
                  In my case, with both locking schemes, both nodes usually fail at the same time (but not always).
                  I don't necessarily want a specific type of locks (e.g. distributed), especially if they're "extremely non-scalable". Indeed, this situation is rare, and if you think a retry is the solution here then I can accept it. However, I would expect the cache to handle it transparently (and in a configurable way); failure is not acceptable.

                  Also, this is not about storing a heavily updated value. Say we have to make some changes in the cache based on some external real-time events (from a data feed), and we have 2 replicated caches, for failover.
                  Both caches should listen to the events and perform the same changes immediately (which basically means at the same time). This ensures that no data can be lost if one cache dies, but it also triggers the problem I found. How can this be handled?

                  Thanks
                  Adrian

                  • 6. Re: newbie question - ReplicationException
                    Manik Surtani Master

                    Not necessarily heavily updated, but more importantly concurrently updated, which is what you described.

                    In your case though, perhaps a good approach may be for both caches to attempt to write the change, and if one cache fails to write, assume that this is because the other has completed the write and hence it would not be necessary?

                    Also, I find it odd that *both nodes* fail. I guess that with P/L, identical lock acquisition timeouts mean that this may happen. If your setup is a master/backup, perhaps your backup cache could have a much shorter lock acquisition timeout so it would always fail first, allowing the other cache to continue?

                    Both caches failing should almost certainly not happen with optimistic locking - let me know if it does and we can investigate that.
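The master/backup idea above (give the backup a much shorter lock acquisition timeout so that it gives up first) can be sketched with plain `java.util.concurrent` locks. This is only a model under stated assumptions: each writer holds its own lock and then needs the peer's lock, standing in for a local write followed by a remote prepare. `AsymmetricTimeoutDemo`, `writeBoth` and `run` are hypothetical names, and no JBoss Cache classes are involved.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Model only: two writers each hold their own lock and then need the peer's lock.
class AsymmetricTimeoutDemo {

    /** Holds its own lock, then tries to take the peer's lock within timeoutMillis. */
    static boolean writeBoth(ReentrantLock own, ReentrantLock peer,
                             CountDownLatch bothHoldOwn, long timeoutMillis)
            throws InterruptedException {
        own.lock();
        try {
            bothHoldOwn.countDown();
            bothHoldOwn.await(); // make sure the two writers really collide
            if (!peer.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return false; // timed out; own lock is released in the finally block
            }
            try {
                return true; // both locks held: the write would be applied here
            } finally {
                peer.unlock();
            }
        } finally {
            own.unlock();
        }
    }

    /** Returns {masterSucceeded, backupSucceeded}. */
    static boolean[] run(long masterTimeoutMillis, long backupTimeoutMillis)
            throws InterruptedException {
        final ReentrantLock masterLock = new ReentrantLock();
        final ReentrantLock backupLock = new ReentrantLock();
        final CountDownLatch bothHoldOwn = new CountDownLatch(2);
        final boolean[] result = new boolean[2];
        Thread master = new Thread(() -> {
            try {
                result[0] = writeBoth(masterLock, backupLock, bothHoldOwn, masterTimeoutMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread backup = new Thread(() -> {
            try {
                result[1] = writeBoth(backupLock, masterLock, bothHoldOwn, backupTimeoutMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        master.start();
        backup.start();
        master.join();
        backup.join();
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        // The backup gives up after 100 ms and releases its lock; the master, still waiting, then proceeds.
        boolean[] r = run(1000, 100);
        System.out.println("master succeeded: " + r[0] + ", backup succeeded: " + r[1]);
    }
}
```

With clearly different timeouts the backup should time out first and the master should go through. With identical timeouts both waits expire at about the same moment, so both writers tend to fail together, which matches the behaviour reported in this thread.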

                    • 7. Re: newbie question - ReplicationException
                      Adrian Sandor Newbie

                       

                      In your case though, perhaps a good approach may be for both caches to attempt to write the change, and if one cache fails to write, assume that this is because the other has completed the write and hence it would not be necessary?


                      That seems like a terrible approach, because there's no guarantee that the other cache succeeded. In fact, as I will show, it's almost certain that it also failed (if they have the same timeout).

                      Both caches failing should almost certainly not happen with optimistic locking - let me know if it does and we can investigate that.


                      Well, with pessimistic locking I can say that if one cache fails then both fail, practically always.
                      With optimistic locking, they don't always fail together, but "just" most of the time. Also, the percentage of failures seems to be greater.

                      Here's the modified code:

                      package CacheRepl;

                      import java.text.DateFormat;
                      import java.text.SimpleDateFormat;
                      import java.util.Date;
                      import java.util.Timer;
                      import java.util.TimerTask;

                      import javax.transaction.UserTransaction;

                      import org.apache.log4j.Logger;
                      import org.jboss.cache.Cache;
                      import org.jboss.cache.DefaultCacheFactory;
                      import org.jboss.cache.Fqn;
                      import org.jboss.cache.transaction.DummyTransactionManager;
                      import org.jboss.cache.transaction.DummyUserTransaction;

                      public class CacheThread2 {
                          private static final Logger LOG = Logger.getLogger("test");
                          private static final int THREADS = 2;
                          private static final int REPEAT = 20;
                          protected static final int INTERVAL = 1000;

                          protected int count = 0;
                          protected final Timer timer = new Timer();
                          protected final int threadId;
                          protected final Cache<Object, Object> cache;
                          protected final DateFormat df = new SimpleDateFormat("HH:mm:ss.SSS");
                          protected volatile boolean done = false;
                          protected final boolean[] results = new boolean[REPEAT];

                          protected class CacheTask extends TimerTask {
                              @Override
                              public void run() {
                                  final String t = df.format(new Date());
                                  final boolean result = doTransaction();
                                  System.out.println(t + " step " + count + " T" + threadId
                                          + (result ? " succeeded" : " failed"));
                                  results[count++] = result;
                                  if (count >= REPEAT) {
                                      timer.cancel();
                                      done = true;
                                  }
                              }
                          }

                          public CacheThread2(final int threadId) {
                              this.threadId = threadId;
                              cache = DefaultCacheFactory.getInstance().createCache("replSync-service.xml", true);
                          }

                          public void start(final Date time) {
                              timer.scheduleAtFixedRate(new CacheTask(), time, INTERVAL);
                          }

                          protected boolean doTransaction() {
                              final UserTransaction tx = new DummyUserTransaction(DummyTransactionManager.getInstance());
                              try {
                                  tx.begin();
                                  cache.put(new Fqn<Object>("node"), "key", "value" + threadId);
                                  tx.commit();
                                  return true;
                              } catch (Exception e) {
                                  LOG.error("transaction failed", e);
                                  try {
                                      tx.rollback();
                                  } catch (Exception e1) {
                                      LOG.warn("rollback failed", e1);
                                  }
                                  return false;
                              }
                          }

                          public static void main(String[] args) throws InterruptedException {
                              final CacheThread2[] threads = new CacheThread2[THREADS];
                              for (int t = 0; t < THREADS; t++) {
                                  threads[t] = new CacheThread2(t);
                              }
                              Date time = new Date(System.currentTimeMillis() + 2000);
                              for (int t = 0; t < THREADS; t++) {
                                  threads[t].start(time);
                              }
                              while (!threads[0].done || !threads[1].done) {
                                  Thread.sleep(500);
                              }
                              final int[] stats = new int[4];
                              for (int j = 0; j < REPEAT; ++j) {
                                  stats[(threads[0].results[j] ? 2 : 0) + (threads[1].results[j] ? 1 : 0)]++;
                              }
                              System.out.println("\nBoth failed: " + stats[0] + " times");
                              System.out.println("First one failed: " + stats[1] + " times");
                              System.out.println("Second one failed: " + stats[2] + " times");
                              System.out.println("Both succeeded: " + stats[3] + " times");
                          }
                      }
                      


                      and cache configuration:

                      <server>
                        <mbean code="org.jboss.cache.jmx.CacheJmxWrapper"
                               name="jboss.cache:service=TreeCache">

                          <depends>jboss:service=Naming</depends>
                          <depends>jboss:service=TransactionManager</depends>

                          <attribute name="TransactionManagerLookupClass">org.jboss.cache.transaction.DummyTransactionManagerLookup</attribute>
                          <!--
                            Node locking scheme:
                              OPTIMISTIC
                              PESSIMISTIC (default)
                          -->
                          <attribute name="NodeLockingScheme">OPTIMISTIC</attribute>
                          <!--
                            Isolation level:
                              SERIALIZABLE
                              REPEATABLE_READ (default)
                              READ_COMMITTED
                              READ_UNCOMMITTED
                              NONE
                          -->
                          <attribute name="IsolationLevel">READ_COMMITTED</attribute>

                          <attribute name="CacheMode">REPL_SYNC</attribute>

                          <attribute name="ClusterName">Test cache</attribute>

                          <attribute name="ClusterConfig">
                            <config>
                              <UDP bind_addr="10.0.0.226"
                                   mcast_addr="228.10.10.10"
                                   mcast_port="45588"
                                   tos="8"
                                   ucast_recv_buf_size="20000000"
                                   ucast_send_buf_size="640000"
                                   mcast_recv_buf_size="25000000"
                                   mcast_send_buf_size="640000"
                                   loopback="false"
                                   discard_incompatible_packets="true"
                                   max_bundle_size="64000"
                                   max_bundle_timeout="30"
                                   use_incoming_packet_handler="true"
                                   ip_ttl="2"
                                   enable_bundling="false"
                                   enable_diagnostics="true"
                                   use_concurrent_stack="true"
                                   thread_naming_pattern="pl"
                                   thread_pool.enabled="true"
                                   thread_pool.min_threads="1"
                                   thread_pool.max_threads="25"
                                   thread_pool.keep_alive_time="30000"
                                   thread_pool.queue_enabled="true"
                                   thread_pool.queue_max_size="10"
                                   thread_pool.rejection_policy="Run"
                                   oob_thread_pool.enabled="true"
                                   oob_thread_pool.min_threads="1"
                                   oob_thread_pool.max_threads="4"
                                   oob_thread_pool.keep_alive_time="10000"
                                   oob_thread_pool.queue_enabled="true"
                                   oob_thread_pool.queue_max_size="10"
                                   oob_thread_pool.rejection_policy="Run"/>
                              <PING timeout="2000" num_initial_members="3"/>
                              <MERGE2 max_interval="30000" min_interval="10000"/>
                              <FD_SOCK/>
                              <FD timeout="10000" max_tries="5" shun="true"/>
                              <VERIFY_SUSPECT timeout="1500"/>
                              <pbcast.NAKACK max_xmit_size="60000"
                                             use_mcast_xmit="false" gc_lag="0"
                                             retransmit_timeout="300,600,1200,2400,4800"
                                             discard_delivered_msgs="true"/>
                              <UNICAST timeout="300,600,1200,2400,3600"/>
                              <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
                                             max_bytes="400000"/>
                              <pbcast.GMS print_local_addr="true" join_timeout="5000"
                                          join_retry_timeout="2000" shun="false"
                                          view_bundling="true" view_ack_collection_timeout="5000"/>
                              <FRAG2 frag_size="60000"/>
                              <pbcast.STREAMING_STATE_TRANSFER use_reading_thread="true"/>
                              <!-- <pbcast.STATE_TRANSFER/> -->
                              <pbcast.FLUSH timeout="0"/>
                            </config>
                          </attribute>

                          <attribute name="FetchInMemoryState">true</attribute>
                          <attribute name="StateRetrievalTimeout">15000</attribute>
                          <attribute name="SyncReplTimeout">15000</attribute>
                          <attribute name="LockAcquisitionTimeout">500</attribute>

                          <attribute name="UseRegionBasedMarshalling">true</attribute>
                          <attribute name="TransactionTimeout">300</attribute>
                        </mbean>

                      </server>
                      


                      Note that LockAcquisitionTimeout should be smaller than INTERVAL in the code.

                      Here's an example result with PESSIMISTIC locking:

                      Both failed: 15 times
                      First one failed: 0 times
                      Second one failed: 0 times
                      Both succeeded: 5 times
                      


                      and with OPTIMISTIC locking:

                      Both failed: 15 times
                      First one failed: 2 times
                      Second one failed: 1 times
                      Both succeeded: 2 times
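
For comparison, the random-sleep-and-retry workaround mentioned earlier in this thread looks roughly like the sketch below. It is only an illustration: `RetryWithBackoff`, `retry`, the attempt count and the sleep bound are all made-up names and values, not part of JBoss Cache, and the `Supplier` stands in for whatever begin/modify/commit sequence is being retried (assumed here to signal failure with an unchecked exception).

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;

// Hypothetical helper for illustration only; not part of JBoss Cache.
class RetryWithBackoff {

    /**
     * Runs the action and, if it throws, waits a random short time and tries again.
     * Rethrows the last failure once maxAttempts attempts have been made.
     */
    static <T> T retry(Supplier<T> action, int maxAttempts, long maxSleepMs) {
        if (maxAttempts < 1 || maxSleepMs < 1) {
            throw new IllegalArgumentException("maxAttempts and maxSleepMs must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure, then back off and retry
            }
            if (attempt < maxAttempts) {
                try {
                    // Random pause in [1, maxSleepMs] ms so that two colliding
                    // sides are unlikely to retry at the same moment again.
                    Thread.sleep(ThreadLocalRandom.current().nextLong(1, maxSleepMs + 1));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while backing off", ie);
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Stand-in for a transaction that fails twice before it goes through.
        String result = retry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("simulated lock timeout");
            }
            return "committed";
        }, 5, 20);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

This only makes a repeated collision less likely; it does nothing about the first one, which is why it isn't a satisfying answer.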
                      


                      What do you think?

                      Thanks
                      Adrian

                      • 9. Re: newbie question - ReplicationException
                        Manik Surtani Master

                        Related to what you have been saying - http://www.jboss.com/index.html?module=bb&op=viewtopic&t=131474

                        But yes, specific to your problem, I have created a JIRA task: JBCACHE-1307