Transactions: atomicity OK, isolation KO
chtimi2 Sep 10, 2009 5:29 AM
I'm studying the feasibility of using JBoss POJO Cache (henceforth JBPC) as a transactional cache for our application framework. I'm using an AtomikosEssentials transaction manager, and Spring as a declarative JTA container. The JBPC version is 3.0.0GA.
To do so, I wrote two unit tests: one for the A (atomicity) part of ACID and one for the I (isolation) part.
- The atomicity test executes a failing transaction and verifies that it has been rolled back.
- The isolation test starts a thread that modifies a JBPC object; this update is artificially long (a Thread.sleep at the end). The test then verifies that a concurrent read is blocked until the update is over, and that the read returns the updated state.
The atomicity test passes, indicating that I am indeed working inside a transaction.
But the isolation test fails. The test itself seems correct, because if I use a JDBC resource instead of the JBPC object as shared state, the test passes. Thus, there seems to be a problem with my JBPC configuration, or something else I don't understand.
The unit test:
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration ( locations={"/application-context_Jdbc_Atomikos.xml"} )
public class ComportementTransactionnelTable implements ApplicationContextAware
{
@Resource ( name="tracksTable" )
protected TracksTable table;
private final ExecutorService executorService = Executors.newFixedThreadPool(1);
protected static ApplicationContext applicationContext;
@Before
public void start ()
{
table.deleteAllTracks ();
assertEquals ( 0 , table.size() );
}
@Test
//@Ignore
public void atomicite () throws Exception
{
//Apres saisie: 1 ligne
table.create2Tracks ( false );
assertEquals ( 2 , table.size() );
try
{
table.create2Tracks ( true );
fail ();
}
catch ( Exception e ) {}
finally
{
assertEquals ( 2 , table.size() );
}
}
@Test
//@Ignore
public void isolation () throws Exception
{
assertEquals ( 0 , table.size() );
int dureeEnSecondes = 5; //Must be < lock acquire timeout
executorService.submit ( new MiseAJourLongue ( dureeEnSecondes ) );
StopWatch stopWatch = new StopWatch ();
stopWatch.start();
//Wait a little to be sure that by the time we get to table.size,
//the MiseAJourLongue thread has had time to start its transaction
ObjectUtils.sleep ( 100 );
assertEquals ( 1 , table.size() );
stopWatch.stop();
double tempsBloque = stopWatch.getTotalTimeSeconds();
System.out.println ( "isolation/tempsBloque: " + tempsBloque );
assertTrue ( tempsBloque>= dureeEnSecondes );
}
private class MiseAJourLongue implements Runnable
{
private final int dureeEnSecondes;
MiseAJourLongue ( int dureeEnSecondes )
{
this.dureeEnSecondes = dureeEnSecondes;
}
@Override
public void run()
{
table.createTrackAndSleep ( dureeEnSecondes );
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
{
TransactionManager tm = (TransactionManager) applicationContext.getBean ( "atomikosTransactionManager" );
AtomikosTransactionManagerLookup.setAtomikosTransactionManager ( tm );
}
}
The test can be used with a shared state that is either JDBC or JBPC.
When the shared state is JBPC, one launches the JBPC-specific subclass:
public class ComportementTransactionnelTableCache extends ComportementTransactionnelTable
{
private static PojoCache cache;
@Before
public void start ()
{
cache = PojoCacheFactory.createCache ( "resources/META-INF/replSync-service.xml" , false );
cache.getCache().getInvocationContext().getOptionOverrides().setForceWriteLock(true);
cache.getCache().getInvocationContext().getOptionOverrides().setForceSynchronous(true);
//cache.getCache().getConfiguration().setCacheMode(CacheMode.LOCAL);
//cache.getCache().getConfiguration().setIsolationLevel(IsolationLevel.SERIALIZABLE);
//cache.getCache().getConfiguration().setConcurrencyLevel(0);
cache.getCache().getConfiguration().setLockParentForChildInsertRemove ( true );
//cache.getCache().getConfiguration().setNodeLockingScheme(NodeLockingScheme.PESSIMISTIC);
cache.getCache().getConfiguration().setSyncCommitPhase(true);
cache.getCache().getConfiguration().setSyncRollbackPhase(true);
cache.getCache().getConfiguration().setWriteSkewCheck(true);
cache.start();
table.setCache ( cache );
super.start();
}
@After
public void stop ()
{
cache.stop();
table.unsetCache ( cache );
}The JBPC-specific test is launched with options:
-Dlog4j.configuration=file:///D:/ff/log4j.properties -Dcom.atomikos.icatch.file=D:/ff/jta.properties -Djboss.aop.verbose=false -Djboss.aop.path=src/resources/META-INF/pojocache-aop.xml -javaagent:D:/telechargements/depuisChezMoi/jbosscacheALL/jboss-aop.jar
The transactional Spring bean (TracksTableImpl) delegates the shared state either to a JDBC implementation (TracksTableJdbcDelegate) or to a JBPC implementation (TracksTableJbpcDelegate):
/**
 * Transactional facade over a {@link TracksTableDelegate}. Every operation is
 * forwarded to the delegate, which holds the actual shared state.
 *
 * NOTE(review): Spring documents the {@code timeout} attribute in seconds, so
 * 10000 may be larger than intended; confirm.
 */
@Service("tracksTable")
@Transactional (
        propagation=Propagation.REQUIRED,
        isolation=Isolation.SERIALIZABLE,
        readOnly=false,
        timeout=10000)
public class TracksTableImpl implements TracksTable
{
    /** Holder of the shared state; the JBPC-backed variant is wired in directly. */
    private TracksTableDelegate delegate = new TracksTableJbpcDelegate ();

    /* Alternative wiring, using the JDBC-backed delegate bean instead:
    @Resource ( name="tracksTableJdbcDelegate" )
    private TracksTableDelegate delegate;*/

    /**
     * Adds one new track, or throws {@code TrackException} without touching
     * the delegate when {@code fail} is set.
     */
    @Override
    public void createTrack ( boolean fail )
    {
        if ( fail )
        {
            throw new TrackException ();
        }
        delegate.createTrack ( new TrackImpl () );
    }

    /**
     * Adds two tracks one after the other; the second insertion is the one
     * that fails when {@code failSurLeDeuxieme} is set.
     */
    @Override
    public void create2Tracks ( boolean failSurLeDeuxieme )
    {
        createTrack ( false );
        createTrack ( failSurLeDeuxieme );
    }

    /** Adds one track, then sleeps for the given number of seconds before returning. */
    @Override
    public void createTrackAndSleep(int dureeEnSecondes)
    {
        createTrack ( false );
        int dureeEnMillisecondes = dureeEnSecondes * 1000;
        ObjectUtils.sleep ( dureeEnMillisecondes );
    }

    /** Removes every track held by the delegate. */
    @Override
    public void deleteAllTracks()
    {
        delegate.deleteAllTracks();
    }

    /** Returns the number of tracks reported by the delegate. */
    @Override
    public int size()
    {
        int nombreDeTracks = delegate.size();
        return nombreDeTracks;
    }

    /** Asks the delegate to attach itself to the given cache. */
    @Override
    public void setCache ( PojoCache cache )
    {
        delegate.attach ( cache );
    }

    /** Asks the delegate to detach itself from the given cache. */
    @Override
    public void unsetCache ( PojoCache cache )
    {
        delegate.detach ( cache );
    }
}
/**
 * {@link TracksTableDelegate} whose shared state is the {@code TRACK} database
 * table, accessed through the injected {@code jdbcTemplate} bean.
 */
@Service ("tracksTableJdbcDelegate")
public class TracksTableJdbcDelegate implements TracksTableDelegate
{
    private static final String INSERT_TRACK = "insert into TRACK (NOM) values (:NOM)";
    private static final String DELETE_ALL_TRACKS = "delete from TRACK";
    private static final String COUNT_TRACKS = "select count(*) from TRACK";

    @Resource ( name="jdbcTemplate" )
    private SimpleJdbcOperations template;

    /**
     * Inserts one row with a fixed name. The {@code track} argument itself is
     * not read. The transaction status is printed first, for diagnosis.
     */
    @Override
    public void createTrack ( Track track )
    {
        boolean transactionActive = Spring.isActualTransactionActive();
        System.out.println ( "createTrack/isActualTransactionActive: " + transactionActive );
        Map<String,String> parametres = new HashMap<String,String> ();
        parametres.put ( "NOM" , "aaa" );
        template.update ( INSERT_TRACK , parametres );
    }

    /** Deletes every row of the {@code TRACK} table. */
    @Override
    public void deleteAllTracks()
    {
        template.update ( DELETE_ALL_TRACKS );
    }

    /** Returns the row count of the {@code TRACK} table. */
    @Override
    public int size()
    {
        int nombreDeLignes = template.queryForInt ( COUNT_TRACKS );
        return nombreDeLignes;
    }

    /** No-op: the JDBC-backed state does not use the cache. */
    @Override
    public void attach(PojoCache cache)
    {
        // Intentionally empty.
    }

    /** No-op: the JDBC-backed state does not use the cache. */
    @Override
    public void detach(PojoCache cache)
    {
        // Intentionally empty.
    }
}
/**
 * {@link TracksTableDelegate} whose shared state is an in-memory list of
 * tracks. The object registers itself in a {@code PojoCache} under the id
 * {@code naja/tracksTable} when attached.
 */
@org.jboss.cache.pojo.annotation.Replicable
public class TracksTableJbpcDelegate implements TracksTableDelegate
{
    /** The tracks currently held. */
    private List<Track> tracks = new ArrayList<Track> ();

    /** Appends the track to the list, after printing the transaction status. */
    @Override
    public void createTrack ( Track track )
    {
        boolean transactionActive = Spring.isActualTransactionActive();
        System.out.println ( "createTrack/isActualTransactionActive: " + transactionActive );
        tracks.add ( track );
    }

    /** Empties the list. */
    @Override
    public void deleteAllTracks()
    {
        tracks.clear();
    }

    /** Returns the number of tracks in the list, after printing the transaction status. */
    @Override
    public int size()
    {
        boolean transactionActive = Spring.isActualTransactionActive();
        System.out.println ( "size/isActualTransactionActive: " + transactionActive );
        return tracks.size();
    }

    /** Registers this object in the given cache under {@code naja/tracksTable}. */
    @Override
    public void attach ( PojoCache cache )
    {
        String trace = "__________attach";
        System.out.println ( trace );
        cache.attach ( "naja/tracksTable", this );
    }

    /** Removes the {@code naja/tracksTable} entry from the given cache. */
    @Override
    public void detach ( PojoCache cache )
    {
        String trace = "__________detach";
        System.out.println ( trace );
        cache.detach ( "naja/tracksTable" );
    }
}