Transactions: atomicity OK, isolation KO
chtimi2 Sep 10, 2009 5:29 AM
I'm studying the feasibility of using JBoss POJO Cache (henceforth JBPC) as a transactional cache for our application framework. I'm using an AtomikosEssentials transaction manager, and Spring as a declarative JTA container. JBPC is 3.0.0GA.
To do so I wrote 2 unit tests, one for testing the A part of ACID and one for testing the I part of ACID.
-The atomicity test executes a failing transaction and verifies it has been rolled back.
-The isolation test starts a thread that modifies a JBPC object; this update is artificially long (Thread.sleep at the end). It then verifies that a read is blocked until the update is over, and that the read returns the updated state.
The atomicity test passes, indicating that I am indeed working inside a transaction.
But the isolation test fails. The test per se seems correct, because if I use a JDBC resource instead of the JBPC object as shared state, the test passes. Thus, there seems to be a problem with my JBPC configuration, or something else I don't understand.
The unit test:
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration ( locations={"/application-context_Jdbc_Atomikos.xml"} ) public class ComportementTransactionnelTable implements ApplicationContextAware { @Resource ( name="tracksTable" ) protected TracksTable table; private final ExecutorService executorService = Executors.newFixedThreadPool(1); protected static ApplicationContext applicationContext; @Before public void start () { table.deleteAllTracks (); assertEquals ( 0 , table.size() ); } @Test //@Ignore public void atomicite () throws Exception { //Apres saisie: 1 ligne table.create2Tracks ( false ); assertEquals ( 2 , table.size() ); try { table.create2Tracks ( true ); fail (); } catch ( Exception e ) {} finally { assertEquals ( 2 , table.size() ); } } @Test //@Ignore public void isolation () throws Exception { assertEquals ( 0 , table.size() ); int dureeEnSecondes = 5; //Must be < lock acquire timeout executorService.submit ( new MiseAJourLongue ( dureeEnSecondes ) ); StopWatch stopWatch = new StopWatch (); stopWatch.start(); //Wait a little to be sure that by the time we get to table.size, //the MiseAJourLongue thread has had time to start its transaction ObjectUtils.sleep ( 100 ); assertEquals ( 1 , table.size() ); stopWatch.stop(); double tempsBloque = stopWatch.getTotalTimeSeconds(); System.out.println ( "isolation/tempsBloque: " + tempsBloque ); assertTrue ( tempsBloque>= dureeEnSecondes ); } private class MiseAJourLongue implements Runnable { private final int dureeEnSecondes; MiseAJourLongue ( int dureeEnSecondes ) { this.dureeEnSecondes = dureeEnSecondes; } @Override public void run() { table.createTrackAndSleep ( dureeEnSecondes ); } } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { TransactionManager tm = (TransactionManager) applicationContext.getBean ( "atomikosTransactionManager" ); AtomikosTransactionManagerLookup.setAtomikosTransactionManager ( tm ); } }
It can be used for a shared state that is either JDBC or JBPC.
When the shared state is JBPC, one launches the JBPC-specific subclass:
public class ComportementTransactionnelTableCache extends ComportementTransactionnelTable { private static PojoCache cache; @Before public void start () { cache = PojoCacheFactory.createCache ( "resources/META-INF/replSync-service.xml" , false ); cache.getCache().getInvocationContext().getOptionOverrides().setForceWriteLock(true); cache.getCache().getInvocationContext().getOptionOverrides().setForceSynchronous(true); //cache.getCache().getConfiguration().setCacheMode(CacheMode.LOCAL); //cache.getCache().getConfiguration().setIsolationLevel(IsolationLevel.SERIALIZABLE); //cache.getCache().getConfiguration().setConcurrencyLevel(0); cache.getCache().getConfiguration().setLockParentForChildInsertRemove ( true ); //cache.getCache().getConfiguration().setNodeLockingScheme(NodeLockingScheme.PESSIMISTIC); cache.getCache().getConfiguration().setSyncCommitPhase(true); cache.getCache().getConfiguration().setSyncRollbackPhase(true); cache.getCache().getConfiguration().setWriteSkewCheck(true); cache.start(); table.setCache ( cache ); super.start(); } @After public void stop () { cache.stop(); table.unsetCache ( cache ); }
The JBPC-specific test is launched with options:
-Dlog4j.configuration=file:///D:/ff/log4j.properties -Dcom.atomikos.icatch.file=D:/ff/jta.properties -Djboss.aop.verbose=false -Djboss.aop.path=src/resources/META-INF/pojocache-aop.xml -javaagent:D:/telechargements/depuisChezMoi/jbosscacheALL/jboss-aop.jar
The transactional Spring bean (TracksTableImpl) delegates shared state either to a JDBC implementation (TracksTableJdbcDelegate) or to a JBPC implementation (TracksTableJbpcDelegate):
@Service("tracksTable") @Transactional (propagation=Propagation.REQUIRED, isolation=Isolation.SERIALIZABLE, readOnly=false, timeout=10000) public class TracksTableImpl implements TracksTable { private TracksTableDelegate delegate = new TracksTableJbpcDelegate (); /*@Resource ( name="tracksTableJdbcDelegate" ) private TracksTableDelegate delegate;*/ @Override public void createTrack ( boolean fail ) { if ( fail ) throw new TrackException (); Track track = new TrackImpl (); delegate.createTrack ( track ); } @Override public void create2Tracks ( boolean failSurLeDeuxieme ) { createTrack ( false ); createTrack ( failSurLeDeuxieme ); } @Override public void createTrackAndSleep(int dureeEnSecondes) { createTrack ( false ); ObjectUtils.sleep ( dureeEnSecondes * 1000 ); } @Override public void deleteAllTracks() { delegate.deleteAllTracks(); } @Override public int size() { return delegate.size(); } @Override public void setCache ( PojoCache cache ) { delegate.attach ( cache ); } @Override public void unsetCache ( PojoCache cache ) { delegate.detach ( cache ); } }
@Service ("tracksTableJdbcDelegate") public class TracksTableJdbcDelegate implements TracksTableDelegate { @Resource ( name="jdbcTemplate" ) private SimpleJdbcOperations template; private static final String INSERT = "insert into TRACK (NOM) values (:NOM)"; private static final String DELETE_ALL = "delete from TRACK"; private static final String SELECT_ALL = "select count(*) from TRACK"; @Override public void createTrack ( Track track ) { System.out.println ( "createTrack/isActualTransactionActive: " + Spring.isActualTransactionActive() ); Map<String,String> map = new HashMap<String,String> (); map.put ( "NOM" , "aaa" ); template.update ( INSERT , map ); } @Override public void deleteAllTracks() { template.update ( DELETE_ALL ); } @Override public int size() { return template.queryForInt ( SELECT_ALL ); } @Override public void attach(PojoCache cache) { //DO NOTHING } @Override public void detach(PojoCache cache) { //DO NOTHING } }
@org.jboss.cache.pojo.annotation.Replicable public class TracksTableJbpcDelegate implements TracksTableDelegate { private List<Track> tracks = new ArrayList<Track> (); @Override public void createTrack ( Track track ) { System.out.println ( "createTrack/isActualTransactionActive: " + Spring.isActualTransactionActive() ); tracks.add ( track ); } @Override public void deleteAllTracks() { tracks.clear(); } @Override public int size() { System.out.println ( "size/isActualTransactionActive: " + Spring.isActualTransactionActive() ); return tracks.size(); } @Override public void attach ( PojoCache cache ) { System.out.println ( "__________attach" ); cache.attach ( "naja/tracksTable", this ); } @Override public void detach ( PojoCache cache ) { System.out.println ( "__________detach" ); cache.detach ( "naja/tracksTable" ); } }