7 Replies Latest reply on May 3, 2004 7:02 AM by Ben Wang

    Problems with concurrent transactions & isolation levels

    spohl Newbie

      Hi there,

      First of all, JBossCache is an impressive piece of technology. I'd like to use it to replace my hand-written caching for BMP (unfortunately I have to write this myself because of a legacy DB) and to avoid the overhead of RO/RW EJBs. To that end I want to use TreeCache from session beans, so I wrote a first transactional example and ran into some problems. To reduce the complexity I ported the example (the well-known bank example with tellers and concurrent bookings) to a standalone JUnit test case. Perhaps you can add it to your test suite and dig deeper into the problems it shows.

      I use the "normal" (non-AOP) TreeCache with a single top-level node that holds 3 "customers" as keys and Integers ("accounts") as values. Two concurrent threads (tellers) then repeatedly read all accounts (a first tx) and book a transfer from one account to another (a second tx). Each of the 3 accounts starts with a balance of 1000 'whatever', so at the isolation levels REPEATABLE_READ and SERIALIZABLE the sum of the accounts must always be 3000.

      On WinNT with Sun JVM 1.4.2_03 and the JBossCache 1.0 release, "money" gets lost or appears out of nowhere under concurrency at the default isolation level, REPEATABLE_READ. With SERIALIZABLE everything works fine, but only if I don't use the LRU eviction policy. It seems that the evictor doesn't release its locks, so the threads wait forever...
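To make the symptom concrete, here is a minimal sketch of how money can "appear" when two read-modify-write bookings overlap without isolation. It does not use JBossCache at all: the class name `LostUpdateDemo`, the plain `HashMap`, and the hand-written interleaving are assumptions for illustration only, and the code uses newer Java syntax than the 1.4 JVM mentioned above.

```java
import java.util.HashMap;
import java.util.Map;

// Deterministic replay of a lost update: the steps of two tellers are
// interleaved by hand on a plain map, with no isolation between them.
class LostUpdateDemo {

    static int sum(Map<String, Integer> accounts) {
        int total = 0;
        for (int balance : accounts.values()) {
            total += balance;
        }
        return total;
    }

    // Returns the total of all accounts after two interleaved transfers.
    static int interleavedTransfers() {
        Map<String, Integer> accounts = new HashMap<>();
        accounts.put("cu1", 1000);
        accounts.put("cu2", 1000);
        accounts.put("cu3", 1000);

        // Teller one moves 10 from cu1 to cu2; teller two moves 5 from cu1 to cu3.
        int oneReadsCu1 = accounts.get("cu1");   // 1000
        int twoReadsCu1 = accounts.get("cu1");   // 1000, stale once teller one writes

        accounts.put("cu1", oneReadsCu1 - 10);   // cu1 = 990
        accounts.put("cu2", accounts.get("cu2") + 10);

        accounts.put("cu1", twoReadsCu1 - 5);    // cu1 = 995, teller one's debit is lost
        accounts.put("cu3", accounts.get("cu3") + 5);

        return sum(accounts);
    }

    public static void main(String[] args) {
        System.out.println("total = " + interleavedTransfers()); // prints "total = 3010"
    }
}
```

Both credits survive but only one debit does, so the total ends up at 3010 instead of 3000. With the overwrite in the other direction, money would be lost instead.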

      Here is the code (TxConcurrentBankUnitTestCase.java):

      package org.jboss.test.cache.test.local;
      
      import junit.framework.Test;
      import junit.framework.TestCase;
      import junit.framework.TestSuite;
      import org.jboss.cache.TreeCache;
      import org.jboss.cache.lock.IsolationLevel;
      import org.jboss.cache.transaction.DummyTransactionManager;
      import org.jboss.cache.CacheException;
      import org.jboss.cache.PropertyConfigurator;
      import org.jboss.logging.Logger;
      
      import javax.naming.Context;
      import javax.naming.InitialContext;
      import javax.transaction.UserTransaction;
      import java.util.Properties;
      import java.util.Set;
      import java.util.Iterator;
      import java.util.HashMap;
      
      /**
       *
       * Unit test for local TreeCache with concurrent transactions.
       * Uses locking and multiple threads to test concurrent r/w access to the tree.
       *
       * @author <a href="mailto:spohl@users.sourceforge.net">Stefan Pohl</a>
       * @version $Revision: 1.0 $
       *
       */
      public class TxConcurrentBankUnitTestCase extends TestCase {
      
       TreeCache cache;
       private Logger logger_ = Logger.getLogger(TxConcurrentBankUnitTestCase.class);
       static Properties p = null;
       String old_factory = null;
       final String FACTORY = "org.jboss.cache.transaction.DummyContextFactory";
       final String NODE = "/cachetest";
       final int ROLLBACK_CHANCE = 100;
      
       static String customer[] = { "cu1", "cu2", "cu3" };
       static final int BOOKINGS = 1000;
       static boolean _testFailedinThread = false;
      
       public TxConcurrentBankUnitTestCase(String name)
       {
       super(name);
       }
      
       public void failMain() {
       _testFailedinThread=true;
       }
      
       public void setUp() throws Exception
       {
       super.setUp();
       old_factory = System.getProperty(Context.INITIAL_CONTEXT_FACTORY);
       System.setProperty(Context.INITIAL_CONTEXT_FACTORY, FACTORY);
       DummyTransactionManager.getInstance();
       if (p == null) {
       p = new Properties();
       p.put(Context.INITIAL_CONTEXT_FACTORY, "org.jboss.cache.transaction.DummyContextFactory");
       }
      
       cache = new TreeCache();
       cache.setCacheMode(TreeCache.LOCAL);
       cache.setTransactionManagerLookupClass("org.jboss.cache.JBossTransactionManagerLookup");
      // cache.setIsolationLevel(IsolationLevel.REPEATABLE_READ);
       // SERIALIZABLE works, but only without the LRUPolicy. With it, the threads sometimes wait forever on locks that are probably held by the eviction policy.
       cache.setIsolationLevel(IsolationLevel.SERIALIZABLE);
      /*
       cache.setEvictionPolicyClass("org.jboss.cache.eviction.LRUPolicy");
       PropertyConfigurator config = new PropertyConfigurator();
       config.configure(cache, "local-serializable-lrueviction.xml");
      */
       cache.setLockAcquisitionTimeout(500);
       cache.start();
       }
      
       public void tearDown() throws Exception
       {
       super.tearDown();
       cache.stop();
       // BW. kind of a hack to destroy jndi binding and thread local tx before next run.
       DummyTransactionManager.destroy();
       if (old_factory != null) {
       System.setProperty(Context.INITIAL_CONTEXT_FACTORY, old_factory);
       old_factory = null;
       }
       }
      
       public void testConcurrentBooking()
       {
       Teller one, two;
       try {
       if(cache.get(NODE)==null) {
       cache.put(NODE, "cu1", new Integer(1000));
       cache.put(NODE, "cu2", new Integer(1000));
       cache.put(NODE, "cu3", new Integer(1000));
       }
      
       one = new Teller("one", cache);
       two = new Teller("two", cache);
      
       one.start();
       sleep(100);
       two.start();
       one.join();
       two.join();
      
       log("lock info:\n" + cache.printLockInfo() + "\nfailed in thread: " + _testFailedinThread);
       if(_testFailedinThread) fail();
       } catch (Exception e) {
       e.printStackTrace();
       fail(e.toString());
       } finally {
       try {
       cache.remove(NODE);
       } catch (Exception e) {
       e.printStackTrace();
       fail();
       }
       }
       }
      
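       void sleep(long timeout)
       {
       try {
       Thread.sleep(timeout);
       } catch (InterruptedException e) {
       // restore the interrupt flag instead of swallowing the interruption
       Thread.currentThread().interrupt();
       }
       }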
      
       static void log(String msg)
       {
       System.out.println("-- [" + Thread.currentThread() + "]: " + msg);
       }
      
       public static Test suite()
       {
       return new TestSuite(TxConcurrentBankUnitTestCase.class);
       }
      
       public static void main(String[] args)
       {
       junit.textui.TestRunner.run(suite());
       }
      
       class Teller extends Thread
       {
       TreeCache cache;
       String threadName_;
       public Teller(String str, TreeCache cache)
       {
       super(str);
       this.cache = cache;
       threadName_ = str;
       }
      
       public void run()
       {
       int count = customer.length;
       UserTransaction tx = null;
       try {
       tx = (UserTransaction) new InitialContext(p).lookup("UserTransaction");
      
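       boolean again = false; // true: retry the previous booking after a rollback
       int src = 0; // index of the account to debit
       int dst = 0; // index of the account to credit
       int amo = 0; // amount to transfer
       int anz =0; // number of booking attempts so far
       while(anz<BOOKINGS) {
       if(!again) {
       src = (int) (Math.random()*count);
       // pick dst among the other count-1 accounts, then step over src so that dst != src
       dst = (int) (Math.random()*(count-1));
       amo =1+ (int) (Math.random()*20); // 1..20
       if(dst>=src) dst++;
       }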
       tx.begin();
       HashMap accounts = getAccounts();
       tx.commit();
       int sum = sumAccounts(accounts);
       System.out.println(anz+": "+accounts+" Sum: "+sum);
       // the sum of all accounts always has to be 3000
       if(sum!=3000) failMain();
       assertEquals(3000, sum);
      
       try {
       tx.begin();
       deposit(customer[src], customer[dst], amo, tx);
       tx.commit();
       again = false;
       } catch(Throwable e) {
       tx.rollback();
       e.printStackTrace();
       again = true;
       }
       anz++;
       yield();
       }
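       } catch (Throwable t) {
       t.printStackTrace();
       // fail() on a worker thread never reaches JUnit, so also flag the failure for the main thread
       failMain();
       fail(t.toString());
       }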
       }
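       /**
       * Books a transfer: debits the amount from one account and credits it to another,
       * inside the transaction started by the caller.
       */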
       public void deposit(String from, String to, int amount, UserTransaction tx) throws Exception {
       log("deposit("+from+", "+to+", "+amount+") called.");
       try
       {
       int act;
       // debit
       act = ((Integer) cache.get(NODE, from)).intValue();
       cache.put(NODE, from, new Integer(act-amount));
       log("deposit("+from+", "+to+", "+amount+") debited.");
      
       // occasionally force a rollback (on average once per ROLLBACK_CHANCE bookings)
       if((int) (Math.random()*ROLLBACK_CHANCE) == 0) {
       log("!!!manually set rollback ("+from+", "+to+", "+amount+").");
       tx.setRollbackOnly();
       throw new Exception("Manually set rollback!");
       }
      
       // credit
       act = ((Integer) cache.get(NODE, to)).intValue();
       cache.put(NODE, to, new Integer(act+amount));
       } catch(Exception e) {
       e.printStackTrace();
       fail(e.toString());
       } finally {
       // safely close any acquired resources
       }
       log("deposit("+from+", "+to+", "+amount+") finished.");
       }
       /**
       * Reads the current balance of every account.
       */
       public HashMap getAccounts() throws CacheException {
       log("getAccounts() called.");
       HashMap result = new HashMap();
       try {
       Set set = cache.getKeys(NODE);
       Iterator iter = set.iterator();
       while(iter.hasNext()) {
       String name = (String) iter.next();
       result.put(name, cache.get(NODE, name));
       }
       return result;
       } catch(CacheException ce) {
       throw ce;
       }
       }
       protected int sumAccounts(HashMap map) {
       Iterator iter = map.values().iterator();
       int result = 0;
       while(iter.hasNext()) {
       result += ((Integer) iter.next()).intValue();
       }
       return result;
       }
       }
      }
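
A side note on the `_testFailedinThread` flag in the test above: an assertion that fails on a worker thread is not seen by JUnit, which is why the result has to be handed back to the main thread. As a hedged alternative, the following sketch captures the worker's throwable and returns it to the caller. The class and method names (`WorkerFailureSketch`, `runAndCapture`) are made up for this illustration, and it relies on `java.util.concurrent`, which is newer than the 1.4 JVM used for the test.

```java
import java.util.concurrent.atomic.AtomicReference;

class WorkerFailureSketch {

    // Runs the task on its own thread, waits for it, and returns whatever
    // it threw (or null if it completed normally).
    static Throwable runAndCapture(Runnable task) {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread worker = new Thread(() -> {
            try {
                task.run();
            } catch (Throwable t) {
                failure.set(t); // remember the failure for the calling thread
            }
        }, "teller");
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
        return failure.get();
    }

    public static void main(String[] args) {
        Throwable t = runAndCapture(() -> {
            throw new AssertionError("sum was 2990, expected 3000");
        });
        System.out.println("captured: " + t);
    }
}
```

In a test, the main thread could then call `fail(...)` whenever the returned throwable is not null, so a teller that dies early no longer goes unnoticed.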
      


      Finally, a comment on JBossCache as an entity bean cache backend: if you look into the LRU evictor in detail, perhaps you could add a max-lifetime configuration option for regions, so that commit option D can be emulated?
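
To show what I mean by a max lifetime, here is a rough sketch of the behaviour, not a proposal for the actual implementation. Everything in it is hypothetical: the class `MaxAgeRegionSketch`, its methods, and the explicit `nowMillis` parameter (used instead of the system clock so the example stays deterministic) are not part of JBossCache.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical region whose entries expire a fixed time after being stored.
class MaxAgeRegionSketch {
    private final long maxAgeMillis;
    private final Map<String, Object> values = new HashMap<>();
    private final Map<String, Long> storedAt = new HashMap<>();

    MaxAgeRegionSketch(long maxAgeMillis) {
        this.maxAgeMillis = maxAgeMillis;
    }

    void put(String key, Object value, long nowMillis) {
        values.put(key, value);
        storedAt.put(key, nowMillis);
    }

    // Returns the value, or null if it is absent or older than maxAgeMillis.
    Object get(String key, long nowMillis) {
        Long stored = storedAt.get(key);
        if (stored == null) {
            return null;
        }
        if (nowMillis - stored > maxAgeMillis) {
            // too old: drop the entry so the caller reloads it from the database
            values.remove(key);
            storedAt.remove(key);
            return null;
        }
        return values.get(key);
    }
}
```

For example, with `new MaxAgeRegionSketch(1000)`, a value stored at time 0 is still returned at time 1000 but is gone at time 1001, which forces a reload and thereby gives a timed refresh of the cached state.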

      I hope my little test case helps. If the test passes, please run it again ;-) or let the tellers make more than 1000 bookings.
      Or am I completely wrong and missing something important?

      Kind regards,
      Stefan Pohl