6 Replies Latest reply on Apr 20, 2009 7:26 AM by davsclaus

    Aggregation issue: Sum of amounts from multiple messages

    roger

      Hi all,

       

      I have an issue with the Aggregation functionality in MR/Camel (1.5.4). What I am trying to achieve is (I think) quite simple:

       

      I have XML Schema-formatted messages containing an amount in a field, call it Amount. A typical message looks like this:

       

      Aggregates into Message 4.Amount = 25

      and so on, I think you get the picture.

       

      I try to do this using the following DSL:

       

      from("activemq:MyQueue")

      .aggregator(new AmountAggregation())

      .xpath("name(/p:Message)='Message'", String.class, nsMap)

      .batchTimeout(500L)

      .to("file:data/agg-out?append=false&expression=sum.xml");

       

      The AmountAggregation class contains an AggregationStrategy.aggregate method which adds oldExchange.getIn().getBody()... message into the newExchange.

       

      Now, the problem: IT DOESN'T WORK. Why? I seem to get TWO calls to my AmountAggregation instance for each message: one which satisfies the XPath expression (i.e. has the Message root tag, and evaluates to true) and one which DOESN'T, i.e. evaluates to false.

       

      Does anyone have a clue what I'm doing wrong, or have I (most probably) misunderstood something about the aggregation functionality?

       

      Regards,

      Roger

        • 1. Re: Aggregation issue: Sum of amounts from multiple messages
          davsclaus

          Ahh, the XPath. Yeah, it can be a pain to get right. We recently added some more DEBUG logging that can help. It may already be in 1.5.4, or it will be in 1.6.0, which is due to be released shortly.

           

          If all your messages on the JMS queue are the same, you can try a constant expression instead of XPath, e.g. .constant(true)
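
          One thing that may be worth ruling out (it may or may not be what is happening here): in XPath 1.0, name() returns the qualified name as it is written in the document, prefix included, so name(/p:Message)='Message' is only true when the message uses a default namespace. local-name() ignores the prefix. Below is a minimal sketch using only the JDK's javax.xml.xpath, outside Camel. The namespace URI and the sample documents are made up for illustration; only the prefix p, the Message root and the Amount field come from the post.

```java
import java.io.StringReader;
import java.util.Collections;
import java.util.Iterator;
import javax.xml.XMLConstants;
import javax.xml.namespace.NamespaceContext;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;

class XPathNameCheck {

    // Hypothetical namespace URI; the real one comes from your schema.
    static final String NS = "urn:example:message";

    /** Evaluates a boolean XPath expression against the XML, with prefix "p" bound to NS. */
    static boolean matches(String xml, String expression) throws Exception {
        DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
        dbf.setNamespaceAware(true); // required, otherwise /p:Message never matches
        Document doc = dbf.newDocumentBuilder()
                .parse(new InputSource(new StringReader(xml)));

        XPath xpath = XPathFactory.newInstance().newXPath();
        xpath.setNamespaceContext(new NamespaceContext() {
            public String getNamespaceURI(String prefix) {
                return "p".equals(prefix) ? NS : XMLConstants.NULL_NS_URI;
            }
            public String getPrefix(String uri) {
                return NS.equals(uri) ? "p" : null;
            }
            public Iterator<String> getPrefixes(String uri) {
                String prefix = getPrefix(uri);
                return (prefix == null
                        ? Collections.<String>emptyList()
                        : Collections.singletonList(prefix)).iterator();
            }
        });
        return (Boolean) xpath.evaluate(expression, doc, XPathConstants.BOOLEAN);
    }
}
```

          With a document whose root is written as m:Message, the name() comparison is false and the local-name() comparison is true, even though /p:Message selects the element in both cases.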

           

          Can you post your aggregation strategy class?

          • 2. Re: Aggregation issue: Sum of amounts from multiple messages
            roger

            Hello again,

             

            I tried changing from XPath to aggregating on a header, i.e. setting the header ID to "X":

             

            from...

            .process(new Processor() {

              public void process(Exchange ex) {

                ex.getIn().setHeader("ID", "X");

              }

            })

            ...

             

            Then I aggregate upon that:

             

            from...

            .aggregate(header("ID"), myAggregationStrategy)

            .batchTimeout(500L)

            .to(...);

             

            This doesn't work either, however, and I think I now know why: when the timeout (0.5 seconds) expires, the aggregation buffer gets emptied. While debugging the 'add' method of org.apache.camel.processor.aggregate.DefaultAggregationCollection (line 54), I can see its map field being emptied.

             

            The problem, as I now see it, is that my aggregation should never 'expire'. I want it to recalculate whenever a new message arrives, adding the new amount to the previous sum.

             

            Am I trying to do something not intended with Camel aggregation, or do you have any hints?

             

            Thanks and regards,

            Roger

            • 3. Re: Aggregation issue: Sum of amounts from multiple messages
              davsclaus

              Ah

               

              So what you want is to keep a current sum as a kind of barrier.

              When a new message arrives you calculate a new current sum and send 1 message out with this total.

               

              Then after some time a new message arrives and you recalculate again and send a new message out with the new total.

               

              And you keep doing this forever?

               

              Yeah, I can see that this is not really what the aggregator EIP pattern was designed for. In the current codebase it doesn't keep state once it's complete, so you lose the "current sum".

               

              What you might need is to store this state in a persistent store and then use a regular processor to recalculate a new total and update the persistent store.

               

              from("jms") -> processor or bean -> to ( somewhere else )

               

              and in the processor or bean you can fetch the sum from the persistent store, recalculate, update the store and set the body to the new sum.

               

              For starters you can just keep it as a variable in your processor / bean to get the route working. Later you can add a persistent store.
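
              As a rough sketch of that in-memory variant, here is a plain Java holder that keeps the current sum and returns the new total for each amount. The class and method names are made up for illustration, and how the Amount text is extracted from the message body is left out; this is not Camel API.

```java
import java.math.BigDecimal;

class RunningTotal {

    // Current sum; lost on restart, since nothing is persisted in this sketch.
    private BigDecimal total = BigDecimal.ZERO;

    /**
     * Adds one amount (as text, e.g. the content of the Amount field)
     * and returns the new total. Synchronized because a route may have
     * several concurrent consumers sharing one instance.
     */
    synchronized BigDecimal add(String amountText) {
        total = total.add(new BigDecimal(amountText.trim()));
        return total;
    }

    /** Returns the current total without changing it. */
    synchronized BigDecimal current() {
        return total;
    }
}
```

              A single shared instance would be called from the processor or bean for every incoming message, and the returned total set as the body of the outgoing message.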

               

              Notice that the current aggregator in Camel is not backed by a persistent store either. So if you stop/restart it starts from 0. We have a ticket to enhance this in the future.

              • 4. Re: Aggregation issue: Sum of amounts from multiple messages
                roger

                Hello again,

                 

                Thanks a lot for your quick replies!

                 

                And yes, you're right, the aggregator pattern simply is not designed for doing this. What you suggest, as I understand it, is to do something like:

                 

                from...

                .process(new Processor() {

                  public void process(Exchange ex) {

                    //read from ex.getIn().getBody() and calculate

                    //output to ex.getOut()...

                  }

                })

                .to(...);

                 

                Do you have any suggestions on what persistent store one should generally use in a standard Camel deployment? And yes, an in-memory singleton (ouch?!) variable will do for the moment, but in the future I would like to persist it.

                 

                Thanks anyway!

                 

                Regards,

                Roger

                • 5. Re: Aggregation issue: Sum of amounts from multiple messages
                  davsclaus

                  Hi Roger

                   

                  For the persistence, yeah, it's kind of à la carte. The usual choice is either file or database.

                   

                  The quickest is probably file based. But you should also check within your organization what requirements you have for storing "application data".

                   

                  Some organizations have a configuration database where you can add your parameters. But it's really decided case by case, or organization by organization.

                   

                  If the information to be stored is not sensitive then the choice is usually a bit broader.

                   

                  For instance, if it's file based and sensitive you might need to encrypt it, or store it in a location that only a few people have access to. And maybe it needs an AUDIT log to see who has read/updated it, etc. But in your case, as it's "just a number", I don't think this is the case.

                   

                  In Camel there are components for both file and database that you can choose from:

                  - file is part of camel-core

                  - jpa is the standard JPA for database

                  - jdbc is using regular Java JDBC

                  - sql is using the Spring JDBC template

                  - ibatis is using iBatis

                   

                  What might be lacking is properties file support in Camel.

                   

                  For instance, you can use a properties file and store the number there, and have it loaded / injected by Spring property placeholders when your application starts up.

                   

                  Then what is needed is to update the properties file with the new number.

                   

                  In your processor code you can just update the amount (if it's a singleton processor). And you can leverage the Camel ProducerTemplate to send the file content to overwrite the properties file.

                   

                  Something like this:

                  template.sendBodyAndHeader("file://myfolder/", amount, FileComponent.FILE_NAME, "amount.properties");

                  • 6. Re: Aggregation issue: Sum of amounts from multiple messages
                    davsclaus

                    And remember to use append=false as parameter to the file endpoint so it knows it should replace the file.
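
                    If you would rather not go through the file endpoint, the same properties-file idea can be sketched with plain java.util.Properties. This is an alternative to the ProducerTemplate call, not how Camel does it; the class name and the "amount" key are made up for illustration.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

class AmountStore {

    private static final String KEY = "amount"; // hypothetical property name

    private final Path file;

    AmountStore(Path file) {
        this.file = file;
    }

    /** Reads the stored amount, or zero if the file or the key is missing. */
    BigDecimal load() throws IOException {
        if (!Files.exists(file)) {
            return BigDecimal.ZERO;
        }
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(file)) {
            props.load(in);
        }
        return new BigDecimal(props.getProperty(KEY, "0"));
    }

    /** Replaces the file content with the new amount (no appending). */
    void save(BigDecimal amount) throws IOException {
        Properties props = new Properties();
        props.setProperty(KEY, amount.toPlainString());
        // Files.newOutputStream truncates an existing file by default.
        try (OutputStream out = Files.newOutputStream(file)) {
            props.store(out, null);
        }
    }
}
```

                    On startup you would call load() once to seed the in-memory sum, and call save() after each recalculation.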