5 Replies Latest reply on Feb 1, 2010 8:02 PM by njiang

    multicast and aggregate

    mfatafta

      I am using camel 2.1.0 with servicemix: apache-servicemix-4.1.0-psc-01-00RC1

       

      Does the following look ok? If so, why is nothing being populated in the aggregator exchange?

       

      import org.apache.camel.Exchange;

      import org.apache.camel.Processor;

      import org.apache.camel.builder.RouteBuilder;

      import org.apache.camel.processor.aggregate.AggregationCollection;

      import org.apache.camel.processor.aggregate.AggregationStrategy;

      import org.apache.camel.processor.aggregate.PredicateAggregationCollection;

       

      import org.slf4j.Logger;

      import org.slf4j.LoggerFactory;

       

      import org.springframework.beans.factory.BeanInitializationException;

      import org.springframework.beans.factory.DisposableBean;

      import org.springframework.beans.factory.InitializingBean;

       

      class Proc1 implements Processor {

       

           @Override

           public void process(Exchange exchange) throws Exception {

                System.out.println( "Proc1: " + exchange.getProperty("id") );

       

                exchange.setProperty("proc1", "proc1 value");

                exchange.getOut().setBody( "proc1 message", String.class);

                exchange.getOut().setHeader("id", exchange.getProperty("id") );

                this.wait(4000);

           }

      }

       

      class Proc2 implements Processor {

       

           @Override

           public void process(Exchange exchange) throws Exception {

                System.out.println( "Proc2: " + exchange.getProperty("id") );

       

                exchange.setProperty("proc2", "proc2 value");

                exchange.getOut().setBody( "proc2 message", String.class);

                exchange.getOut().setHeader("id", exchange.getProperty("id") );

                this.wait(3000);

           }

      }

       

      public class MyRoute extends RouteBuilder {

       

           private static class MyAggregationStrategy implements AggregationStrategy {

       

               public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

       

                    System.out.println( "\n" );

                    System.out.println( "MyAggregationStrategy" );

                    System.out.println( "\n" );

       

                    if (oldExchange == null) {

                         System.out.println( "newExchange: " + newExchange.getIn().getBody(String.class) );

                       return newExchange;

                   }

       

                    System.out.println( "oldExchange: " + oldExchange.getIn().getBody(String.class) );

                    System.out.println( "newExchange: " + newExchange.getIn().getBody(String.class) );

                    System.out.println( "\n" );

       

                    String oldPrice = oldExchange.getIn().getBody(String.class);

                   String newPrice = newExchange.getIn().getBody(String.class);

       

                   //newExchange.getOut().setBody( oldPrice + " " + newPrice );

       

                   return newExchange;

               }

           }

       

           @Override

           public void configure() throws Exception {

       

                System.out.println( "Test camel route started..." );

       

                from ( "file://c:/source?noop=true&delete=true&delay=9000" )

       

                     .process(new Processor() {

                          public void process(Exchange exchange) throws Exception {

                               System.out.println( "exchange ID: " + exchange.getExchangeId() );

       

                               exchange.setProperty("id", exchange.getExchangeId());

                               exchange.getOut().setBody( exchange.getIn().getBody() );

                               exchange.getOut().setHeader("id", exchange.getExchangeId());

                          }

                     })

       

                     .multicast().parallelProcessing()

                     .to( "bean:proc1", "bean:proc2" )

                     .to( "direct:mydirect" )

                ;

       

                AggregationCollection ag = new PredicateAggregationCollection(header("id"),

                         new MyAggregationStrategy(),

                         property(Exchange.AGGREGATED_SIZE).isEqualTo(2));

       

                from ( "direct:mydirect" )

                     .aggregate()

                     .header("id")

                     .batchSize(2)

                     .groupExchanges()

                     .process(new Processor() {

                          public void process(Exchange exchange) throws Exception {

                               System.out.println( "\n\n" );

                               System.out.println( "Aggregator processor..." );

                               System.out.println( "Proc1: " + exchange.getProperty("proc1") );

                               System.out.println( "Proc2: " + exchange.getProperty("proc2") );

                               System.out.println( "Body: " + exchange.getIn().getBody(String.class) );

                          }

                     })

                ;

           }

       

      }

        • 1. Re: multicast and aggregate
          mfatafta

           

          The following code seems to go into a loop: it never moves the input file and never seems to reach the direct:b endpoint. Any idea what's happening?

           

          Again, this is using Camel: 2.1.0-psc-01-00RC1 and apache-servicemix-4.1.0-psc-01-00RC1

           

          // Revised route from this reply: multicast to both beans using
          // MyAggregationStrategy, close the multicast with end(), then continue to direct:b.
          // NOTE(review): the source endpoint URI was left out of from(...) in the post.
          from ( )

          // NOTE(review): the second argument (true) presumably enables parallel
          // processing - confirm against multicast(AggregationStrategy, boolean).
          .multicast( new MyAggregationStrategy(), true )

          .to( "bean:proc1", "bean:proc2" )

          .end()

           

          .to( "direct:b" )

          ;

          • 2. Re: multicast and aggregate
            davsclaus

            You may have an exception being thrown somewhere, which causes the message to stop being routed. Check the logs and your custom aggregation strategy, which may be throwing an unexpected exception.

             

            Use tracer for logging etc.

            http://camel.apache.org/tracer

            • 3. Re: multicast and aggregate
              mfatafta

              I see the following exception being thrown again and again...

               

              12:01:23,989 | INFO  | ExtenderThread-6 | DefaultListableBeanFactory       | pport.DefaultListableBeanFactory  414 | Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@1246bec: defining beans route1,proc1,proc2,template,consumerTemplate,camelContext:beanPostProcessor,camelContext; root of factory hierarchy

              12:01:23,989 | INFO  | ExtenderThread-6 | DefaultCamelContext              | e.camel.impl.DefaultCamelContext 1005 | Apache Camel 2.1.0-psc-01-00RC1 (CamelContext:camelContext) is starting

              12:01:23,989 | INFO  | ExtenderThread-6 | DefaultCamelContext              | e.camel.impl.DefaultCamelContext 1431 | JMX enabled. Using DefaultManagedLifecycleStrategy.

              12:01:25,910 | INFO  | ExtenderThread-6 | DefaultCamelContext              | e.camel.impl.DefaultCamelContext  997 | Apache Camel 2.1.0-psc-01-00RC1 (CamelContext:camelContext) started

              12:01:25,926 | INFO  | ExtenderThread-6 | OsgiBundleXmlApplicationContext  | ractOsgiBundleApplicationContext  327 | Publishing application context as OSGi service with properties {org.springframework.context.service.name=multicastAggregateTest, Bundle-SymbolicName=multicastAggregateTest, Bundle-Version=1.0.0}

              12:01:25,926 | INFO  | ExtenderThread-6 | ContextLoaderListener            | BundleApplicationContextListener   45 | Application context successfully refreshed (OsgiBundleXmlApplicationContext(bundle=multicastAggregateTest, config=osgibundle:/META-INF/spring/*.xml))

              12:01:52,034 | INFO  | 1: FileComponent | Tracer                           | rg.apache.camel.processor.Logger   88 | bba5f998-aae1-48c2-8224-698a5dde1e28 >>> (route1) from(file://aggregateTestSource?delay=5000) --> multicast <<< Pattern:InOnly, Headers:{CamelFileParent=aggregateTestSource, CamelFileAbsolute=false, CamelFileName=test.txt, CamelFileAbsolutePath=C:\progress\SVN\acs\progress\apache-servicemix-4.1.0-psc-01-00RC1\aggregateTestSource\test.txt, CamelFilePath=aggregateTestSource\test.txt, CamelFileNameOnly=test.txt, CamelFileLastModified=Sun Jan 31 23:36:41 CST 2010, CamelFileLength=21, CamelFileRelativePath=test.txt}, BodyType:org.apache.camel.component.file.GenericFile, Body:asdfasdfasdfasdfa
              \

              12:01:52,049 | INFO  | ead 3: Multicast | Tracer                           | rg.apache.camel.processor.Logger   88 | 481dbe74-ec40-4956-b351-9fd93c91891e >>> (route1) bean://proc1 --> bean://proc2 <<< Pattern:InOnly, Headers:{CamelFileAbsolutePath=C:\progress\SVN\acs\progress\apache-servicemix-4.1.0-psc-01-00RC1\aggregateTestSource\test.txt, CamelFileNameOnly=test.txt, CamelFileAbsolute=false, CamelFileParent=aggregateTestSource, CamelFileLastModified=Sun Jan 31 23:36:41 CST 2010, CamelFileName=test.txt, CamelFileRelativePath=test.txt, CamelFilePath=aggregateTestSource\test.txt, CamelFileLength=21}, BodyType:org.apache.camel.component.file.GenericFile, Body:asdfasdfasdfasdfa
              \

              12:01:52,049 | INFO  | ead 2: Multicast | Tracer                           | rg.apache.camel.processor.Logger   88 | c261530a-d450-4d87-868f-f0904533bc90 >>> (route1) bean://proc1 --> bean://proc2 <<< Pattern:InOnly, Headers:{CamelFileLastModified=Sun Jan 31 23:36:41 CST 2010, CamelFileParent=aggregateTestSource, CamelFileAbsolute=false, CamelFileName=test.txt, CamelFileAbsolutePath=C:\progress\SVN\acs\progress\apache-servicemix-4.1.0-psc-01-00RC1\aggregateTestSource\test.txt, CamelFileLength=21, CamelFilePath=aggregateTestSource\test.txt, CamelFileRelativePath=test.txt, CamelFileNameOnly=test.txt}, BodyType:org.apache.camel.component.file.GenericFile, Body:asdfasdfasdfasdfa
              \

              12:01:52,128 | ERROR | 1: FileComponent | GenericFileOnCompletion          | rg.apache.camel.processor.Logger  248 |

              java.lang.IllegalMonitorStateException

                   at java.lang.Object.wait(Native Method)

                   at com.acm.route.Proc2.process(Route1.java:66)

                   at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:83)

                   at org.apache.camel.impl.ProcessorEndpoint.onExchange(ProcessorEndpoint.java:95)

                   at org.apache.camel.impl.ProcessorEndpoint$1.process(ProcessorEndpoint.java:65)

                   at org.apache.camel.processor.SendProcessor$1.doInProducer(SendProcessor.java:97)

                   at org.apache.camel.processor.SendProcessor$1.doInProducer(SendProcessor.java:95)

                   at org.apache.camel.impl.ProducerCache.doInProducer(ProducerCache.java:146)

                   at org.apache.camel.processor.SendProcessor.doProcess(SendProcessor.java:94)

                   at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:82)

                   at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:67)

                   at org.apache.camel.processor.DelegateProcessor.processNext(DelegateProcessor.java:53)

                   at org.apache.camel.processor.DelegateProcessor.proceed(DelegateProcessor.java:82)

                   at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:162)

                   at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:67)

                   at org.apache.camel.processor.RedeliveryErrorHandler.processExchange(RedeliveryErrorHandler.java:223)

                   at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:153)

                   at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:91)

                   at org.apache.camel.processor.DefaultErrorHandler.process(DefaultErrorHandler.java:49)

                   at org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:206)

                   at org.apache.camel.processor.MulticastProcessor.doProcess(MulticastProcessor.java:241)

                   at org.apache.camel.processor.MulticastProcessor.access$000(MulticastProcessor.java:57)

                   at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:166)

                   at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:160)

                   at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)

                   at java.util.concurrent.FutureTask.run(FutureTask.java:138)

                   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)

                   at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)

                   at java.util.concurrent.FutureTask.run(FutureTask.java:138)

                   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98)

                   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:207)

                   at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)

                   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)

                   at java.lang.Thread.run(Thread.java:619)

              12:01:52,143 | WARN  | 1: FileComponent | GenericFileOnCompletion          | ent.file.GenericFileOnCompletion  142 | Rollback file strategy: org.apache.camel.component.file.strategy.GenericFileRenameProcessStrategy@18bdf2a for file: GenericFile[test.txt]

              12:01:56,909 | INFO  | 1: FileComponent | Tracer                           | rg.apache.camel.processor.Logger   88 | 0455bed3-78d4-4765-81d0-33e65a07ab87 >>> (route1) from(file://aggregateTestSource?delay=5000) --> multicast <<< Pattern:InOnly, Headers:{CamelFilePath=aggregateTestSource\test.txt, CamelFileParent=aggregateTestSource, CamelFileAbsolute=false, CamelFileAbsolutePath=C:\progress\SVN\acs\progress\apache-servicemix-4.1.0-psc-01-00RC1\aggregateTestSource\test.txt, CamelFileLastModified=Sun Jan 31 23:36:41 CST 2010, CamelFileRelativePath=test.txt, CamelFileName=test.txt, CamelFileNameOnly=test.txt, CamelFileLength=21}, BodyType:org.apache.camel.component.file.GenericFile, Body:asdfasdfasdfasdfa
              \

              12:01:56,909 | INFO  | ead 4: Multicast | Tracer                           | rg.apache.camel.processor.Logger   88 | e5c2b8d5-c279-449b-99b8-8f0f3a9f3ce8 >>> (route1) multicast --> bean://proc1 <<< Pattern:InOnly, Headers:{CamelFilePath=aggregateTestSource\test.txt, CamelFileParent=aggregateTestSource, CamelFileAbsolutePath=C:\progress\SVN\acs\progress\apache-servicemix-4.1.0-psc-01-00RC1\aggregateTestSource\test.txt, CamelFileRelativePath=test.txt, CamelFileNameOnly=test.txt, CamelFileLastModified=Sun Jan 31 23:36:41 CST 2010, CamelFileLength=21, CamelFileName=test.txt, CamelFileAbsolute=false}, BodyType:org.apache.camel.component.file.GenericFile, Body:asdfasdfasdfasdfa
              \

              12:01:56,924 | INFO  | ead 5: Multicast | Tracer                           | rg.apache.camel.processor.Logger   88 | 17782fe4-f781-4e7b-a8c7-e7e1221f1439 >>> (route1) bean://proc1 --> bean://proc2 <<< Pattern:InOnly, Headers:{CamelFileRelativePath=test.txt, CamelFileNameOnly=test.txt, CamelFilePath=aggregateTestSource\test.txt, CamelFileLength=21, CamelFileParent=aggregateTestSource, CamelFileName=test.txt, CamelFileLastModified=Sun Jan 31 23:36:41 CST 2010, CamelFileAbsolute=false, CamelFileAbsolutePath=C:\progress\SVN\acs\progress\apache-servicemix-4.1.0-psc-01-00RC1\aggregateTestSource\test.txt}, BodyType:org.apache.camel.component.file.GenericFile, Body:asdfasdfasdfasdfa
              \

              12:01:56,924 | ERROR | 1: FileComponent | GenericFileOnCompletion          | rg.apache.camel.processor.Logger  248 |

              java.lang.IllegalMonitorStateException

                   at java.lang.Object.wait(Native Method)

                   at com.acm.route.Proc2.process(Route1.java:66)

                   at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:83)

                   at org.apache.camel.impl.ProcessorEndpoint.onExchange(ProcessorEndpoint.java:95)

                   at org.apache.camel.impl.ProcessorEndpoint$1.process(ProcessorEndpoint.java:65)

                   at org.apache.camel.processor.SendProcessor$1.doInProducer(SendProcessor.java:97)

                   at org.apache.camel.processor.SendProcessor$1.doInProducer(SendProcessor.java:95)

                   at org.apache.camel.impl.ProducerCache.doInProducer(ProducerCache.java:146)

                   at org.apache.camel.processor.SendProcessor.doProcess(SendProcessor.java:94)

                   at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:82)

                   at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:67)

                   at org.apache.camel.processor.DelegateProcessor.processNext(DelegateProcessor.java:53)

                   at org.apache.camel.processor.DelegateProcessor.proceed(DelegateProcessor.java:82)

                   at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:162)

                   at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:67)

                   at org.apache.camel.processor.RedeliveryErrorHandler.processExchange(RedeliveryErrorHandler.java:223)

                   at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:153)

                   at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:91)

                   at org.apache.camel.processor.DefaultErrorHandler.process(DefaultErrorHandler.java:49)

                   at org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:206)

                   at org.apache.camel.processor.MulticastProcessor.doProcess(MulticastProcessor.java:241)

                   at org.apache.camel.processor.MulticastProcessor.access$000(MulticastProcessor.java:57)

                   at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:166)

                   at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:160)

                   at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)

                   at java.util.concurrent.FutureTask.run(FutureTask.java:138)

                   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)

                   at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)

                   at java.util.concurrent.FutureTask.run(FutureTask.java:138)

                   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98)

                   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:207)

                   at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)

                   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)

                   at java.lang.Thread.run(Thread.java:619)

              12:01:56,940 | WARN  | 1: FileComponent | GenericFileOnCompletion          | ent.file.GenericFileOnCompletion  142 | Rollback file strategy: org.apache.camel.component.file.strategy.GenericFileRenameProcessStrategy@18bdf2a for file: GenericFile[test.txt]

              • 4. Re: multicast and aggregate
                stlewis

                It looks like the exception is coming from your bean's process method, what's going on at this line:

                 

                at com.acm.route.Proc2.process(Route1.java:66)

                • 5. Re: multicast and aggregate
                  njiang

                  Can you use Thread.sleep() instead of this.wait() in your processor?

                  BTW,  you can also try to turn off the multicast's parallelProcessing.