5 Replies Latest reply on Feb 1, 2010 8:02 PM by Willem Jiang

    multicast and aggregate

    Mack Fata Newbie

      I am using camel 2.1.0 with servicemix: apache-servicemix-4.1.0-psc-01-00RC1

       

      Does the following look ok? If so, why nothing is being populated in the aggregator exchange:

       

      import org.apache.camel.Exchange;

      import org.apache.camel.Processor;

      import org.apache.camel.builder.RouteBuilder;

      import org.apache.camel.processor.aggregate.AggregationCollection;

      import org.apache.camel.processor.aggregate.AggregationStrategy;

      import org.apache.camel.processor.aggregate.PredicateAggregationCollection;

       

      import org.slf4j.Logger;

      import org.slf4j.LoggerFactory;

       

      import org.springframework.beans.factory.BeanInitializationException;

      import org.springframework.beans.factory.DisposableBean;

      import org.springframework.beans.factory.InitializingBean;

       

      class Proc1 implements Processor {

       

           @Override

           public void process(Exchange exchange) throws Exception {

                System.out.println( "Proc1: " + exchange.getProperty("id") );

       

                exchange.setProperty("proc1", "proc1 value");

                exchange.getOut().setBody( "proc1 message", String.class);

                exchange.getOut().setHeader("id", exchange.getProperty("id") );

                this.wait(4000);

           }

      }

       

      class Proc2 implements Processor {

       

           @Override

           public void process(Exchange exchange) throws Exception {

                System.out.println( "Proc2: " + exchange.getProperty("id") );

       

                exchange.setProperty("proc2", "proc2 value");

                exchange.getOut().setBody( "proc2 message", String.class);

                exchange.getOut().setHeader("id", exchange.getProperty("id") );

                this.wait(3000);

           }

      }

       

      public class MyRoute extends RouteBuilder {

       

           private static class MyAggregationStrategy implements AggregationStrategy {

       

               public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

       

                    System.out.println( "\n" );

                    System.out.println( "MyAggregationStrategy" );

                    System.out.println( "\n" );

       

                    if (oldExchange == null) {

                         System.out.println( "newExchange: " + newExchange.getIn().getBody(String.class) );

                       return newExchange;

                   }

       

                    System.out.println( "oldExchange: " + oldExchange.getIn().getBody(String.class) );

                    System.out.println( "newExchange: " + newExchange.getIn().getBody(String.class) );

                    System.out.println( "\n" );

       

                    String oldPrice = oldExchange.getIn().getBody(String.class);

                   String newPrice = newExchange.getIn().getBody(String.class);

       

                   //newExchange.getOut().setBody( oldPrice + " " + newPrice );

       

                   return newExchange;

               }

           }

       

           @Override

           public void configure() throws Exception {

       

                System.out.println( "Test camel route started..." );

       

                from ( "file://c:/source?noop=true&delete=true&delay=9000" )

       

                     .process(new Processor() {

                          public void process(Exchange exchange) throws Exception {

                               System.out.println( "exchange ID: " + exchange.getExchangeId() );

       

                               exchange.setProperty("id", exchange.getExchangeId());

                               exchange.getOut().setBody( exchange.getIn().getBody() );

                               exchange.getOut().setHeader("id", exchange.getExchangeId());

                          }

                     })

       

                     .multicast().parallelProcessing()

                     .to( "bean:proc1", "bean:proc2" )

                     .to( "direct:mydirect" )

                ;

       

                AggregationCollection ag = new PredicateAggregationCollection(header("id"),

                         new MyAggregationStrategy(),

                         property(Exchange.AGGREGATED_SIZE).isEqualTo(2));

       

                from ( "direct:mydirect" )

                     .aggregate()

                     .header("id")

                     .batchSize(2)

                     .groupExchanges()

                     .process(new Processor() {

                          public void process(Exchange exchange) throws Exception {

                               System.out.println( "\n\n" );

                               System.out.println( "Aggregator processor..." );

                               System.out.println( "Proc1: " + exchange.getProperty("proc1") );

                               System.out.println( "Proc2: " + exchange.getProperty("proc2") );

                               System.out.println( "Body: " + exchange.getIn().getBody(String.class) );

                          }

                     })

                ;

           }

       

      }

        • 1. Re: multicast and aggregate
          Mack Fata Newbie

           

          The following code seems to go into a loop and never moves the input file and never seems to get to direct:b endpoint. Any idea on what's happening?

           

          Again, this is using Camel: 2.1.0-psc-01-00RC1 and apache-servicemix-4.1.0-psc-01-00RC1

           

          from ( )

          .multicast( new MyAggregationStrategy(), true )

          .to( "bean:proc1", "bean:proc2" )

          .end()

           

          .to( "direct:b" )

          ;

          • 2. Re: multicast and aggregate
            Claus Ibsen Master

            You may have an exception being thrown somewhere which cause the message to not continue being routed. Check logs and your custom aggregation strategy which may throw an unexpected exception.

             

            Use tracer for logging etc.

            http://camel.apache.org/tracer

            • 3. Re: multicast and aggregate
              Mack Fata Newbie

              I see the following exception being thrown again and again...

               

              12:01:23,989 | INFO  | ExtenderThread-6 | DefaultListableBeanFactory       | pport.DefaultListableBeanFactory  414 | Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@1246bec: defining beans route1,proc1,proc2,template,consumerTemplate,camelContext:beanPostProcessor,camelContext; root of factory hierarchy

              12:01:23,989 | INFO  | ExtenderThread-6 | DefaultCamelContext              | e.camel.impl.DefaultCamelContext 1005 | Apache Camel 2.1.0-psc-01-00RC1 (CamelContext:camelContext) is starting

              12:01:23,989 | INFO  | ExtenderThread-6 | DefaultCamelContext              | e.camel.impl.DefaultCamelContext 1431 | JMX enabled. Using DefaultManagedLifecycleStrategy.

              12:01:25,910 | INFO  | ExtenderThread-6 | DefaultCamelContext              | e.camel.impl.DefaultCamelContext  997 | Apache Camel 2.1.0-psc-01-00RC1 (CamelContext:camelContext) started

              12:01:25,926 | INFO  | ExtenderThread-6 | OsgiBundleXmlApplicationContext  | ractOsgiBundleApplicationContext  327 | Publishing application context as OSGi service with properties {org.springframework.context.service.name=multicastAggregateTest, Bundle-SymbolicName=multicastAggregateTest, Bundle-Version=1.0.0}

              12:01:25,926 | INFO  | ExtenderThread-6 | ContextLoaderListener            | BundleApplicationContextListener   45 | Application context successfully refreshed (OsgiBundleXmlApplicationContext(bundle=multicastAggregateTest, config=osgibundle:/META-INF/spring/*.xml))

              12:01:52,034 | INFO  | 1: FileComponent | Tracer                           | rg.apache.camel.processor.Logger   88 | bba5f998-aae1-48c2-8224-698a5dde1e28 >>> (route1) from(file://aggregateTestSource?delay=5000) --> multicast <<< Pattern:InOnly, Headers:{CamelFileParent=aggregateTestSource, CamelFileAbsolute=false, CamelFileName=test.txt, CamelFileAbsolutePath=C:\progress\SVN\acs\progress\apache-servicemix-4.1.0-psc-01-00RC1\aggregateTestSource\test.txt, CamelFilePath=aggregateTestSource\test.txt, CamelFileNameOnly=test.txt, CamelFileLastModified=Sun Jan 31 23:36:41 CST 2010, CamelFileLength=21, CamelFileRelativePath=test.txt}, BodyType:org.apache.camel.component.file.GenericFile, Body:asdfasdfasdfasdfa
              \

              12:01:52,049 | INFO  | ead 3: Multicast | Tracer                           | rg.apache.camel.processor.Logger   88 | 481dbe74-ec40-4956-b351-9fd93c91891e >>> (route1) bean://proc1 --> bean://proc2 <<< Pattern:InOnly, Headers:{CamelFileAbsolutePath=C:\progress\SVN\acs\progress\apache-servicemix-4.1.0-psc-01-00RC1\aggregateTestSource\test.txt, CamelFileNameOnly=test.txt, CamelFileAbsolute=false, CamelFileParent=aggregateTestSource, CamelFileLastModified=Sun Jan 31 23:36:41 CST 2010, CamelFileName=test.txt, CamelFileRelativePath=test.txt, CamelFilePath=aggregateTestSource\test.txt, CamelFileLength=21}, BodyType:org.apache.camel.component.file.GenericFile, Body:asdfasdfasdfasdfa
              \

              12:01:52,049 | INFO  | ead 2: Multicast | Tracer                           | rg.apache.camel.processor.Logger   88 | c261530a-d450-4d87-868f-f0904533bc90 >>> (route1) bean://proc1 --> bean://proc2 <<< Pattern:InOnly, Headers:{CamelFileLastModified=Sun Jan 31 23:36:41 CST 2010, CamelFileParent=aggregateTestSource, CamelFileAbsolute=false, CamelFileName=test.txt, CamelFileAbsolutePath=C:\progress\SVN\acs\progress\apache-servicemix-4.1.0-psc-01-00RC1\aggregateTestSource\test.txt, CamelFileLength=21, CamelFilePath=aggregateTestSource\test.txt, CamelFileRelativePath=test.txt, CamelFileNameOnly=test.txt}, BodyType:org.apache.camel.component.file.GenericFile, Body:asdfasdfasdfasdfa
              \

              12:01:52,128 | ERROR | 1: FileComponent | GenericFileOnCompletion          | rg.apache.camel.processor.Logger  248 |

              java.lang.IllegalMonitorStateException

                   at java.lang.Object.wait(Native Method)

                   at com.acm.route.Proc2.process(Route1.java:66)

                   at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:83)

                   at org.apache.camel.impl.ProcessorEndpoint.onExchange(ProcessorEndpoint.java:95)

                   at org.apache.camel.impl.ProcessorEndpoint$1.process(ProcessorEndpoint.java:65)

                   at org.apache.camel.processor.SendProcessor$1.doInProducer(SendProcessor.java:97)

                   at org.apache.camel.processor.SendProcessor$1.doInProducer(SendProcessor.java:95)

                   at org.apache.camel.impl.ProducerCache.doInProducer(ProducerCache.java:146)

                   at org.apache.camel.processor.SendProcessor.doProcess(SendProcessor.java:94)

                   at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:82)

                   at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:67)

                   at org.apache.camel.processor.DelegateProcessor.processNext(DelegateProcessor.java:53)

                   at org.apache.camel.processor.DelegateProcessor.proceed(DelegateProcessor.java:82)

                   at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:162)

                   at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:67)

                   at org.apache.camel.processor.RedeliveryErrorHandler.processExchange(RedeliveryErrorHandler.java:223)

                   at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:153)

                   at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:91)

                   at org.apache.camel.processor.DefaultErrorHandler.process(DefaultErrorHandler.java:49)

                   at org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:206)

                   at org.apache.camel.processor.MulticastProcessor.doProcess(MulticastProcessor.java:241)

                   at org.apache.camel.processor.MulticastProcessor.access$000(MulticastProcessor.java:57)

                   at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:166)

                   at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:160)

                   at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)

                   at java.util.concurrent.FutureTask.run(FutureTask.java:138)

                   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)

                   at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)

                   at java.util.concurrent.FutureTask.run(FutureTask.java:138)

                   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98)

                   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:207)

                   at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)

                   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)

                   at java.lang.Thread.run(Thread.java:619)

              12:01:52,143 | WARN  | 1: FileComponent | GenericFileOnCompletion          | ent.file.GenericFileOnCompletion  142 | Rollback file strategy: org.apache.camel.component.file.strategy.GenericFileRenameProcessStrategy@18bdf2a for file: GenericFile[test.txt]

              12:01:56,909 | INFO  | 1: FileComponent | Tracer                           | rg.apache.camel.processor.Logger   88 | 0455bed3-78d4-4765-81d0-33e65a07ab87 >>> (route1) from(file://aggregateTestSource?delay=5000) --> multicast <<< Pattern:InOnly, Headers:{CamelFilePath=aggregateTestSource\test.txt, CamelFileParent=aggregateTestSource, CamelFileAbsolute=false, CamelFileAbsolutePath=C:\progress\SVN\acs\progress\apache-servicemix-4.1.0-psc-01-00RC1\aggregateTestSource\test.txt, CamelFileLastModified=Sun Jan 31 23:36:41 CST 2010, CamelFileRelativePath=test.txt, CamelFileName=test.txt, CamelFileNameOnly=test.txt, CamelFileLength=21}, BodyType:org.apache.camel.component.file.GenericFile, Body:asdfasdfasdfasdfa
              \

              12:01:56,909 | INFO  | ead 4: Multicast | Tracer                           | rg.apache.camel.processor.Logger   88 | e5c2b8d5-c279-449b-99b8-8f0f3a9f3ce8 >>> (route1) multicast --> bean://proc1 <<< Pattern:InOnly, Headers:{CamelFilePath=aggregateTestSource\test.txt, CamelFileParent=aggregateTestSource, CamelFileAbsolutePath=C:\progress\SVN\acs\progress\apache-servicemix-4.1.0-psc-01-00RC1\aggregateTestSource\test.txt, CamelFileRelativePath=test.txt, CamelFileNameOnly=test.txt, CamelFileLastModified=Sun Jan 31 23:36:41 CST 2010, CamelFileLength=21, CamelFileName=test.txt, CamelFileAbsolute=false}, BodyType:org.apache.camel.component.file.GenericFile, Body:asdfasdfasdfasdfa
              \

              12:01:56,924 | INFO  | ead 5: Multicast | Tracer                           | rg.apache.camel.processor.Logger   88 | 17782fe4-f781-4e7b-a8c7-e7e1221f1439 >>> (route1) bean://proc1 --> bean://proc2 <<< Pattern:InOnly, Headers:{CamelFileRelativePath=test.txt, CamelFileNameOnly=test.txt, CamelFilePath=aggregateTestSource\test.txt, CamelFileLength=21, CamelFileParent=aggregateTestSource, CamelFileName=test.txt, CamelFileLastModified=Sun Jan 31 23:36:41 CST 2010, CamelFileAbsolute=false, CamelFileAbsolutePath=C:\progress\SVN\acs\progress\apache-servicemix-4.1.0-psc-01-00RC1\aggregateTestSource\test.txt}, BodyType:org.apache.camel.component.file.GenericFile, Body:asdfasdfasdfasdfa
              \

              12:01:56,924 | ERROR | 1: FileComponent | GenericFileOnCompletion          | rg.apache.camel.processor.Logger  248 |

              java.lang.IllegalMonitorStateException

                   at java.lang.Object.wait(Native Method)

                   at com.acm.route.Proc2.process(Route1.java:66)

                   at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:83)

                   at org.apache.camel.impl.ProcessorEndpoint.onExchange(ProcessorEndpoint.java:95)

                   at org.apache.camel.impl.ProcessorEndpoint$1.process(ProcessorEndpoint.java:65)

                   at org.apache.camel.processor.SendProcessor$1.doInProducer(SendProcessor.java:97)

                   at org.apache.camel.processor.SendProcessor$1.doInProducer(SendProcessor.java:95)

                   at org.apache.camel.impl.ProducerCache.doInProducer(ProducerCache.java:146)

                   at org.apache.camel.processor.SendProcessor.doProcess(SendProcessor.java:94)

                   at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:82)

                   at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:67)

                   at org.apache.camel.processor.DelegateProcessor.processNext(DelegateProcessor.java:53)

                   at org.apache.camel.processor.DelegateProcessor.proceed(DelegateProcessor.java:82)

                   at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:162)

                   at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:67)

                   at org.apache.camel.processor.RedeliveryErrorHandler.processExchange(RedeliveryErrorHandler.java:223)

                   at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:153)

                   at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:91)

                   at org.apache.camel.processor.DefaultErrorHandler.process(DefaultErrorHandler.java:49)

                   at org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:206)

                   at org.apache.camel.processor.MulticastProcessor.doProcess(MulticastProcessor.java:241)

                   at org.apache.camel.processor.MulticastProcessor.access$000(MulticastProcessor.java:57)

                   at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:166)

                   at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:160)

                   at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)

                   at java.util.concurrent.FutureTask.run(FutureTask.java:138)

                   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)

                   at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)

                   at java.util.concurrent.FutureTask.run(FutureTask.java:138)

                   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98)

                   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:207)

                   at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)

                   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)

                   at java.lang.Thread.run(Thread.java:619)

              12:01:56,940 | WARN  | 1: FileComponent | GenericFileOnCompletion          | ent.file.GenericFileOnCompletion  142 | Rollback file strategy: org.apache.camel.component.file.strategy.GenericFileRenameProcessStrategy@18bdf2a for file: GenericFile[test.txt]

              • 4. Re: multicast and aggregate
                Stan Lewis Novice

                It looks like the exception is coming from your bean's process method, what's going on at this line:

                 

                at com.acm.route.Proc2.process(Route1.java:66)

                • 5. Re: multicast and aggregate
                  Willem Jiang Master

                  Can you use Thread.sleep() instead this.wait() in your processor?

                  BTW,  you can also try to turn off the multicast's parallelProcessing.