8 Replies Latest reply on May 30, 2012 4:35 AM by emeka_kanu

    Aggregation (Enrichment & Multicasting)

    emeka_kanu

      Mediator Router Version - 2.8.0-fuse-05-03

       

      Hello,

       

      I really need help with this, please. Please forgive me if:

       

      (a) This has already been covered somewhere else. I have not been able to find relevant information in my searches.

      (b) This issue has been posted incorrectly or poorly explained. I will try as much as possible to provide more clarity if any questions are asked.

       

      More details:

       

      (1) All interactions are done via either SOAP (with simple HTTP) or RESTful services (with Camel Restlet and Simple HTTP).

      (2) All data is sent and received as XML strings.

      (3) All data is transformed via XSLT and/or XPath.

       

       

      Route details (Edited)

      -


      //Define namespaces to use

       

      Namespaces ns = new Namespaces("pri1", "http://www.some.namespace/1.0");

      ns.add("pri2", "..........");

       

      //Get from http endpoint

       

      from("jetty:http://url")

      .convertBodyTo(String.class)

      .to("log:com.ecurricular.esb?level=INFO")

       

      // Add some elements to the Header from payload

      .setHeader("initialRequestId", XPathBuilder.xpath("//pri1:requestId", String.class).namespaces(ns))

      .......

       

      // Log the request and show the header to see that values added exists

      .to("log:com.ecurricular.esb?level=INFO&showHeaders=true")

       

       

      /*Enrich */

      .enrich("direct:getResourceFromEnrichment", enrichmentAggStrategy)

       

      // Log after enrichment and show headers

      .to("log:com.ecurricular.esb?level=INFO&showHeaders=true")

       

      .multicast(new MulticastAggregationStrategy())

      .to("direct:syncService1", "direct:syncService2").end()

       

      // Log after multicast and show headers

      .to("log:com.ecurricular.esb?level=INFO&showHeaders=true")

       

      .to("restlet:http://localhost:8080/a-new-system/")

      .convertBodyTo(String.class)

       

      // Log after Restlet service call

      .to("log:com.ecurricular.esb?level=INFO&showHeaders=true")

       

      .....

      // End of Main route

      ;

      -


       

      -


      // Enrichment Route

      from("direct:getResourceFromEnrichment")

       

      .....

      .to("xslt:com..someFile.xslt")

      .to("http://localhost:8080/soapServiceCall?bridgeEndpoint=true")

      .....

      // End of Enrichment

      ;

      -


       

       

       

      -


      // Sync Service 1 Route

      from("direct:syncService1")

      .to("xslt:com/service1File.xslt")

      .setHeader(Exchange.HTTP_METHOD, constant(org.apache.camel.component.http.HttpMethods.POST))

      .to("http://localhost:8080/someRestService/main?bridgeEndpoint=true")

      ....

      // End of Service 1 Route

      ;

      -


       

       

       

      -


      // Sync Service 2 Route

      from("direct:syncService2")

      .to("xslt:com/service2File.xslt")

      .setHeader(Exchange.HTTP_METHOD, constant(org.apache.camel.component.http.HttpMethods.POST))

      .to("http://localhost:8080/someSOAPService/main?bridgeEndpoint=true")

      ....

      // End of Service 2 Route

      ;

      -


       

       

       

      -


      // Enrichment Aggregator method

      class EnrichmentAggStrategy implements AggregationStrategy{

           @Override

                public Exchange aggregate(Exchange original, Exchange resource) {

                     

                     //The original message. We don't need to do anything to it

                     Object originalBody = original.getIn().getBody();

                     

                     String enrichmentServiceResponse= (String) resource.getIn().getBody();

                     

                     String someRequiredValue = extractValueWithXPath(enrichmentServiceResponse);

                     

                     original.getIn().setHeader("someRequiredValue",someRequiredValue);

                     

                     return original;

                }

      }

      -


       

       

      -


      public class MulticastAggregationStrategy implements AggregationStrategy{

                

                @Override

                public Exchange aggregate(Exchange oldExchange, Exchange resource) {

       

                     boolean service1ItemReceived = false;

                     boolean service2ItemReceived = false;

       

                     Exchange theExchange = null;

                     

                     if(oldExchange == null)

                     {

                          return resource;

                     }

                     

                     else

                          theExchange = oldExchange;

                      

                     // XML String

                     String oldBody = (String) oldExchange.getIn().getBody();   

                      

                     // XML String

                     String resourceBody = (String) resource.getIn().getBody();

                     

      // Please Note. Beyond this point, I was just trying anything and I could not be sure

      // whether these values should be set in the In or the Out, so I set in both

       

                     if(oldBody!=null && oldBody.contains("Service1Response"))

                     {

                          String someService1Id = getService1Id(oldBody); // Done on XML via xpath

                          theExchange.getIn().setHeader("someService1Id",someService1Id);

                          theExchange.getOut().setHeader("someService1Id",someService1Id);

                           service1ItemReceived = true;

                     }

       

                     else if(resourceBody!=null && resourceBody.contains("Service1Response"))

                     {

                          String someService1Id = getService1Id(resourceBody); // Done on XML via xpath

                          theExchange.getIn().setHeader("someService1Id",someService1Id);

                          theExchange.getOut().setHeader("someService1Id",someService1Id);

                          service1ItemReceived = true;

                     }

                      

                     if(resourceBody!=null && resourceBody.contains("Service2Response"))

                     {

                          String someService2Id = getService2Id(resourceBody); // Done on XML via xpath

                          theExchange.getIn().setHeader("someService2Id",someService2Id);

                          theExchange.getOut().setHeader("someService2Id",someService2Id);

                          service2ItemReceived = true;

                     }

                     else if(oldBody!=null && oldBody.contains("Service2Response"))

                     {

                          String someService2Id = getService2Id(oldBody); // Done on XML via xpath

                          theExchange.getIn().setHeader("someService2Id",someService2Id);

                          theExchange.getOut().setHeader("someService2Id",someService2Id);

                          service2ItemReceived = true;

                     }

       

                     .......

       

                     return theExchange;

                }

      }

       

      -


       

      END OF CODE - Thank you for staying with me

       

      Observations which are causing problems

      (1) - The log at the comment '// Log the request and show the header to see that values added exists' shows the header values taken from the request XML. (GOOD)

      (2) - The log immediately after the enrichment shows the value added by the enricher, but the header values seen at point 1 have been lost. (BAD)

      (3) - The log after the multicast shows that the values seen at points 1 and 2 have been lost, although the values added by the multicast aggregation strategy are present. (GOOD and BAD)

      (4) - The log after the restlet service call shows that all previously seen header values are lost. (VERY VERY BAD)

      (5) - As a result, further processing beyond what is shown in this route, which needs the values set in the headers, cannot be carried out successfully.

       

      Please forgive the long and complex problem, but I found the trivial examples on the web (including the Loan Broker) frankly, too simplistic and annoying.

       

      Any help/pointers are greatly appreciated.

       

      Thanks in Advance

       

      Emeka Kanu

        • 1. Re: Aggregation (Enrichment & Multicasting)
          davsclaus

          When you have a "header lost" situation and are using the http/jetty components, keep in mind that the data is streamed. So you either need to convert the payload to a String (in memory), or use stream caching.

          http://camel.apache.org/stream-caching.html
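          To illustrate the point about streamed payloads: a stream-based body can be read only once, so a second reader (a logger, an aggregation strategy) sees nothing unless the payload was first converted to a String or cached. The sketch below is plain Java with no Camel dependency. The names StreamOnceDemo and readAll are hypothetical, and a ByteArrayInputStream merely stands in for an HTTP response stream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class StreamOnceDemo {

    // Drains the stream into a String. Afterwards the stream is at end-of-data.
    static String readAll(InputStream in) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            byte[] chunk = new byte[1024];
            int n;
            while ((n = in.read(chunk)) != -1) {
                buffer.write(chunk, 0, n);
            }
            return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for a streamed HTTP response body (assumption for illustration).
        InputStream body = new ByteArrayInputStream(
                "<Service1Response/>".getBytes(StandardCharsets.UTF_8));

        String first = readAll(body);   // gets the whole payload
        String second = readAll(body);  // gets an empty string: already consumed

        System.out.println("first read:  [" + first + "]");
        System.out.println("second read: [" + second + "]");
    }
}
```

          Reading the payload into a String once and passing that String along, which is what convertBodyTo(String.class) is used for in the route, avoids the second empty read.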

          • 2. Re: Aggregation (Enrichment & Multicasting)
            emeka_kanu

            Hello davsclaus,

             

            Thanks for your response. I'm currently converting the payload to a string. Is this not enough?

             

            Do you still recommend I set the streamCache?

            • 3. Re: Aggregation (Enrichment & Multicasting)
              davsclaus

              In the two direct sync service routes, you use the http endpoint. The response from it is stream-based, so you may have an issue there as well.

              • 4. Re: Aggregation (Enrichment & Multicasting)
                emeka_kanu

                Hello davsclaus,

                 

                I think your answer is close. Now I see my data after the enrichment.

                 

                However, I still lose my header properties after the multicast, and I lose the data retrieved by the multicast after the restlet call.

                 

                I set streamCache to true in the camelContext

                ....

                <camel:camelContext streamCache="true">

                ....

                 

                and on receipt of a request in the routes

                from("jetty:http://url").streamCaching()

                 

                I see from the link you've sent me that stream caching is enabled by default in multicasts. Not sure this is what my problem is.

                 

                Thank you anyway.

                • 5. Re: Aggregation (Enrichment & Multicasting)
                  davsclaus

                  You can use the tracer to see where the headers are lost.

                  http://camel.apache.org/tracer

                   

                  Mind that when you do request/reply over a transport, the headers may not be returned by that transport, e.g. because headers are part of the message protocol.

                   

                  So data stored in headers is not 100% guaranteed to be preserved, as transports may drop the headers or not return them. E.g. the JMS spec limits what kinds of headers can be used.

                   

                  And then double check your multicast aggregation strategy code, to see if you drop your headers there.

                   

                  And stream caching is NOT implicitly enabled by multicast. That was in Camel 1.x.

                  Enabling it on <camelContext> would mean it's globally enabled.

                  http://camel.apache.org/stream-caching.html
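                  On the suggestion to double check the aggregation strategy: one thing worth looking at is the call to getOut(). As far as I understand Camel 2.x, getOut() lazily creates a new, empty Out message, and the following step then works from that Out message instead of the In message, so headers that exist only on the In message are no longer visible. The toy model below is a simplification under that assumption, not Camel's code. ToyMessage, ToyExchange and OutMessageDemo are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a message: only the header map is modelled.
class ToyMessage {
    final Map<String, Object> headers = new HashMap<>();
}

// Hypothetical stand-in for an exchange with an In and a lazily created Out.
class ToyExchange {
    private final ToyMessage in = new ToyMessage();
    private ToyMessage out;

    ToyMessage getIn() {
        return in;
    }

    // Assumption being modelled: the Out message starts empty when first requested.
    ToyMessage getOut() {
        if (out == null) {
            out = new ToyMessage();
        }
        return out;
    }

    // Assumption being modelled: the next step sees Out if it exists, else In.
    ToyMessage messageForNextStep() {
        return out != null ? out : in;
    }
}

class OutMessageDemo {

    // Sets the new header on both In and Out, as in the strategy under discussion.
    static Map<String, Object> headersSeenWhenOutTouched() {
        ToyExchange exchange = new ToyExchange();
        exchange.getIn().headers.put("initialRequestId", "req-1");
        exchange.getIn().headers.put("someService1Id", "svc1-7");
        exchange.getOut().headers.put("someService1Id", "svc1-7");
        return exchange.messageForNextStep().headers;
    }

    // Sets the new header on In only, so the earlier headers stay visible.
    static Map<String, Object> headersSeenWhenOnlyInUsed() {
        ToyExchange exchange = new ToyExchange();
        exchange.getIn().headers.put("initialRequestId", "req-1");
        exchange.getIn().headers.put("someService1Id", "svc1-7");
        return exchange.messageForNextStep().headers;
    }

    public static void main(String[] args) {
        System.out.println("Out touched: " + headersSeenWhenOutTouched().keySet());
        System.out.println("In only:     " + headersSeenWhenOnlyInUsed().keySet());
    }
}
```

                  If this model matches what happens in the route, setting headers only on the In message (or copying the In headers to the Out message before adding to it) would keep the earlier values. The tracer should show whether that is really the cause.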

                  • 6. Re: Aggregation (Enrichment & Multicasting)
                    emeka_kanu

                    Hello,

                     

                    I have managed to solve this problem. Thank you.

                     

                    This is what I had to do.

                     

                    First, I got rid of my own multicast aggregation strategy and used GroupedExchangeAggregationStrategy instead.

                     

                    E.g.:

                    .multicast(new GroupedExchangeAggregationStrategy())

                              .to("direct:service1", "direct:service2").end()

                              .setBody(property(Exchange.GROUPED_EXCHANGE))

                     

                    I then did my aggregation of headers and other properties in a Processor implementation within the main route.
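                    For anyone wanting the general shape of that aggregation step, here is a rough sketch in plain Java with no Camel dependency. It takes the headers collected so far plus the response bodies from the grouped exchanges, and adds the two service IDs without dropping anything that was already there. GroupedResponseMerger, merge and extractId are hypothetical names, and the <id> element in the payloads is an assumed shape, not the real service response.

```java
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

class GroupedResponseMerger {

    // Assumed payload shape: <ServiceNResponse><id>...</id></ServiceNResponse>
    static String extractId(String xml) {
        try {
            XPath xpath = XPathFactory.newInstance().newXPath();
            return xpath.evaluate("/*/id", new InputSource(new StringReader(xml)));
        } catch (XPathExpressionException e) {
            throw new IllegalArgumentException("Could not evaluate XPath on: " + xml, e);
        }
    }

    // Returns a copy of the existing headers with the service IDs added.
    static Map<String, Object> merge(Map<String, Object> existingHeaders,
                                     List<String> responseBodies) {
        Map<String, Object> merged = new LinkedHashMap<>(existingHeaders);
        for (String body : responseBodies) {
            if (body == null) {
                continue; // a service returned nothing; keep what we have
            }
            if (body.contains("Service1Response")) {
                merged.put("someService1Id", extractId(body));
            } else if (body.contains("Service2Response")) {
                merged.put("someService2Id", extractId(body));
            }
        }
        return merged;
    }
}
```

                    In a real Processor the response bodies would come from the list of exchanges stored by the grouped strategy, and the merged map would be written back to the message headers.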

                     

                    Secondly, I replaced the restlet component with a simple http call. I had initially had a problem with this because I could not figure out how to use a URI with curly braces. However, I read the HTTP documentation again and saw this.

                     

                    from("direct:start")

                      .setHeader(Exchange.HTTP_URI, simple("http://myserver/orders/${header.orderId}"))

                      .to("http://dummyhost");
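                    To show what the ${header.orderId} placeholder amounts to, here is a small stand-alone sketch that substitutes header values into a URI template. It is plain Java and is not how Camel's simple language is implemented. HeaderUriTemplate and resolve are hypothetical names, and throwing on a missing header is my own choice for the sketch.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class HeaderUriTemplate {

    // Matches ${header.someName} and captures someName.
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\$\\{header\\.([A-Za-z0-9_]+)\\}");

    // Replaces every ${header.x} in the template with the value of header x.
    static String resolve(String template, Map<String, ?> headers) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuffer result = new StringBuffer();
        while (matcher.find()) {
            String name = matcher.group(1);
            Object value = headers.get(name);
            if (value == null) {
                throw new IllegalArgumentException("Missing header: " + name);
            }
            // quoteReplacement keeps '$' and '\' in the value literal.
            matcher.appendReplacement(result, Matcher.quoteReplacement(value.toString()));
        }
        matcher.appendTail(result);
        return result.toString();
    }
}
```

                    For example, resolve("http://myserver/orders/${header.orderId}", headers) with orderId set to 123 gives http://myserver/orders/123.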

                     

                    I think the restlet component should be changed to hold and return the exchange headers, especially as in this case it was being used in a synchronous process. I cannot see why these should be lost, especially as it works with http, but that is not my call.

                     

                    Edited by: emeka_kanu on May 30, 2012 8:34 AM

                    • 7. Re: Aggregation (Enrichment & Multicasting)
                      emeka_kanu

                      Please see the comments I have added in the thread.

                      • 8. Re: Aggregation (Enrichment & Multicasting)
                        davsclaus

                        Hi

                         

                        I have logged a ticket about the restlet producer

                        http://fusesource.com/issues/browse/MR-623