Error handling and redelivery in conjunction with XA transactions
0xaf Dec 7, 2014 8:12 PM
Hi All!
I'm trying to implement a simple, classic integration pattern that uses XA transactions. The JBoss Fuse ESB version is 6.1.0.GA.
I need advice on how to implement error handling and redelivery in conjunction with XA transactions according to current best practices.
An example XA transaction looks like this:
- Receive message from ActiveMQ queue.
- Transform message.
- Update first database (Oracle).
- Transform message.
- Update second database (DB2).
- Send message to another ActiveMQ queue.
I need to handle errors and retry a failed transaction after a delay.
There appear to be two options for implementing redelivery:
1. ActiveMQ broker redelivery. It can be configured on the ConnectionFactory with a RedeliveryPolicy. Redelivery is provided by the broker when the message-consuming transaction is rolled back.
<bean id="amqXaConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
    <property name="brokerURL" value="vm:amq"/>
    <property name="userName" value="admin"/>
    <property name="password" value="admin"/>
    <property name="prefetchPolicy">
        <bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
            <property name="all" value="1"/>
        </bean>
    </property>
    <property name="redeliveryPolicy">
        <bean class="org.apache.activemq.RedeliveryPolicy">
            <property name="maximumRedeliveries" value="3"/>
            <property name="initialRedeliveryDelay" value="1000"/>
            <property name="redeliveryDelay" value="1000"/>
        </bean>
    </property>
</bean>
2. Camel ErrorHandler redelivery. It can be configured for any Camel route, for example using the Java DSL. Redelivery is implemented inside the Camel pipeline.
errorHandler(
    transactionErrorHandler()
        .maximumRedeliveries(3)
        .redeliveryDelay(1000L)
);
But there are some architectural issues in Fuse ESB that make both of the available redelivery methods unusable in conjunction with XA transactions:
- Because broker redelivery has affinity to the MessageConsumer, ActiveMQComponent.cacheLevelName should be set to CACHE_CONSUMER for normal operation, but this is not compatible with XA transactions, which require cache level CACHE_CONNECTION or CACHE_NONE. So the RedeliveryPolicy is not applied as it should be: redeliveryDelay and exponentialBackOff do not work.
- Camel provides redelivery only from the point of failure in the Camel route pipeline. Hence the active XA transaction is not rolled back on each retry and remains the same. Not every Camel route step can be idempotent, so this can lead to duplicate operations within the XA transaction scope. For example, if a failure occurs in step 3 of the route (the Oracle update), somewhere in the code after an already completed SQL insert, the insert will be executed again on the next redelivery and there will be duplicates in the DB.
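To make the second issue concrete, here is a toy model in plain Java. It is only a sketch: no Camel, JDBC or XA is involved, a List stands in for the database table, and the class and method names (RetryDemo, insertThenMaybeFail and so on) are made up for illustration. It compares retrying from the point of failure inside the same transaction with rolling back before each retry.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: a List stands in for a database table, and "rollback"
// is simulated by discarding the rows added during the failed attempt.
class RetryDemo {

    // One non-idempotent route step: the insert succeeds, then the step
    // fails afterwards on the first `failures` attempts.
    static void insertThenMaybeFail(List<String> table, int attempt, int failures) {
        table.add("order-42");
        if (attempt <= failures) {
            throw new IllegalStateException("failure after insert, attempt " + attempt);
        }
    }

    // Retry from the point of failure: the same "transaction" stays open,
    // so rows inserted by failed attempts are kept.
    static List<String> retryInPlace(int failures) {
        List<String> table = new ArrayList<>();
        for (int attempt = 1; ; attempt++) {
            try {
                insertThenMaybeFail(table, attempt, failures);
                return table;
            } catch (IllegalStateException e) {
                // no rollback, just try the step again
            }
        }
    }

    // Roll back and redeliver: every failed attempt undoes its own work
    // before the whole unit of work is run again.
    static List<String> retryWithRollback(int failures) {
        List<String> table = new ArrayList<>();
        for (int attempt = 1; ; attempt++) {
            int savepoint = table.size();
            try {
                insertThenMaybeFail(table, attempt, failures);
                return table;
            } catch (IllegalStateException e) {
                table.subList(savepoint, table.size()).clear(); // simulated rollback
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("in-place retry rows:    " + retryInPlace(2).size());
        System.out.println("rollback + retry rows:  " + retryWithRollback(2).size());
    }
}
```

With two failures before success, the in-place variant ends up with three rows and the rollback variant with one, which is the duplicate problem described above.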
I spent a few days researching this and concluded that the only way to achieve the desired classic behavior is to disable all redeliveries and implement a custom Camel route for error handling. This route will read messages from the ActiveMQ.DLQ queue, where PERSISTENT messages are moved after the XA transaction is rolled back, and then resubmit them with the desired AMQ_SCHEDULED_DELAY header.
But now I'm stuck on how to determine the OriginalDestination of a message in ActiveMQ.DLQ. When the message is read by a Camel route, the OriginalDestination header is somehow empty. I tested it with:
from("activemq-xa:queue:ActiveMQ.DLQ").routeId("catch-errors")
    .log("##### OriginalDestination = '${header.OriginalDestination}'");
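For the resubmission part of the DLQ route, the header bookkeeping could look roughly like the sketch below. It is plain Java over a header map, not Camel code. AMQ_SCHEDULED_DELAY is the header named above (as far as I know it is only honoured when the broker runs with schedulerSupport enabled); the resubmitCount header, the ResubmitPlanner class and the doubling of the delay are my own assumptions, not anything provided by Camel or ActiveMQ.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the header bookkeeping for a "resubmit from DLQ" route.
class ResubmitPlanner {

    static final String DELAY_HEADER = "AMQ_SCHEDULED_DELAY"; // header named in the post
    static final String COUNT_HEADER = "resubmitCount";       // hypothetical counter header

    // Returns the headers for the resubmitted message, or null when the
    // message has already been resubmitted maxResubmits times.
    static Map<String, Object> nextHeaders(Map<String, Object> headers,
                                           int maxResubmits, long baseDelayMs) {
        Object raw = headers.get(COUNT_HEADER);
        int count = (raw instanceof Number) ? ((Number) raw).intValue() : 0;
        if (count >= maxResubmits) {
            return null; // give up: leave the message in the DLQ
        }
        Map<String, Object> out = new HashMap<>(headers);
        out.put(COUNT_HEADER, count + 1);
        // Assumed policy: double the delay on every resubmission
        // (the shift is capped to avoid overflow).
        out.put(DELAY_HEADER, baseDelayMs * (1L << Math.min(count, 20)));
        return out;
    }
}
```

With maxResubmits = 3 and baseDelayMs = 1000, a message would be resubmitted with delays of 1000, 2000 and 4000 ms and then left in the DLQ. This still needs the original destination to know where to send the message, which is exactly the open question.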
I'd be grateful for any advice!