2 Replies Latest reply on Nov 3, 2009 8:13 AM by davaleri

    Global Error Handler and Route with an Aggregator

    davaleri

      I have been working with a route that includes an aggregator followed by several processors followed by a send to a JMS queue.  An error handler and several onException clauses are defined for the route builder.  I have created a test case to demonstrate the behavior I am observing.  I am using Camel 1.6.1.2-fuse.

       

      If I structure the route as follows:

       

      from("direct:inputBroken")
          .aggregator(constant(true))
              .batchSize(1)
              .batchTimeout(100)
          .bean(new Bean("X"))
          .multicast()
              .to("seda:log", "bean:exceptionThrower");

       

      an exception thrown in bean:exceptionThrower does not trigger the error handler or the exception handling clauses.

       

      If I structure the route as follows:

       

      from("direct:inputWorking")
          .aggregator(constant(true))
              .batchSize(1)
              .batchTimeout(100)
          .to("direct:a");

      from("direct:a")
          .bean(new Bean("X"))
          .multicast()
              .to("seda:log", "bean:exceptionThrower");

       

      an exception thrown in bean:exceptionThrower will trigger the exception handling clauses if present or the global error handler if no exception clauses are defined.  In the first case, the Pipeline handling execution after the aggregator notices the exception in the exchange and ends processing, but the processing stops without any error handlers having a chance to do their magic.  In the second case, the error handler gets injected into the pipeline and processes the exception.
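
      To make the difference between the two cases concrete, here is a minimal plain-Java sketch. It is a toy model only: Exchange, Processor, Pipeline and ErrorHandler below are hypothetical stand-ins written for this illustration, not Camel's classes. It assumes, as the call stacks suggest, that in the working case each step is wrapped by an error handler while in the broken case the steps run bare.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: these types are hypothetical stand-ins, not Camel classes.
class ErrorHandlerSketch {

    /** Minimal stand-in for an exchange: a body plus an optional exception. */
    static final class Exchange {
        final String body;
        RuntimeException exception;
        Exchange(String body) { this.body = body; }
    }

    interface Processor {
        void process(Exchange exchange);
    }

    /** Runs steps in order and stops quietly once the exchange carries an exception. */
    static final class Pipeline implements Processor {
        private final List<Processor> steps;
        Pipeline(Processor... steps) { this.steps = Arrays.asList(steps); }

        public void process(Exchange exchange) {
            for (Processor step : steps) {
                try {
                    step.process(exchange);
                } catch (RuntimeException e) {
                    exchange.exception = e;
                }
                if (exchange.exception != null) {
                    return; // processing ends here; nothing in the pipeline reports the failure
                }
            }
        }
    }

    /** Wraps one step and records any failure that step produces. */
    static final class ErrorHandler implements Processor {
        private final Processor delegate;
        private final List<String> handled;
        ErrorHandler(Processor delegate, List<String> handled) {
            this.delegate = delegate;
            this.handled = handled;
        }

        public void process(Exchange exchange) {
            try {
                delegate.process(exchange);
            } catch (RuntimeException e) {
                exchange.exception = e;
            }
            if (exchange.exception != null) {
                handled.add(exchange.exception.getMessage());
            }
        }
    }

    /** Runs bean + throwing step, bare or wrapped, and returns what the handler saw. */
    static List<String> run(boolean wrapWithErrorHandler) {
        List<String> handled = new ArrayList<>();
        Processor bean = exchange -> { };
        Processor thrower = exchange -> { throw new IllegalStateException("boom"); };
        Processor[] steps = { bean, thrower };
        if (wrapWithErrorHandler) {
            for (int i = 0; i < steps.length; i++) {
                steps[i] = new ErrorHandler(steps[i], handled);
            }
        }
        new Pipeline(steps).process(new Exchange("aggregated"));
        return handled;
    }

    public static void main(String[] args) {
        System.out.println("bare steps handled:    " + run(false));
        System.out.println("wrapped steps handled: " + run(true));
    }
}
```

      In this model run(false) returns an empty list (the pipeline stops and the failure goes unreported) and run(true) returns a list containing "boom". Whether Camel 1.6.x omits the wrapper for the same reason is not something this sketch can confirm.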

       

      The call stack in the first case is:

      ExceptionThrowingProcessor.process(Exchange) line: 10
      BeanProcessor.process(Exchange) line: 80
      BeanEndpoint(ProcessorEndpoint).onExchange(Exchange) line: 92
      ProcessorEndpoint$1.process(Exchange) line: 66
      SendProcessor.process(Exchange) line: 69
      MulticastProcessor.process(Exchange) line: 190
      StreamCachingInterceptor.proceed(Exchange, AsyncCallback) line: 90
      StreamCachingInterceptor.process(Exchange, AsyncCallback) line: 82
      Pipeline.process(Exchange, Exchange, AsyncCallback, Iterator<Processor>, AsyncProcessor) line: 115
      Pipeline.process(Exchange, AsyncCallback) line: 89
      AsyncProcessorHelper.process(AsyncProcessor, Exchange) line: 41
      Pipeline.process(Exchange) line: 57
      Aggregator(BatchProcessor).processExchange(Exchange) line: 151
      BatchProcessor$BatchSender.sendExchanges() line: 288
      BatchProcessor$BatchSender.run() line: 235

       

      The call stack in the second case is:

      ExceptionThrowingProcessor.process(Exchange) line: 10
      BeanProcessor.process(Exchange) line: 80
      BeanEndpoint(ProcessorEndpoint).onExchange(Exchange) line: 92
      ProcessorEndpoint$1.process(Exchange) line: 66
      SendProcessor.process(Exchange) line: 69
      MulticastProcessor.process(Exchange) line: 190
      StreamCachingInterceptor.proceed(Exchange, AsyncCallback) line: 90
      StreamCachingInterceptor.process(Exchange, AsyncCallback) line: 82
      InstrumentationProcessor.process(Exchange, AsyncCallback) line: 68
      AsyncProcessorHelper.process(AsyncProcessor, Exchange) line: 41
      InstrumentationProcessor.process(Exchange) line: 55
      LoggingErrorHandler.process(Exchange) line: 54
      AsyncProcessorTypeConverter$ProcessorToAsyncProcessorBridge.process(Exchange, AsyncCallback) line: 43
      Pipeline.process(Exchange, Exchange, AsyncCallback, Iterator<Processor>, AsyncProcessor) line: 115
      Pipeline.process(Exchange, AsyncCallback) line: 89
      InstrumentationProcessor.process(Exchange, AsyncCallback) line: 68
      UnitOfWorkProcessor.process(Exchange, AsyncCallback) line: 52
      AsyncProcessorHelper.process(AsyncProcessor, Exchange) line: 41
      UnitOfWorkProcessor(DelegateAsyncProcessor).process(Exchange) line: 66
      DirectProducer<E>.process(Exchange) line: 47
      SendProcessor.process(Exchange) line: 69
      Aggregator(BatchProcessor).processExchange(Exchange) line: 151
      BatchProcessor$BatchSender.sendExchanges() line: 288
      BatchProcessor$BatchSender.run() line: 235

       

      As you can see, the call stack when I split the route definition is far more complex and includes the error handler / exception clause handling.

       

      I have attached a Zip of a Maven project that includes the route and test cases.

       

      So am I using the aggregator incorrectly or is this a quirk of Camel?

        • 1. Re: Global Error Handler and Route with an Aggregator
          davsclaus

          You have to handle exceptions being thrown in your AggregationStrategy which you have to define on your aggregator.

           

          See the Camel aggregator documentation

          http://camel.apache.org/aggregator

           

          And this blog entry

          http://tmielke.blogspot.com/2009/01/using-camel-aggregator-correctly.html

           

          for a bit on how to use the aggregation strategy.

           

          You can see if there was an exception using

           

          Exception cause = newExchange.getException();
          if (cause != null) {
            // an exception occurred
          }
          

           

          • 2. Re: Global Error Handler and Route with an Aggregator
            davaleri

            Thank you for the reply.

             

            I am using a custom AggregationCollection to perform my aggregation, because the BatchProcessor and the existing DefaultAggregationCollection and PredicateAggregationCollection are too limited for my business need.  However, the issue is not in detecting exceptions that occurred before the aggregator while performing the aggregation.  The problem I am having is in the latter half of the route, after the BatchSender fires and sends my aggregated exchanges.  If one of these aggregated exchanges triggers an exception in the latter half of the route, the error handler doesn't always get invoked, as described in my original post.

             

            As far as I can tell, once the BatchSender has sent my aggregated exchange, the AggregationStrategy no longer has anything to do with the processing since the BatchSender is fired from a different thread after any aggregation is completed.  The call stacks I provided in my original post are for the portion of the route after the aggregator.  I'm not seeing how the AggregationStrategy chosen for my aggregator would come into play here.
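
            The threading point can be sketched in plain Java. This is a toy model under stated assumptions, not Camel's BatchProcessor: the queue, the thread name "batch-sender" and the simulated downstream failure are all hypothetical. It only shows that a failure raised after the hand-off surfaces on the sender thread, while the code that submitted the exchange returns normally.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

// Toy model only: hypothetical names, not Camel's BatchProcessor or BatchSender.
class BatchThreadSketch {

    /** Returns { thread that submitted, thread that saw the failure, failure message }. */
    static String[] run() {
        BlockingQueue<String> batch = new LinkedBlockingQueue<>();
        AtomicReference<String> failedOn = new AtomicReference<>();
        AtomicReference<String> failure = new AtomicReference<>();

        // Stand-in for the sender: takes the finished batch and runs the rest of the route.
        Thread sender = new Thread(() -> {
            try {
                String aggregated = batch.take(); // aggregation is already complete here
                throw new IllegalStateException("downstream failure for " + aggregated);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (IllegalStateException e) {
                // Only code running on this thread gets a chance to see the failure.
                failedOn.set(Thread.currentThread().getName());
                failure.set(e.getMessage());
            }
        }, "batch-sender");
        sender.start();

        String submittedOn = Thread.currentThread().getName();
        try {
            batch.put("exchange-1"); // the submitting side returns without any exception
            sender.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the sender", e);
        }
        return new String[] { submittedOn, failedOn.get(), failure.get() };
    }

    public static void main(String[] args) {
        String[] result = run();
        System.out.println("submitted on: " + result[0]);
        System.out.println("failed on:    " + result[1]);
        System.out.println("failure:      " + result[2]);
    }
}
```

            In this model the failure is recorded on "batch-sender", never on the submitting thread, which matches the observation that anything meant to react to the exception has to sit in the processing chain the sender itself invokes.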