8 Replies Latest reply on Jul 2, 2009 7:41 AM by italost

    Aggregator: messages belonging to a closed aggregate

    italost

      How can I prevent messages belonging to a closed aggregate from creating a new aggregate?

       

      I have a situation where the aggregator is configured with a timeout. I want to publish the messages already in the aggregate after a while and ignore any others that arrive after the aggregate has been closed.

       

       

      Regards.

       

      Italo Stefani

      Vetta Technologies

        • 1. Re: Aggregator: messages belonging to a closed aggregate
          davsclaus

          By "closed", I suspect you mean after the very first aggregation completes?

           

          So you want to aggregate some messages and send out only one message, and all messages arriving afterwards should be disregarded?

           

          You can use a filter to stop sending messages to the aggregator after that one message has been aggregated:

          http://camel.apache.org/message-filter.html

           

           

          Maybe something like this:

           

          // a mutable, thread-safe flag for done (a final Boolean cannot be changed)
          final AtomicBoolean done = new AtomicBoolean(false);

          // a predicate that matches only while we are not done
          Predicate aggregatorNotDone = new Predicate() {
            ...
            return !done.get();
          };

          // use a filter to prevent messages being aggregated when we are closed
          filter(aggregatorNotDone)
            // and here is the aggregator
            .aggregate(...)
            // use a processor to switch the flag to done
            .process(new Processor() {
              ...
              done.set(true);
            })

          • 2. Re: Aggregator: messages belonging to a closed aggregate
            italost

            Hi Claus,

             

            Actually, my scenario has a bunch of incoming messages and I want to aggregate them with a timeout. Let's say I aggregate them by a message header "fool". Also, let's say that after the timeout I have 4 messages aggregated where "fool=blah". These messages are merged and the aggregator publishes the result message.

             

            After that, more incoming messages with "fool=blah" reach the aggregator. Those are late messages and I don't want to process them. According to the EIP book (pg 245), the Aggregator should keep information about what has already been processed. I was wondering whether the Mediation Router Aggregator has a configuration that allows me to ignore late messages.

             

            Regards.

             

            Italo Stefani

            Vetta Technologies

            www.vettatech.com

            • 3. Re: Aggregator: messages belonging to a closed aggregate
              davsclaus

              Hi

               

              Ah thanks for the detailed explanation.

               

              The Aggregator is, by the way, scheduled for an overhaul in Camel 2.1 to make it a bit easier to use out of the box. Anyway, back to your use case.

               

              It currently does not hold such state. Once a batch has been sent out, the aggregator forgets it.

               

              What is needed is to add such state to it, so Camel can decorate your exchanges with this information, along the lines of

               

                  java.lang.String AGGREGATED_INDEX = "CamelAggregatedIndex";

                  java.lang.String AGGREGATED_SIZE = "CamelAggregatedSize";

               

              Mind that those constants are from Camel 2.0. They also exist in 1.x but have different keys.

               

              I see several solutions

               

              a)

              What you can do yourself is store this information in a processor and then decorate the exchange with a header marking it as LATE. Then use the filter to filter those messages out.

               

              b)

              Use the idempotent consumer EIP to skip duplicate messages: put a header on the message that says which correlation group it belongs to. For instance, you can add that header in your custom aggregation strategy in the aggregator.

               

              In your aggregation strategy

              setHeader("MyGroupId", ...);

               

               

              And then use that header in the idempotent consumer

              http://camel.apache.org/idempotent-consumer.html
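
              For alternative a), the state that has to be kept is just the set of correlation keys whose aggregate has already been published. A minimal plain-Java sketch of that bookkeeping follows. ClosedGroupTracker and its method names are hypothetical and not part of Camel; wiring it into a route through a processor and a filter is left out.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not a Camel class): remembers which correlation
// keys have already had their aggregate published.
class ClosedGroupTracker {
    // Thread-safe set of keys whose aggregate is closed.
    private final Set<String> closedKeys = ConcurrentHashMap.newKeySet();

    // Call once the aggregate for this key has been published.
    void markClosed(String correlationKey) {
        closedKeys.add(correlationKey);
    }

    // True if a message with this key arrives after its aggregate was published.
    boolean isLate(String correlationKey) {
        return closedKeys.contains(correlationKey);
    }
}
```

              A processor in front of the aggregator could call isLate(...) with the correlation header value and set the LATE header when it returns true, and a processor behind the aggregator could call markClosed(...). Like the aggregator itself, this set lives only in memory.
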

               

               

              I will create a ticket in the Camel JIRA so the aggregator itself can also keep track of this and thus disregard late messages.

              • 4. Re: Aggregator: messages belonging to a closed aggregate
                davsclaus
                • 5. Re: Aggregator: messages belonging to a closed aggregate
                  italost

                  Hi Claus,

                  thanks for your answer. I will go with alternative a) :p.

                   

                  As soon as I succeed I will post the solution here.

                   

                  Regards.

                   

                  Italo Stefani

                  Vetta Technologies

                  www.vettatech.com

                  • 6. Re: Aggregator: messages belonging to a closed aggregate
                    italost

                    I created the following solution for late messages.

                     

                    -


                    from("direct:aggregator")

                    .aggregator(header("fool"), new MyAggregator()).batchTimeout(2000L)

                    .idempotentConsumer(header("fool"), new MyMessageIdRepository())

                    .to("somewhere");

                    -


                     

                    MyMessageIdRepository can be implemented as follows:

                     

                    -


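                    class MyMessageIdRepository implements MessageIdRepository {

                      // in-memory only: remembers every id that has already been seen
                      private static final Map<String, String> repository = new HashMap<String, String>();

                      // returns true if the id was seen before; otherwise records it and returns false
                      public boolean contains(String messageId) {
                        if (repository.containsKey(messageId)) {
                          return true;
                        }
                        repository.put(messageId, messageId);
                        return false;
                      }

                    }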

                    -


                     

                     

                    My aggregation is by a header, and this header is preserved after the aggregation. So, if a late message arrives, it will start a new aggregate with the same header value. When that aggregate is published, the idempotent consumer drops it, because the header value has already passed through MyMessageIdRepository.
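
                    To make the effect concrete, here is a small plain-Java simulation of the check applied to published aggregates. It does not use Camel; LateAggregateDemo and passThrough are hypothetical names, and each aggregate is reduced to its "fool" header value.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical demo class: models only the "seen this key before?" check.
class LateAggregateDemo {
    // Given the header value of each published aggregate, in order,
    // returns the ones that would be let through to the endpoint.
    static List<String> passThrough(List<String> publishedKeys) {
        Set<String> seen = new HashSet<>();
        List<String> delivered = new ArrayList<>();
        for (String key : publishedKeys) {
            // add() returns false when the key is already present,
            // which is the case for an aggregate built from late messages
            if (seen.add(key)) {
                delivered.add(key);
            }
        }
        return delivered;
    }
}
```

                    With the published keys "blah", "other", "blah", only the first "blah" and "other" get through; the second "blah" aggregate, built from late messages, is dropped.
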

                     

                    Regards.

                     

                    Italo Stefani

                    Vetta Technologies

                    www.vettatech.com

                    • 7. Re: Aggregator: messages belonging to a closed aggregate
                      davsclaus

                      Cool, you are really getting the hang of how to combine several EIPs.

                       

                      Your repository might need to store its state on disk or in a database, so that it is persistent and can survive a restart.

                       

                      Camel has a simple file-based idempotent repository and also a JPA-based one in camel-jpa.
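
                      To show what surviving a restart means here, the following is a minimal plain-Java sketch of an id store that appends each new id to a file and reloads the file on startup. It is not Camel's file-based repository; FileBackedIdStore is a hypothetical name, and the sketch assumes ids never contain line breaks.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration, not Camel's implementation.
class FileBackedIdStore {
    private final Path file;
    private final Set<String> ids = new HashSet<>();

    // Loads any ids written by a previous run (one id per line).
    FileBackedIdStore(Path file) {
        this.file = file;
        try {
            if (Files.exists(file)) {
                ids.addAll(Files.readAllLines(file, StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Same contract as contains(...) in MyMessageIdRepository:
    // true if the id was seen before, otherwise record it and return false.
    synchronized boolean contains(String id) {
        if (ids.contains(id)) {
            return true;
        }
        ids.add(id);
        try {
            // Append the new id so it is still known after a restart.
            Files.write(file, (id + System.lineSeparator()).getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return false;
    }
}
```

                      Creating a second FileBackedIdStore on the same file stands in for a restart: ids recorded by the first instance are reported as already seen. The file grows without bound in this sketch, so a real setup would need some form of cleanup.
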

                      • 8. Re: Aggregator: messages belonging to a closed aggregate
                        italost

                        Hi Claus.

                         

                        Thank you very much for your attention.

                         

                        No doubt we need to persist the repository. I just used a map to illustrate the solution simply.

                         

                        Regards.

                         

                        Italo Stefani

                        Vetta Technologies

                        www.vettatech.com