-
15. Re: Asynchronicity and transaction context propagation
marklittle Sep 21, 2009 6:59 AM (in response to chtimi2)For more information about CheckedAction look in the ArjunaCore Programmer's Guide. If that doesn't answer your questions then let us know.
-
16. Re: Asynchronicity and transaction context propagation
chtimi2 Sep 23, 2009 3:48 AM (in response to chtimi2)Are you referring to the TrCoreProgrammersGuide pdf? I think so (was probably renamed) but just to be sure.
I did find something of interest in it:"TrCoreProgrammersGuide.pdf page33 (Checking transactions)" wrote:
When a thread attempts to terminate the transaction and there are active threads within it, the system will invoke the check method on the transaction’s CheckedAction object.
The parameters to the check method are:
- isCommit: indicates whether the transaction is in the process of committing or rolling back.
- actUid: the transaction identifier.
- list: a list of all of the threads currently marked as active within this transaction.
When check returns, the transaction termination will continue. Obviously the state of the transaction at this point may be different from that when check was called, e.g., the transaction may subsequently have been committed.
As i thought, the check method is passed a list of the active threads.
If i'm not mistaken, i would just need to implement a BlockingCheckedAction, that calls join() on all those threads, and use it for the global transaction.
To sum it up:
1/ Launch the server with -Dcom.arjuna.ats.jts.checkedTransactions=YES to enable checked transactions
2/ Implement BlockingCheckedAction that does:for (Thread t: list) { t.join() }
3/ Just after the global transaction has started i would need to doCurrent current = OTSManager.get_current(). current.setCheckedAction ( new BlockingCheckedAction() );
I thought of a simple unit test to verify the correctness of the transactional behaviour.
Do you think this would work? I prefer asking before i start spending time integrating another TransactionManager in Spring.
1/ UnitTest (UT) calls transactional service void updateXY ( int X , int Y , boolean failOnUpdatingY ) synchronously
2/ The global transaction is initiated and updateXY updates X
3/ updateXY then calls transactional service updateY ( int Y , boolean failOnUpdatingY ) asynchronously
4/ updateY executes within the same global transaction; if (failOnUpdatingY) a RuntimeException is thrown, which must rollback the global transaction
5/ UT sleeps for a second, by then the transaction should have been committed or rolled back
6/ UT calls getX and getY
7/ Verifications:
Case ! failOnUpdatingY: X and Y should have their new values because the global transaction has been committed
Case failOnUpdatingY: X and Y should have their old values because the global transaction has been rolled back -
17. Re: Asynchronicity and transaction context propagation
marklittle Sep 23, 2009 10:28 AM (in response to chtimi2)"chtimi2" wrote:
Are you referring to the TrCoreProgrammersGuide pdf? I think so (was probably renamed) but just to be sure.
Yes.
As i thought, the check method is passed a list of the active threads.
If i'm not mistaken, i would just need to implement a BlockingCheckedAction, that calls join() on all those threads, and use it for the global transaction.
That should work.
To sum it up:
1/ Launch the server with -Dcom.arjuna.ats.jts.checkedTransactions=YES to enable checked transactions
Last time I checked it was on by default.
2/ Implement BlockingCheckedAction that does:for (Thread t: list) { t.join() }
3/ Just after the global transaction has started i would need to doCurrent current = OTSManager.get_current(). current.setCheckedAction ( new BlockingCheckedAction() );
I thought of a simple unit test to verify the correctness of the transactional behaviour.
Do you think this would work? I prefer asking before i start spending time integrating another TransactionManager in Spring.
1/ UnitTest (UT) calls transactional service void updateXY ( int X , int Y , boolean failOnUpdatingY ) synchronously
2/ The global transaction is initiated and updateXY updates X
3/ updateXY then calls transactional service updateY ( int Y , boolean failOnUpdatingY ) asynchronously
4/ updateY executes within the same global transaction; if (failOnUpdatingY) a RuntimeException is thrown, which must rollback the global transaction
5/ UT sleeps for a second, by then the transaction should have been committed or rolled back
6/ UT calls getX and getY
7/ Verifications:
Case ! failOnUpdatingY: X and Y should have their new values because the global transaction has been committed
Case failOnUpdatingY: X and Y should have their old values because the global transaction has been rolled back
Assuming you're not looking at distributed invocations here then that looks like it should work (but I haven't put a lot of time into thinking about it.) -
18. Re: Asynchronicity and transaction context propagation
chtimi2 Sep 23, 2009 11:21 AM (in response to chtimi2)Allright, looks like it is worth trying then. I'll let you know how it goes.
-
19. Re: Asynchronicity and transaction context propagation
chtimi2 Sep 29, 2009 5:54 AM (in response to chtimi2)Alright. The first step is to write a unit test that asserts checked transaction semantics.
1/ atomicitySync is the standard synchronous case, and passes
2/ atomicityAsync is the asynchronous case, and fails
I use JBoss Cache (core edition) as the transactional resource.
Now the goal is to make 2/ pass in steps:
-same test, same results, but using JBossTS (verify correct JBTS integration)
-set up JBossTS as explained in the previous post to make it pass
But first do you agree with my test?
Here is the test:@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration ( locations={"/application-context_Jdbc_Atomikos.xml"} ) public class ComportementTransactionnelAsync implements ApplicationContextAware { private Cache cache; @Resource ( name="tracksTable" ) protected TracksTable table; protected static ApplicationContext applicationContext; @Test //@Ignore public void atomicitySync () throws Exception { atomicity ( false ); } @Test public void atomicityAsync () throws Exception { atomicity ( true ); } private void atomicity ( boolean async ) throws Exception { final String A_OK="A", B_OK="B", A_KO="C" , B_KO="D"; updateAB ( A_OK , B_OK , async , false ); assertEquals ( A_OK , table.getA() ); assertEquals ( B_OK , table.getB() ); try { updateAB ( A_KO , B_KO , async , true ); fail (); } catch ( Exception e ) {} finally { assertEquals ( B_OK , table.getB() ); assertEquals ( A_OK , table.getA() ); } } private void updateAB(String a, String b, boolean async, boolean failOnB) throws InterruptedException { table.updateAB ( a , b , async , failOnB ); if ( async ) { /* Commented since getA can be blocked by updateAB since IsolationLevel=Serializable * assertEquals ( "" , table.getA() ); //Transaction should not be committed yet assertEquals ( "" , table.getB() ); //Transaction should not be committed yet*/ Thread.sleep(1500); } } @Before public void before () { cache = createCoreCache (); cache.start(); table.setCache ( cache ); table.resetAB (); assertEquals ( "" , table.getA() ); assertEquals ( "" , table.getB() ); } @After public void stop () { table.unsetCache ( cache ); cache.stop(); } private Cache createCoreCache() { CacheFactory factory = new DefaultCacheFactory(); Cache cache = factory.createCache("resources/META-INF/replSync-service.xml", false); cache.getInvocationContext().getOptionOverrides().setForceSynchronous(true); cache.create(); return cache; } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { TransactionManager tm = (TransactionManager) applicationContext.getBean ( "atomikosTransactionManager" ); AtomikosTransactionManagerLookup.setAtomikosTransactionManager ( tm ); } }
And here is the service implementation:@Service("tracksTable") @Transactional (propagation=Propagation.REQUIRED, isolation=Isolation.SERIALIZABLE, readOnly=false, timeout=10000) public class TracksTableImpl implements TracksTable { //--------------JBC----------------- private Cache coreCache; @Override public void updateAB ( String a , String b , boolean async , boolean failOnUpdateB ) { setA ( a ); Thread t = new Thread ( new SleepAndSetB ( b , failOnUpdateB ) ); if ( async ) { t.start(); } else t.run(); } private class SleepAndSetB implements Runnable { private final String b; private final boolean failOnUpdateB; SleepAndSetB ( String b, boolean failOnUpdateB ) { this.b = b; this.failOnUpdateB = failOnUpdateB; } @Override public void run() { ObjectUtils.sleep(500); if ( failOnUpdateB ) throw new TrackException (); setB ( b ); } } @Override public String getA() { Node rootNode = coreCache.getRoot(); return (String)rootNode.get ( "a" ); } @Override public String getB() { Node rootNode = coreCache.getRoot(); return (String)rootNode.get ( "b" ); } private void setA ( String a ) { System.out.println ( "setA [ a=" + a + " ]" ); Node rootNode = coreCache.getRoot(); rootNode.put ( "a" , a ); } private void setB ( String b ) { System.out.println ( "setB [ b=" + b + " ]" ); Node rootNode = coreCache.getRoot(); rootNode.put ( "b" , b ); } @Override public void resetAB() { setA ( "" ); setB ( "" ); } @Override public void setCache ( Cache cache ) { coreCache = cache; } @Override public void unsetCache ( Cache cache ) { coreCache = null; } }
JBossCache conf (replSync-service.xml):<?xml version="1.0" encoding="UTF-8"?> <server> <mbean code="org.jboss.cache.jmx.CacheJmxWrapper" name="jboss.cache:service=TreeCache"> <attribute name="TransactionManagerLookupClass">hellotrackworld.impl.srv.AtomikosTransactionManagerLookup</attribute> <attribute name="NodeLockingScheme">PESSIMISTIC</attribute> <attribute name="IsolationLevel">SERIALIZABLE</attribute> <attribute name="CacheMode">LOCAL</attribute> <attribute name="UseReplQueue">false</attribute> <attribute name="ReplQueueInterval">0</attribute> <attribute name="ReplQueueMaxElements">0</attribute> <attribute name="ClusterName">JBossCache-Cluster</attribute> <attribute name="ClusterConfig"> <config> <UDP mcast_addr="228.10.10.10" mcast_port="45588" tos="8" ucast_recv_buf_size="20000000" ucast_send_buf_size="640000" mcast_recv_buf_size="25000000" mcast_send_buf_size="640000" loopback="false" discard_incompatible_packets="true" max_bundle_size="64000" max_bundle_timeout="30" use_incoming_packet_handler="true" ip_ttl="2" enable_bundling="false" enable_diagnostics="true" use_concurrent_stack="true" thread_naming_pattern="pl" thread_pool.enabled="true" thread_pool.min_threads="1" thread_pool.max_threads="25" thread_pool.keep_alive_time="30000" thread_pool.queue_enabled="true" thread_pool.queue_max_size="10" thread_pool.rejection_policy="Run" oob_thread_pool.enabled="true" oob_thread_pool.min_threads="1" oob_thread_pool.max_threads="4" oob_thread_pool.keep_alive_time="10000" oob_thread_pool.queue_enabled="true" oob_thread_pool.queue_max_size="10" oob_thread_pool.rejection_policy="Run"/> <PING timeout="2000" num_initial_members="3"/> <MERGE2 max_interval="30000" min_interval="10000"/> <FD_SOCK/> <FD timeout="10000" max_tries="5" shun="true"/> <VERIFY_SUSPECT timeout="1500"/> <pbcast.NAKACK max_xmit_size="60000" use_mcast_xmit="false" gc_lag="0" retransmit_timeout="300,600,1200,2400,4800" discard_delivered_msgs="true"/> <UNICAST timeout="300,600,1200,2400,3600"/> <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000" max_bytes="400000"/> <pbcast.GMS print_local_addr="true" join_timeout="5000" join_retry_timeout="2000" shun="false" view_bundling="true" view_ack_collection_timeout="5000"/> <FRAG2 frag_size="60000"/> <pbcast.STREAMING_STATE_TRANSFER use_reading_thread="true"/> <!-- <pbcast.STATE_TRANSFER/> --> <pbcast.FLUSH timeout="0"/> </config> </attribute> <attribute name="FetchInMemoryState">true</attribute> <attribute name="StateRetrievalTimeout">15000</attribute> <attribute name="SyncReplTimeout">15000</attribute> <attribute name="LockAcquisitionTimeout">10000</attribute> <attribute name="UseRegionBasedMarshalling">true</attribute> </mbean> </server>
Spring conf (application-context.xml):<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd"> <!--CONF GENERALE vvvvv--> <context:component-scan base-package="hellotrackworld"/> <context:annotation-config/> <!--CONF GENERALE ^^^^^--> <!--PARAMETRAGE JTA vvvvv--> <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close"> <property name="forceShutdown" value="true"/> </bean> <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp"> <property name="transactionTimeout" value="300"/> </bean> <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"> <property name="transactionManager" ref="atomikosTransactionManager" /> <property name="userTransaction" ref="atomikosUserTransaction" /> <property name="allowCustomIsolationLevels" value="true" /> </bean> <tx:annotation-driven transaction-manager="jtaTransactionManager"/> <!--PARAMETRAGE JTA ^^^^^--> </beans>