Problems with concurrent transactions & isolation levels
spohl Mar 29, 2004 11:34 AMHi there,
first: JBossCache is an impressive piece of technology. I'd like to use it as a replacement for caching with BMP (unfortunately I have to write this on my own: legacy DB) and RO/RW-EJB overhead. Therefore I'd like to use TreeCache in SessionBeans and wrote a first transactional example on my own and encountered some problems. To reduce the complexity I ported this example (the well-known bank-example with tellers and concurrent bookings) into a standalone JUnit Testcase. Perhaps you can add it to your testsuite and dig deeper into the problems that this testcase show.
I use the "normal" (non-AOP) Treecache and only one top-level node with 3 "customers" as keys and Integers ("accounts") as values. Then 2 concurrent threads (tellers) make retrievals of all accounts (a first tx) and bookings from one account to another (a second tx). Every of the 3 accounts start with an amount of 1000 'whatever'. The sum of the accounts always has to be 3000 in the tx-isolevels REPEATABLE-READ and SERIALIZABLE.
With WinNT, Sun JVM 1.4.2_03 and the JBossCache 1.0 release "money" gets lost or arises in concurrent situation in default-iso-level RR. With SERIALIZABLE everything works fine, but only if I don't use the LRU eviction policy. It seems that the evictor doesn't release locks and the threads wait forever...
Here is the code (TxConcurrentBankUnitTestCase.java):
package org.jboss.test.cache.test.local; import junit.framework.Test; import junit.framework.TestCase; import junit.framework.TestSuite; import org.jboss.cache.TreeCache; import org.jboss.cache.lock.IsolationLevel; import org.jboss.cache.transaction.DummyTransactionManager; import org.jboss.cache.CacheException; import org.jboss.cache.PropertyConfigurator; import org.jboss.logging.Logger; import javax.naming.Context; import javax.naming.InitialContext; import javax.transaction.UserTransaction; import java.util.Properties; import java.util.Set; import java.util.Iterator; import java.util.HashMap; /** * * Unit test for local TreeCache with concurrent transactions. * Uses locking and multiple threads to test concurrent r/w access to the tree. * * @author <a href="mailto:spohl@users.sourceforge.net">Stefan Pohl</a> * @version $Revision: 1.0 $ * */ public class TxConcurrentBankUnitTestCase extends TestCase { TreeCache cache; private Logger logger_ = Logger.getLogger(TxConcurrentBankUnitTestCase.class); static Properties p = null; String old_factory = null; final String FACTORY = "org.jboss.cache.transaction.DummyContextFactory"; final String NODE = "/cachetest"; final int ROLLBACK_CHANCE = 100; static String customer[] = { "cu1", "cu2", "cu3" }; static final int BOOKINGS = 1000; static boolean _testFailedinThread = false; public TxConcurrentBankUnitTestCase(String name) { super(name); } public void failMain() { _testFailedinThread=true; } public void setUp() throws Exception { super.setUp(); old_factory = System.getProperty(Context.INITIAL_CONTEXT_FACTORY); System.setProperty(Context.INITIAL_CONTEXT_FACTORY, FACTORY); DummyTransactionManager.getInstance(); if (p == null) { p = new Properties(); p.put(Context.INITIAL_CONTEXT_FACTORY, "org.jboss.cache.transaction.DummyContextFactory"); } cache = new TreeCache(); cache.setCacheMode(TreeCache.LOCAL); cache.setTransactionManagerLookupClass("org.jboss.cache.JBossTransactionManagerLookup"); // cache.setIsolationLevel(IsolationLevel.REPEATABLE_READ); // SERIALIZABLE works, but only without LRUEvictionPolicy. Otherwise sometimes the threads wait forever on locks probably issued by the LRUPolicy!? cache.setIsolationLevel(IsolationLevel.SERIALIZABLE); /* cache.setEvictionPolicyClass("org.jboss.cache.eviction.LRUPolicy"); PropertyConfigurator config = new PropertyConfigurator(); config.configure(cache, "local-serializable-lrueviction.xml"); */ cache.setLockAcquisitionTimeout(500); cache.start(); } public void tearDown() throws Exception { super.tearDown(); cache.stop(); // BW. kind of a hack to destroy jndi binding and thread local tx before next run. DummyTransactionManager.destroy(); if (old_factory != null) { System.setProperty(Context.INITIAL_CONTEXT_FACTORY, old_factory); old_factory = null; } } public void testConcurrentBooking() { Teller one, two; try { if(cache.get(NODE)==null) { cache.put(NODE, "cu1", new Integer(1000)); cache.put(NODE, "cu2", new Integer(1000)); cache.put(NODE, "cu3", new Integer(1000)); } one = new Teller("one", cache); two = new Teller("two", cache); one.start(); sleep(100); two.start(); one.join(); two.join(); log("lock info:\n" + cache.printLockInfo()+_testFailedinThread); if(_testFailedinThread) fail(); } catch (Exception e) { e.printStackTrace(); fail(e.toString()); } finally { try { cache.remove(NODE); } catch (Exception e) { e.printStackTrace(); fail(); } } } void sleep(long timeout) { try { Thread.sleep(timeout); } catch (InterruptedException e) { } } static void log(String msg) { System.out.println("-- [" + Thread.currentThread() + "]: " + msg); } public static Test suite() { return new TestSuite(TxConcurrentBankUnitTestCase.class); } public static void main(String[] args) { junit.textui.TestRunner.run(suite()); } class Teller extends Thread { TreeCache cache; String threadName_; public Teller(String str, TreeCache cache) { super(str); this.cache = cache; threadName_ = str; } public void run() { int count = customer.length; UserTransaction tx = null; try { tx = (UserTransaction) new InitialContext(p).lookup("UserTransaction"); boolean again = false; int src = 0; int dst = 0; int amo = 0; int anz =0; while(anz<BOOKINGS) { if(!again) { src = (int) (Math.random()*count); dst = (int) (Math.random()*(count-1)); amo =1+ (int) (Math.random()*20); if(dst>=src) dst++; } tx.begin(); HashMap accounts = getAccounts(); tx.commit(); int sum = sumAccounts(accounts); System.out.println(anz+": "+accounts+" Summe: "+sum); // the sum of all accounts always has to be 3000 if(sum!=3000) failMain(); assertEquals(3000, sum); try { tx.begin(); deposit(customer[src], customer[dst], amo, tx); tx.commit(); again = false; } catch(Throwable e) { tx.rollback(); e.printStackTrace(); again = true; } anz++; yield(); } } catch (Throwable t) { t.printStackTrace(); fail(t.toString()); } } /** * Posting */ public void deposit(String from, String to, int amount, UserTransaction tx) throws Exception { log("deposit("+from+", "+to+", "+amount+") called."); try { int act; // debit act = ((Integer) cache.get(NODE, from)).intValue(); cache.put(NODE, from, new Integer(act-amount)); log("deposit("+from+", "+to+", "+amount+") debited."); // eventually rollback the transaction if((int) (Math.random()*ROLLBACK_CHANCE) == 0) { log("!!!manually set rollback ("+from+", "+to+", "+amount+")."); tx.setRollbackOnly(); throw new Exception("Manually set rollback!"); } // credit act = ((Integer) cache.get(NODE, to)).intValue(); cache.put(NODE, to, new Integer(act+amount)); } catch(Exception e) { e.printStackTrace(); fail(e.toString()); } finally { // close resources aquired safely } log("deposit("+from+", "+to+", "+amount+") finished."); } /** * retrieving amounts of accounts */ public HashMap getAccounts() throws CacheException { log("getAccounts() called."); HashMap result = new HashMap(); try { Set set = cache.getKeys(NODE); Iterator iter = set.iterator(); while(iter.hasNext()) { String name = (String) iter.next(); result.put(name, cache.get(NODE, name)); } return result; } catch(CacheException ce) { throw ce; } } protected int sumAccounts(HashMap map) { Iterator iter = map.values().iterator(); int result = 0; while(iter.hasNext()) { result += ((Integer) iter.next()).intValue(); } return result; } } }
At last a comment towards JBossCache as EntityBean-Cache-Backend: If you get into detail with the LRU evictor perhaps you could add a max-lifetime-configuration option for regions to be able to emulate commit-option-d?
I hope my little testcase contribution helps. If the test succeeds please try again ;-) or let the tellers book more than 1000 times.
Or am I completely wrong and missed a big point?
Kind regards,
Stefan Pohl