Global Error Handler and Route with an Aggregator
davaleri Nov 1, 2009 8:48 AM
I have been working with a route that includes an aggregator followed by several processors followed by a send to a JMS queue. An error handler and several onException clauses are defined for the route builder. I have created a test case to demonstrate the behavior I am observing. I am using Camel 1.6.1.2-fuse.
If I structure the route as follows:
from("direct:inputBroken")
    .aggregator(constant(true))
    .batchSize(1)
    .batchTimeout(100)
    .bean(new Bean("X"))
    .multicast()
    .to("seda:log", "bean:exceptionThrower");
an exception thrown in bean:exceptionThrower does not trigger the error handler or the exception handling clauses.
If I structure the route as follows:
from("direct:inputWorking")
    .aggregator(constant(true))
    .batchSize(1)
    .batchTimeout(100)
    .to("direct:a");

from("direct:a")
    .bean(new Bean("X"))
    .multicast()
    .to("seda:log", "bean:exceptionThrower");
an exception thrown in bean:exceptionThrower will trigger the exception handling clauses if present, or the global error handler if no exception clauses are defined. In the first case, the Pipeline that handles execution after the aggregator notices the exception in the exchange and ends processing, but it stops without any error handler having a chance to do its magic. In the second case, the error handler gets injected into the pipeline and processes the exception.
The call stack in the first case is:
ExceptionThrowingProcessor.process(Exchange) line: 10
BeanProcessor.process(Exchange) line: 80
BeanEndpoint(ProcessorEndpoint).onExchange(Exchange) line: 92
ProcessorEndpoint$1.process(Exchange) line: 66
SendProcessor.process(Exchange) line: 69
MulticastProcessor.process(Exchange) line: 190
StreamCachingInterceptor.proceed(Exchange, AsyncCallback) line: 90
StreamCachingInterceptor.process(Exchange, AsyncCallback) line: 82
Pipeline.process(Exchange, Exchange, AsyncCallback, Iterator<Processor>, AsyncProcessor) line: 115
Pipeline.process(Exchange, AsyncCallback) line: 89
AsyncProcessorHelper.process(AsyncProcessor, Exchange) line: 41
Pipeline.process(Exchange) line: 57
Aggregator(BatchProcessor).processExchange(Exchange) line: 151
BatchProcessor$BatchSender.sendExchanges() line: 288
BatchProcessor$BatchSender.run() line: 235
The call stack in the second case is:
ExceptionThrowingProcessor.process(Exchange) line: 10
BeanProcessor.process(Exchange) line: 80
BeanEndpoint(ProcessorEndpoint).onExchange(Exchange) line: 92
ProcessorEndpoint$1.process(Exchange) line: 66
SendProcessor.process(Exchange) line: 69
MulticastProcessor.process(Exchange) line: 190
StreamCachingInterceptor.proceed(Exchange, AsyncCallback) line: 90
StreamCachingInterceptor.process(Exchange, AsyncCallback) line: 82
InstrumentationProcessor.process(Exchange, AsyncCallback) line: 68
AsyncProcessorHelper.process(AsyncProcessor, Exchange) line: 41
InstrumentationProcessor.process(Exchange) line: 55
LoggingErrorHandler.process(Exchange) line: 54
AsyncProcessorTypeConverter$ProcessorToAsyncProcessorBridge.process(Exchange, AsyncCallback) line: 43
Pipeline.process(Exchange, Exchange, AsyncCallback, Iterator<Processor>, AsyncProcessor) line: 115
Pipeline.process(Exchange, AsyncCallback) line: 89
InstrumentationProcessor.process(Exchange, AsyncCallback) line: 68
UnitOfWorkProcessor.process(Exchange, AsyncCallback) line: 52
AsyncProcessorHelper.process(AsyncProcessor, Exchange) line: 41
UnitOfWorkProcessor(DelegateAsyncProcessor).process(Exchange) line: 66
DirectProducer<E>.process(Exchange) line: 47
SendProcessor.process(Exchange) line: 69
Aggregator(BatchProcessor).processExchange(Exchange) line: 151
BatchProcessor$BatchSender.sendExchanges() line: 288
BatchProcessor$BatchSender.run() line: 235
As you can see, the call stack when I split the route definition is much deeper and includes the error handler / exception clause handling (LoggingErrorHandler), which is missing from the first stack.
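To make the difference concrete, here is a toy model in plain Java of what the two stacks suggest. It uses no Camel classes: Exchange, Processor, pipeline and withErrorHandler are hypothetical stand-ins, and the wrapping rule (steps in their own route get wrapped, steps placed directly after the aggregator do not) is only my assumption based on the stacks above, not a statement about how Camel is actually implemented.

```java
import java.util.ArrayList;
import java.util.List;

public class ErrorHandlerWrappingSketch {

    /** Hypothetical stand-in for an exchange: holds at most one failure. */
    static final class Exchange {
        Exception exception;
    }

    /** Hypothetical stand-in for a processor. */
    interface Processor {
        void process(Exchange exchange);
    }

    /** Runs steps in order and stops silently once a failure is recorded. */
    static Processor pipeline(List<Processor> steps) {
        return exchange -> {
            for (Processor step : steps) {
                if (exchange.exception != null) {
                    return; // processing ends, but nobody is told about the failure
                }
                step.process(exchange);
            }
        };
    }

    /** Wraps one step so that a failure it records is reported to the handled list. */
    static Processor withErrorHandler(Processor step, List<String> handled) {
        return exchange -> {
            step.process(exchange);
            if (exchange.exception != null) {
                handled.add(exchange.exception.getMessage());
            }
        };
    }

    /** The steps that follow the aggregator: a harmless bean, then a failing one. */
    static List<Processor> steps() {
        List<Processor> steps = new ArrayList<>();
        steps.add(exchange -> { });
        steps.add(exchange -> exchange.exception = new IllegalStateException("boom"));
        return steps;
    }

    /** First case: the steps run directly after the aggregator, unwrapped. */
    static List<String> runInline() {
        List<String> handled = new ArrayList<>();
        pipeline(steps()).process(new Exchange());
        return handled;
    }

    /** Second case: the steps form their own route, so each one is wrapped. */
    static List<String> runViaSeparateRoute() {
        List<String> handled = new ArrayList<>();
        List<Processor> wrapped = new ArrayList<>();
        for (Processor step : steps()) {
            wrapped.add(withErrorHandler(step, handled));
        }
        pipeline(wrapped).process(new Exchange());
        return handled;
    }

    public static void main(String[] args) {
        System.out.println("inline: " + runInline());
        System.out.println("separate route: " + runViaSeparateRoute());
    }
}
```

In this model the inline variant ends with an empty handled list even though the exchange carries an exception, which matches what I observe with direct:inputBroken, while the separate-route variant reports the failure.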
I have attached a Zip of a Maven project that includes the route and test cases.
So am I using the aggregator incorrectly or is this a quirk of Camel?
Attachment: camel-aggregator.zip (5.4 KB)