5 Replies Latest reply on Oct 25, 2010 11:54 AM by sanya.sokolov

    Correlation expression for Camel enricher

    sanya.sokolov

      Hi all!

      I am rather new to Camel and have a question:

      I have two sources (JMS queues): one with input data and one with additional data. I want to enrich the input data with the additional data, correlating the two by a header.

       

      I want to do something like:

      from("jms:queue:A").pollEnrich(header("aggId"), "jms:queue:B", 1000L, new MyAggregationStrategy())

       

      Camel enricher does not support correlation expressions. Is there some workaround?

       

      thank you

      Alex

        • 1. Re: Correlation expression for Camel enricher
          davsclaus

          So when a new message is received on A, you want to poll all messages from B that have a specific id?

           

          You can do this using a bean, and in the bean you "drain" the B queue for messages. You then need to use JMS message selectors to poll only the messages that match the id. And since this is Java code, you can merge the messages in the Java code directly.
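The "drain and merge" idea above can be sketched without a broker. This is only an in-memory stand-in, assuming each message carries an `aggId` header; `CorrelatingDrainSketch`, `QueuedMessage`, `drainMatching` and `enrich` are hypothetical names for illustration, not Camel API. In a real bean, the draining step would be done by polling `jms:queue:B` with a JMS selector rather than by scanning a list.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

// In-memory stand-in for "drain queue B by correlation id, then merge in Java".
// All names here are hypothetical; none of this is Camel or JMS API.
final class CorrelatingDrainSketch {

    // A message reduced to what matters here: a correlation header and a body.
    static final class QueuedMessage {
        final String aggId;
        final String body;

        QueuedMessage(String aggId, String body) {
            this.aggId = aggId;
            this.body = body;
        }
    }

    // Removes and returns the bodies of all messages whose aggId matches.
    // Non-matching messages stay in the queue, as they would with a selector.
    static List<String> drainMatching(List<QueuedMessage> queueB, String aggId) {
        List<String> matched = new ArrayList<>();
        for (Iterator<QueuedMessage> it = queueB.iterator(); it.hasNext();) {
            QueuedMessage message = it.next();
            if (message.aggId.equals(aggId)) {
                matched.add(message.body);
                it.remove();
            }
        }
        return matched;
    }

    // Merges the body from A with whatever was drained from B.
    static String enrich(String bodyA, List<String> extras) {
        return extras.isEmpty() ? bodyA : bodyA + " + " + String.join(", ", extras);
    }

    public static void main(String[] args) {
        List<QueuedMessage> queueB = new LinkedList<>();
        queueB.add(new QueuedMessage("agg1", "Message B 1"));
        queueB.add(new QueuedMessage("agg2", "Message B 2"));

        // Only the agg1 message is taken from B; the agg2 message remains queued.
        System.out.println(enrich("Message A 1", drainMatching(queueB, "agg1")));
    }
}
```

The merge step is kept separate from the draining step so that it could later be swapped for whatever aggregation logic the route needs.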

          • 2. Re: Correlation expression for Camel enricher
            sanya.sokolov

            Thank you for the reply.

             

            I configured my appContext:

             

                  

            and my bean:

                 public void process(@Body String body, @Header(value = "aggId") String selector) {

                      String s = consumer.receiveBodyNoWait("jms:queue:B?connectionFactory=#wrappedConnectionFactory&selector=aggId=" + selector, String.class);

                      System.out.println(body + " " + s);

                 }

             

                 public static void main(String...f) {

                      ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");

                      /* ProducerTemplate producer = camel.createProducerTemplate();

                      for (int i = 0; i < 1000; i++) {

                           producer.sendBodyAndHeader("jms:queue:A?connectionFactory=#wrappedConnectionFactory", "Message A " + i, "aggId", "agg" + i);

                           producer.sendBodyAndHeader("jms:queue:B?connectionFactory=#wrappedConnectionFactory", "Message B " + i, "aggId", "agg" + i);

                      } */

                 }

             

            All 1000 exchanges from A are drained, but none of the exchanges from queue B are selected. I also see that 1000 consumers are created for B.

            Does ConsumerTemplate support selectors in the URI (Camel version 2.4.0)?

             

            Edited by: sanya.sokolov on Oct 25, 2010 11:39 AM

            • 3. Re: Correlation expression for Camel enricher
              sanya.sokolov

              Sorry, I forgot the quotes around the selector value.
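For readers hitting the same problem: in a JMS selector a string literal has to be wrapped in single quotes, otherwise `agg363` is read as a property name rather than a value and nothing matches. Below is a minimal sketch of building the endpoint URI with a quoted value. `SelectorUriSketch` and its methods are hypothetical helpers, and the URI shape is simply copied from the bean earlier in the thread. Depending on the Camel version, characters such as `=` in the selector may also need percent-encoding, which this sketch does not attempt.

```java
// Hypothetical helper for building a JMS endpoint URI with a quoted selector value.
// Only string handling is shown; nothing here calls Camel or JMS.
final class SelectorUriSketch {

    // JMS selector string literals are single-quoted; an embedded quote is doubled.
    static String quoteLiteral(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    // Produces a selector such as aggId='agg363'.
    static String selectorFor(String header, String value) {
        return header + "=" + quoteLiteral(value);
    }

    // Same URI shape as in the thread, with the selector value quoted.
    static String endpointUri(String queue, String header, String value) {
        return "jms:queue:" + queue
                + "?connectionFactory=#wrappedConnectionFactory"
                + "&selector=" + selectorFor(header, value);
    }

    public static void main(String[] args) {
        System.out.println(endpointUri("B", "aggId", "agg363"));
    }
}
```

In the bean, the result of `endpointUri("B", "aggId", selector)` would be passed to `consumer.receiveBodyNoWait(...)` in place of the hand-concatenated string.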

               

              But I found that not all messages were selected:

               

              Message A 362 Message B 362

              Message A 363 null

              Message A 364 Message B 364

               

              One message still remains in the B queue. Do I need to increase the wait time?

              Thank you.

               

              Edited by: sanya.sokolov on Oct 25, 2010 12:40 PM

              • 4. Re: Correlation expression for Camel enricher
                davsclaus

                You should pre-load the messages on the queues before you start the Camel route.

                 

                And make sure the queues are persistent, because there are no active consumers.

                And if it's a test, make sure to delete the AMQ data folder so you start from a clean state.

                • 5. Re: Correlation expression for Camel enricher
                  sanya.sokolov

                  I use the TIBCO factory and two pre-loaded queues. With text messages I have no issues, but after I put identical objects into the queues, at least half of the messages from B are not received, regardless of the waiting time.