8 Replies Latest reply on May 30, 2012 4:35 AM by Emeka Kanu

    Aggregation (Enrichment & Multicasting)

    Emeka Kanu Newbie

      Mediator Router Version - 2.8.0-fuse-05-03

       

      Hello,

       

      I really need help with this, please. Please forgive me if:

       

      (a) This has already been covered elsewhere. My searches have not turned up any relevant information.

      (b) This issue has been posted in the wrong place or is poorly explained. I will do my best to clarify if anyone asks questions.

       

      More details:

       

      (1) All interactions are done via either SOAP (with simple HTTP) or RESTful services (with Camel Restlet and simple HTTP).

      (2) All data is sent and received as XML strings.

      (3) All data is transformed via XSLT and/or XPath.

       

       

      Route details (Edited)

      -


      //Define namespaces to use

       

      Namespaces ns = new Namespaces("pri1", "http://www.some.namespace/1.0");

      ns.add("pri2", "..........");

       

      //Get from http endpoint

       

      from("jetty:http://url")

      .convertBodyTo(String.class)

      .to("log:com.ecurricular.esb?level=INFO")

       

      // Add some elements to the Header from payload

      .setHeader("initialRequestId", XPathBuilder.xpath("//pri1:requestId", String.class).namespaces(ns))

      .......

       

      // Log the request and show the header to see that values added exists

      .to("log:com.ecurricular.esb?level=INFO&showHeaders=true")

       

       

      /*Enrich */

      .enrich("direct:getResourceFromEnrichment", enrichmentAggStrategy)

       

      // Log after enrichment and show headers

      .to("log:com.ecurricular.esb?level=INFO&showHeaders=true")

       

      .multicast(new MulticastAggregationStrategy())

      .to("direct:syncService1", "direct:syncService2").end()

       

      // Log after multicast and show headers

      .to("log:com.ecurricular.esb?level=INFO&showHeaders=true")

       

      .to("restlet:http://localhost:8080/a-new-system/")

      .convertBodyTo(String.class)

       

      // Log after Restlet service call

      .to("log:com.ecurricular.esb?level=INFO&showHeaders=true")

       

      .....

      // End of Main route

      ;

      -


       

      -


      // Enrichment Route

      from("direct:getResourceFromEnrichment")

       

      .....

      .to("xslt:com..someFile.xslt")

      .to("http://localhost:8080/soapServiceCall?bridgeEndpoint=true")

      .....

      // End of Enrichment

      ;

      -


       

       

       

      -


      // Sync Service 1 Route

      from("direct:syncService1")

      .to("xslt:com/service1File.xslt")

      .setHeader(Exchange.HTTP_METHOD, constant(org.apache.camel.component.http.HttpMethods.POST))

      .to("http://localhost:8080/someRestService/main?bridgeEndpoint=true")

      ....

      // End of Service 1 Route

      ;

      -


       

       

       

      -


      // Sync Service 2 Route

      from("direct:syncService2")

      .to("xslt:com/service2File.xslt")

      .setHeader(Exchange.HTTP_METHOD, constant(org.apache.camel.component.http.HttpMethods.POST))

      .to("http://localhost:8080/someSOAPService/main?bridgeEndpoint=true")

      ....

      // End of Service 2 Route

      ;

      -


       

       

       

      -


      // Enrichment Aggregator method

      class EnrichmentAggStrategy implements AggregationStrategy{

           @Override

                public Exchange aggregate(Exchange original, Exchange resource) {

                     

                     //The original message. We don't need to do anything to it

                     Object originalBody = original.getIn().getBody();

                     

                     String enrichmentServiceResponse= (String) resource.getIn().getBody();

                     

                     String someRequiredValue = extractValueWithXPath(enrichmentServiceResponse);

                     

                     original.getIn().setHeader("someRequiredValue",someRequiredValue);

                     

                     return original;

                }

      }

      -
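      The helper extractValueWithXPath used in the strategy above is not shown. A minimal sketch of what such a helper could look like with only the JDK's javax.xml.xpath follows. The class name, the second parameter, the prefix binding and the element name pri1:someRequiredValue are assumptions made for illustration; the real response structure may differ.

```java
import java.io.StringReader;
import java.util.Collections;
import java.util.Iterator;
import javax.xml.XMLConstants;
import javax.xml.namespace.NamespaceContext;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

public class XPathHelperSketch {

    // Namespace URI taken from the route definition; "pri1" is only a local alias for it.
    private static final String PRI1_NS = "http://www.some.namespace/1.0";

    /** Evaluates a namespace-aware XPath expression against an XML string and returns its string value. */
    public static String extractValueWithXPath(String xml, String expression)
            throws XPathExpressionException {
        XPath xpath = XPathFactory.newInstance().newXPath();
        xpath.setNamespaceContext(new NamespaceContext() {
            @Override
            public String getNamespaceURI(String prefix) {
                return "pri1".equals(prefix) ? PRI1_NS : XMLConstants.NULL_NS_URI;
            }

            @Override
            public String getPrefix(String namespaceURI) {
                return PRI1_NS.equals(namespaceURI) ? "pri1" : null;
            }

            @Override
            public Iterator<String> getPrefixes(String namespaceURI) {
                return PRI1_NS.equals(namespaceURI)
                        ? Collections.singletonList("pri1").iterator()
                        : Collections.<String>emptyIterator();
            }
        });
        return xpath.evaluate(expression, new InputSource(new StringReader(xml)));
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical response payload, only here to exercise the helper.
        String xml = "<pri1:response xmlns:pri1=\"" + PRI1_NS + "\">"
                + "<pri1:someRequiredValue>abc-123</pri1:someRequiredValue>"
                + "</pri1:response>";
        System.out.println(extractValueWithXPath(xml, "//pri1:someRequiredValue"));
    }
}
```

      The strategy above calls the helper with a single argument; the expression is passed in here only to keep the sketch reusable.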


       

       

      -


      public class MulticastAggregationStrategy implements AggregationStrategy{

                

                @Override

                public Exchange aggregate(Exchange oldExchange, Exchange resource) {

       

                     boolean service1ItemReceived = false;

                     boolean service2ItemReceived = false;

       

                     Exchange theExchange = null;

                     

                     if(oldExchange == null)

                     {

                          return resource;

                     }

                     

                     else

                          theExchange = oldExchange;

                      

                     // XML String

                     String oldBody = (String) oldExchange.getIn().getBody();   

                      

                     // XML String

                     String resourceBody = (String) resource.getIn().getBody();

                     

      // Please note: beyond this point I was experimenting. I could not be sure

      // whether these values should be set on the In or the Out message, so I set them on both.

       

                     if(oldBody!=null && oldBody.contains("Service1Response"))

                     {

                          String someService1Id = getService1Id(oldBody); // Done on XML via xpath

                          theExchange.getIn().setHeader("someService1Id",someService1Id);

                          theExchange.getOut().setHeader("someService1Id",someService1Id);

                           service1ItemReceived = true;

                     }

       

                     else if(resourceBody!=null && resourceBody.contains("Service1Response"))

                     {

                          String someService1Id = getService1Id(resourceBody); // Done on XML via xpath

                          theExchange.getIn().setHeader("someService1Id",someService1Id);

                          theExchange.getOut().setHeader("someService1Id",someService1Id);

                          service1ItemReceived = true;

                     }

                      

                     if(resourceBody!=null && resourceBody.contains("Service2Response"))

                     {

                          String someService2Id = getService2Id(resourceBody); // Done on XML via xpath

                          theExchange.getIn().setHeader("someService2Id",someService2Id);

                          theExchange.getOut().setHeader("someService2Id",someService2Id);

                          service2ItemReceived = true;

                     }

                     else if(oldBody!=null && oldBody.contains("Service2Response"))

                     {

                          String someService2Id = getService2Id(oldBody); // Done on XML via xpath

                          theExchange.getIn().setHeader("someService2Id",someService2Id);

                          theExchange.getOut().setHeader("someService2Id",someService2Id);

                          service2ItemReceived = true;

                     }

       

                     .......

       

                     return theExchange;

                }

      }

       

      -
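      The four branches in the strategy above all answer the same question: which of the two bodies contains a given response marker. A small sketch of that check as one helper, in plain Java with no Camel types, is given here. The class and method names and the sample payloads are hypothetical.

```java
public class ResponsePickerSketch {

    /** Returns the first non-null body that contains the marker, or null if none does. */
    public static String firstContaining(String marker, String... bodies) {
        for (String body : bodies) {
            if (body != null && body.contains(marker)) {
                return body;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Hypothetical payloads standing in for the two service responses.
        String oldBody = "<Service2Response><id>s2</id></Service2Response>";
        String resourceBody = "<Service1Response><id>s1</id></Service1Response>";

        // Argument order mirrors the order of the checks in the strategy above.
        System.out.println(firstContaining("Service1Response", oldBody, resourceBody));
        System.out.println(firstContaining("Service2Response", resourceBody, oldBody));
    }
}
```

      With such a helper, each service id would be extracted once from whichever body matched, instead of repeating the header assignments in both branches.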


       

      END OF CODE - Thank you for staying with me

       

      Observations which are causing problems

      (1) - The log (at comment '// Log the request and show the header to see that values added exists') shows the header values I extracted from the request XML. GOOD.

      (2) - The log immediately after the enrichment shows the value added by the enricher, but the values seen at point 1 have been lost. BAD.

      (3) - The log after multicasting shows the values added by the multicast aggregation strategy, but the values seen at points 1 and 2 have now been lost. GOOD and BAD.

      (4) - The log after the restlet service call shows that all previously seen header values are now lost. VERY VERY BAD.

      (5) - As a result of the observations above, further processing beyond what is shown in this route, which needs the values set in the headers, cannot be carried out successfully.
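      To make the expected behaviour concrete: after each aggregation step I want the union of the headers seen so far, not only the ones added last. The sketch below models that with ordinary maps. It is plain Java, not the Camel Exchange API, and the class name, method name and sample values are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderMergeSketch {

    /** Returns a new map holding every header from both sides; on a key clash the newer value wins. */
    public static Map<String, Object> mergeHeaders(Map<String, Object> older,
                                                   Map<String, Object> newer) {
        Map<String, Object> merged = new LinkedHashMap<String, Object>();
        if (older != null) {
            merged.putAll(older);
        }
        if (newer != null) {
            merged.putAll(newer);
        }
        return merged;
    }

    public static void main(String[] args) {
        // Headers present before the multicast (points 1 and 2); values are hypothetical.
        Map<String, Object> beforeMulticast = new LinkedHashMap<String, Object>();
        beforeMulticast.put("initialRequestId", "req-1");
        beforeMulticast.put("someRequiredValue", "abc-123");

        // Headers added by the multicast aggregation (point 3).
        Map<String, Object> fromMulticast = new LinkedHashMap<String, Object>();
        fromMulticast.put("someService1Id", "s1");
        fromMulticast.put("someService2Id", "s2");

        // All four headers should still be present after the merge.
        System.out.println(mergeHeaders(beforeMulticast, fromMulticast));
    }
}
```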

       

      Please forgive the long and complex problem, but frankly I found the trivial examples on the web (including the Loan Broker) too simplistic and annoying.

       

      Any help/pointers are greatly appreciated.

       

      Thanks in advance,

       

      Emeka Kanu