11 Replies Latest reply on Dec 16, 2011 2:19 AM by marks1900_marks1900-customer

    Streaming mode with custom splitter

    dekk11

      I have written a custom split method that processes an incoming message body and splits the payload into fixed-size byte arrays. In an attempt to process large incoming files, I enabled the streaming option of the Splitter EIP, but it did not work with my custom splitter.

       

      DSL Ex:

      from("incomingUri").split().method("mySplitterProcessor","splitBodyInBytes").streaming()

       

      Spring Ex:

       

          <split streaming="true">

             

       

      Are you aware of any examples of using streaming mode with a custom split method within the Splitter EIP? Any additional information or direction would be very useful and much appreciated.

       

      Thanks

      Steve

       

      Edited by: dekk11 on Aug 16, 2010 7:02 PM

       


        • 1. Re: Streaming mode with custom splitter
          davsclaus

          What do you return from your custom split method?

           

          It should be an Iterator, to work well with the streaming mode.

          • 2. Re: Streaming mode with custom splitter
            dekk11

            I am returning a List. I need to confirm that large incoming files are not read into memory in their entirety, but are streamed and read one part at a time.

            • 3. Re: Streaming mode with custom splitter
              davsclaus

              Well, if you return a List then all the parts are held in memory at once. Return an Iterator instead, which then returns the parts on-the-fly.

              • 4. Re: Streaming mode with custom splitter
                dekk11

                Section 8.1.3, "Splitting big messages", briefly discusses using streaming with the splitter in the context of the tokenizer, which only processes strings. It states that the tokenizer uses java.util.Scanner to read chunks of data into memory.  So the Scanner reads a "stream" of data, based on a token.  The Scanner object is then passed back to Camel as an Expression, which Camel then iterates over and delivers as parts.  Is this correct?
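
                For reference, the Scanner behaviour described above can be illustrated in plain Java. This is only a minimal sketch, not Camel's actual tokenizer code; the class name ScannerTokenDemo and the helper tokens() are made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Scanner;

// Hypothetical demo class, not part of Camel.
class ScannerTokenDemo {

    // java.util.Scanner implements Iterator<String>, so it can be handed
    // out directly; tokens are pulled from the stream as next() is called.
    static Iterator<String> tokens(InputStream in, String delimiterRegex) {
        Scanner scanner = new Scanner(in, "UTF-8");
        scanner.useDelimiter(delimiterRegex); // the delimiter is a regular expression
        return scanner;
    }

    public static void main(String[] args) {
        InputStream in = new ByteArrayInputStream(
                "first\nsecond\nthird".getBytes(StandardCharsets.UTF_8));
        Iterator<String> parts = tokens(in, "\n");
        while (parts.hasNext()) {
            System.out.println(parts.next());
        }
    }
}
```

                The Scanner keeps only an internal buffer plus the current token in memory, which is why this style suits large text input. It works on characters, though, so it does not help with binary data.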

                 

                I need a similar implementation, but for binary streams.  Instead of getting the message body, i.e. splitBody(@Body byte[] body), I need to get the Exchange, i.e. splitBody(Exchange exchange).  From the Exchange I could get the file and perform custom streaming to create my Iterator.  Does this sound feasible?  How do I get the Exchange passed to the custom split method?

                 

                Do you know of any implementations like this?  Or would you recommend another approach for reading large binary files?  Does the Stream component (ex.  ) provide any value here, i.e. could it stream the file so that I can then read it as streaming input?

                 

                thx again!

                • 5. Re: Streaming mode with custom splitter
                  davsclaus

                  If you use a custom Iterator to read the file piece by piece then that works well with the streaming mode of the Camel Splitter EIP.

                   

                  This is similar to how the Scanner works. If you just want to split your file by a special token, then the Scanner is a great choice, and the Camel DSL has syntactic sugar for this via the tokenizer.

                   

                  If you need something more advanced/custom, then just use a custom Iterator. For example you can read in X number of bytes at each iteration step.

                  • 6. Re: Streaming mode with custom splitter
                    dekk11

                    Can you provide a simple example of how a custom split method would use an Iterator to get the file piece by piece?  What would the signature of the split method be?

                    • 7. Re: Streaming mode with custom splitter
                      davsclaus

                      Just use Iterator as the return type of the method.

                       

                      Then implement the Iterator so that next() reads and returns the next chunk of the file, and hasNext() reports whether there is more data to read.

                       

                      I am sure you can google and find some examples.

                      • 8. Re: Streaming mode with custom splitter
                        dekk11

                        Thanks for your quick responses.  I am still unclear on how the split method can get the file object from the exchange, so that it can then be converted into an Iterator.  How do I get the File reference from within the split method?

                        • 9. Re: Streaming mode with custom splitter
                          davsclaus

                          Read chapter 4 in the Camel book which talks about using beans with Camel.

                           

                          public Iterator foo(File file) {
                             return new Iterator() {
                                 // implement me to return chunks of the file
                             };
                          }
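
                          The skeleton above could be filled in roughly as follows. This is a minimal sketch under assumptions, not code from the thread: the class name FileChunkSplitter, the nested ChunkIterator and the CHUNK_SIZE value are hypothetical, and the File parameter relies on bean parameter binding converting the message body, as in the signature above. The method name splitBodyInBytes is taken from the route in the original question.

```java
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical bean; names and chunk size are illustrative assumptions.
class FileChunkSplitter {

    static final int CHUNK_SIZE = 4096; // assumed chunk size in bytes

    // Split method: returning an Iterator lets the caller pull parts lazily.
    public Iterator<byte[]> splitBodyInBytes(File file) throws IOException {
        return new ChunkIterator(
                new BufferedInputStream(new FileInputStream(file)), CHUNK_SIZE);
    }

    static final class ChunkIterator implements Iterator<byte[]> {
        private final InputStream in;
        private final int chunkSize;
        private byte[] next;   // chunk read ahead by hasNext(), or null
        private boolean done;  // true once end of stream has been reached

        ChunkIterator(InputStream in, int chunkSize) {
            this.in = in;
            this.chunkSize = chunkSize;
        }

        @Override
        public boolean hasNext() {
            if (next == null && !done) {
                next = readChunk();
                if (next == null) {
                    done = true;
                }
            }
            return next != null;
        }

        @Override
        public byte[] next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            byte[] chunk = next;
            next = null;
            return chunk;
        }

        // Reads up to chunkSize bytes; at end of stream it closes the
        // stream and returns null. The last chunk may be shorter.
        private byte[] readChunk() {
            try {
                byte[] buf = new byte[chunkSize];
                int filled = 0;
                while (filled < chunkSize) {
                    int n = in.read(buf, filled, chunkSize - filled);
                    if (n < 0) {
                        break;
                    }
                    filled += n;
                }
                if (filled == 0) {
                    in.close();
                    return null;
                }
                return filled == chunkSize ? buf : Arrays.copyOf(buf, filled);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }
}
```

                          Only one chunk is held in memory at a time, so memory use is bounded by CHUNK_SIZE rather than by the file size. Note that the stream is closed only when the iterator is read to the end; if iteration stops early, the caller would have to close it some other way.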
                          

                           

                          • 10. Re: Streaming mode with custom splitter
                            dekk11

                            Another informative chapter in the book!  I implemented an Iterator that accesses the file via a FileInputStream and reads it one chunk at a time.  I used bean parameter binding to access the exchange and its members.  Thanks again for your assistance.

                             

                            Steve

                            • 11. Re: Streaming mode with custom splitter
                              marks1900_marks1900-customer

                              Would you be able to provide the sample code for this example?

                               

                               

                              On the following page:  http://camel.apache.org/splitter.html ,  there is an excerpt that I have questions about.

                               

                              ++ Excerpt - Start ++

                               

                              import static org.apache.camel.builder.ExpressionBuilder.beanExpression;

                              from("direct:streaming")
                                   .split(beanExpression(new MyCustomIteratorFactory(),  "iterator"))
                                   .streaming().to("activemq:my.parts")

                               

                              ++ Excerpt - End ++

                               

                              Can someone provide me with sample code for the MyCustomIteratorFactory class?

                               

                              Basically I have one huge file, that I want to split into many jms messages.

                               

                              The following is an outline of my current thoughts...

                               

                                  public void configure() throws Exception
                                  {
                                      from( "file://file-item-list/general?initialDelay=1000&delay=5000&delete=true" )
                                          .transacted( "PROPAGATION_REQUIRED" )
                                          .split(ExpressionBuilder.beanExpression(new MyCustomFileChunkIterator(),  "iterator"))  // split big file of items into smaller chunks
                                          .streaming()
                                          .to("activemq:queue:file-item-sublist-request");
                                  }
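
                              One possible shape for the MyCustomFileChunkIterator class used in the outline above, as a hedged sketch only. It assumes the file holds one item per line, that a sublist of ITEMS_PER_CHUNK lines is a sensible message size, and that bean parameter binding can hand the file to the iterator(File) method; none of this is confirmed by the thread or by the splitter page. The helper method batches() is hypothetical.

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical implementation; assumes one item per line in the input file.
class MyCustomFileChunkIterator {

    static final int ITEMS_PER_CHUNK = 1000; // assumed sublist size

    // Method name matches the "iterator" argument given to beanExpression.
    public Iterator<List<String>> iterator(File file) throws IOException {
        Reader reader = Files.newBufferedReader(file.toPath(), StandardCharsets.UTF_8);
        return batches(reader, ITEMS_PER_CHUNK);
    }

    // Lazily groups the lines of the source into lists of at most batchSize.
    static Iterator<List<String>> batches(Reader source, int batchSize) {
        BufferedReader reader = new BufferedReader(source);
        return new Iterator<List<String>>() {
            private List<String> next; // batch read ahead by hasNext(), or null
            private boolean done;      // true once the source is exhausted

            @Override
            public boolean hasNext() {
                if (next == null && !done) {
                    next = readBatch();
                    if (next == null) {
                        done = true;
                    }
                }
                return next != null;
            }

            @Override
            public List<String> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                List<String> batch = next;
                next = null;
                return batch;
            }

            // Reads up to batchSize lines; closes the reader and returns
            // null when no lines are left.
            private List<String> readBatch() {
                try {
                    List<String> batch = new ArrayList<>();
                    String line;
                    while (batch.size() < batchSize && (line = reader.readLine()) != null) {
                        batch.add(line);
                    }
                    if (batch.isEmpty()) {
                        reader.close();
                        return null;
                    }
                    return batch;
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        };
    }
}
```

                              Each part produced this way is a List of lines, so a further step may be needed to turn a sublist into whatever payload the JMS consumer expects.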

                               

                              Edited by: marks1900 on Dec 16, 2011 7:18 AM