5 Replies. Latest reply on Apr 21, 2011 9:47 AM by xdevroey

    Using multi-threading in a splitter

    xdevroey

      Hello,

       

      I am currently trying to write a route that splits a big XML file into smaller files according to a groupId in the XML.

       

      When I don't use the .parallelProcessing() option of the splitter, everything works fine. But when I enable multi-threading, the route blocks and I get "Java heap space" errors (OutOfMemoryError).

       

      Here is the route (the attached snippet contains the same code, formatted):

      -


      ThreadPoolExecutor splittingPool = new ThreadPoolExecutor(
              1, 5, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10));
      // When the pool and its queue are saturated, the submitting thread runs the task itself
      splittingPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

      from("file:D:/test/in?noop=true")
          .log("Begin splitting")
          // Split on the closing tag; each part loses its "</elem>"
          .split(body().tokenize("</elem>"))
              .parallelProcessing()
              .executorService(splittingPool)
              .streaming()
              .convertBodyTo(String.class)
              // Re-appends "</elem>" and sets the groupId and startProcessingTime headers
              .process(trxSplittingProc)
              .choice()
                  .when(header("groupId").isNotEqualTo("EOF"))
                      .to("file:D:/test/out?fileName=${header.groupId}-D-${header.startProcessingTime}.txt&fileExist=Append")
                  .otherwise()
                      .log("End of file reached")
              .end() // end of choice()
          .end() // end of split()
          .bean(trxSplittingProc, "reinitilize");

      -


      The groupId and startProcessingTime headers are set by the trxSplittingProc processor. Apart from that, the processor only appends to the body the closing </elem> tag that the tokenizer removed.

       

      The file I have to process is quite large (3.5 GB) and contains 1,000,000 <elem> elements (each one is approximately 3,500 characters long).
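      As a rough sanity check on those numbers, here is the arithmetic as a tiny Java sketch (class and method names are only illustrative). 1,000,000 × 3,500 is about 3.5 billion characters, and since a Java 6 String stores each char in 2 bytes, holding all of that text in memory at once would need at least about 7 GB of heap, before any object overhead. Any code path that materializes the whole payload instead of streaming it is therefore very likely to exhaust the heap unless the heap is extremely large.

```java
public class PayloadSizeEstimate {
    // Figures from the post: 1,000,000 <elem> elements of about 3,500 characters each
    public static final long ELEMENTS = 1000000L;
    public static final long CHARS_PER_ELEMENT = 3500L;
    // java.lang.String on Java 6 keeps its text in a char[], i.e. 2 bytes per char
    public static final long BYTES_PER_CHAR = 2L;

    public static long totalChars() {
        return ELEMENTS * CHARS_PER_ELEMENT;
    }

    /** Lower bound only: ignores object headers, array headers and any extra copies. */
    public static long heapBytesIfFullyLoaded() {
        return totalChars() * BYTES_PER_CHAR;
    }

    public static void main(String[] args) {
        System.out.println("Characters in the file:      " + totalChars());
        System.out.println("Heap bytes if all in memory: " + heapBytesIfFullyLoaded());
    }
}
```

      Running it prints 3500000000 characters and 7000000000 bytes.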

       

      I have tried configuring the splittingPool with only one thread and a LinkedBlockingQueue capacity of one, but the problem remains. That seems odd to me, since splitting without multi-threading works just fine.
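      To see what that pool configuration does on its own, here is a small standalone sketch using only java.util.concurrent (the class name and latch-based setup are illustrative, not from the route). With one thread and a queue capacity of one, a third task submitted while the worker is busy is rejected and, under CallerRunsPolicy, runs on the submitting thread. In the route that is presumably the thread driving the splitter, so the policy throttles the splitter rather than letting the executor's queue grow, which suggests the memory is not being held in the executor's own queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {
    /** Returns the name of the thread that ran the third submitted task. */
    public static String threadForThirdTask() throws InterruptedException {
        // Same shape as splittingPool, shrunk to 1 thread and a queue of 1
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1));
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        final CountDownLatch release = new CountDownLatch(1);
        final String[] ranOn = new String[1];

        // Task 1 occupies the only worker thread until the latch is released
        pool.execute(new Runnable() {
            public void run() {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        // Task 2 fills the queue (capacity 1)
        pool.execute(new Runnable() {
            public void run() { }
        });
        // Task 3 is rejected, so CallerRunsPolicy runs it on the calling thread
        pool.execute(new Runnable() {
            public void run() {
                ranOn[0] = Thread.currentThread().getName();
            }
        });

        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return ranOn[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Caller: " + Thread.currentThread().getName()
                + ", third task ran on: " + threadForThirdTask());
    }
}
```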

       

      Does anyone have an idea how to solve this problem? Many thanks.

       

      Xavier.

       

      Edit:

      I use Java 1.6 and Camel 2.2.0-fuse-02-00

       

      Edited by: xdevroey on Apr 19, 2011 11:23 AM

        • 1. Re: Using multi-threading in a splitter
          davsclaus

          You should enable the .streaming() option.

          http://camel.apache.org/splitter
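          Roughly speaking, .streaming() makes the splitter iterate over the tokens lazily instead of first splitting the whole message into a list of parts. As a plain-Java illustration of that idea only (not Camel's implementation; the class and the ElementHandler callback are hypothetical), java.util.Scanner with "</elem>" as its delimiter hands over one element at a time and restores the closing tag, much like the trxSplittingProc processor described in the question.

```java
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;

public class StreamingTokenizeSketch {

    /** Hypothetical callback that receives one element at a time. */
    public interface ElementHandler {
        void handle(String element);
    }

    /**
     * Splits on "</elem>" lazily: Scanner reads the Reader incrementally, so memory use
     * depends on the size of the current token rather than on the whole input.
     */
    public static void splitElements(Reader input, ElementHandler handler) {
        Scanner scanner = new Scanner(input);
        scanner.useDelimiter("</elem>"); // a regex, but it has no special characters
        try {
            while (scanner.hasNext()) {
                String chunk = scanner.next();
                int start = chunk.indexOf("<elem");
                if (start < 0) {
                    continue; // trailing text, e.g. the closing root tag
                }
                // Drop leading text such as the root start tag and restore the
                // closing tag that was consumed as the delimiter
                handler.handle(chunk.substring(start) + "</elem>");
            }
        } finally {
            scanner.close();
        }
    }

    public static void main(String[] args) {
        String xml = "<root>\n<elem id=\"1\">a</elem>\n<elem id=\"2\">b</elem>\n</root>";
        final List<String> seen = new ArrayList<String>();
        splitElements(new StringReader(xml), new ElementHandler() {
            public void handle(String element) {
                seen.add(element);
            }
        });
        System.out.println(seen);
    }
}
```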

           

          If you have the Camel in Action book, chapter 10 covers this in much more detail.

          • 2. Re: Using multi-threading in a splitter
            davsclaus

            Ah, I just noticed that streaming is only available in a later Camel release.

            • 3. Re: Using multi-threading in a splitter
              davsclaus

              I thought it would be odd if streaming had been introduced later, so I double-checked and found the mistake: the documentation was wrong. Streaming is available in the Camel version you use.

              • 4. Re: Using multi-threading in a splitter
                xdevroey

                Hello Claus,

                 

                Thanks for your responses!

                 

                I think I am already using the streaming option, unless I am using it wrong:

                 .split(body().tokenize("</elem>"))
                     .parallelProcessing()
                     .executorService(splittingPool)
                     .streaming()

                 

                I found this entry in the Camel issue tracker: https://issues.apache.org/jira/browse/CAMEL-3497

                 

                I suspect the heap overflows come from this issue. Unfortunately I am stuck with Camel 2.2, so I will try to work around it.

                 

                Kind regards,

                • 5. Re: Using multi-threading in a splitter
                  xdevroey

                  OK, I found another way to get the desired behaviour.

                   

                  I used a classic SAX XML parser that sends the data it reads to a SEDA endpoint through a ProducerTemplate injected with the @Produce(uri = "seda:out") annotation.
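                  A minimal sketch of that approach, assuming each <elem> carries its group as a groupId attribute (the real XML layout is not shown in the thread, so that attribute and the ElemSaxHandler class are hypothetical). To keep it self-contained, a java.util.concurrent.BlockingQueue stands in for the ProducerTemplate and the seda:out endpoint, and only each element's text content is collected, not its markup. With a bounded queue, put() blocks while the queue is full, so the parser is throttled instead of filling the heap.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.parsers.SAXParser;
import javax.xml.parsers.SAXParserFactory;
import org.xml.sax.Attributes;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.DefaultHandler;

public class ElemSaxHandler extends DefaultHandler {
    // Stands in for the ProducerTemplate / seda:out endpoint; each entry is {groupId, text}
    private final BlockingQueue<String[]> out;
    private StringBuilder text; // non-null only while inside an <elem>
    private String groupId;

    public ElemSaxHandler(BlockingQueue<String[]> out) {
        this.out = out;
    }

    @Override
    public void startElement(String uri, String localName, String qName, Attributes atts) {
        if ("elem".equals(qName)) {
            groupId = atts.getValue("groupId"); // hypothetical attribute name
            text = new StringBuilder();
        }
    }

    @Override
    public void characters(char[] ch, int start, int length) {
        if (text != null) {
            text.append(ch, start, length);
        }
    }

    @Override
    public void endElement(String uri, String localName, String qName) throws SAXException {
        if ("elem".equals(qName)) {
            try {
                // put() blocks while a bounded queue is full, throttling the parser
                out.put(new String[] {groupId, text.toString()});
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new SAXException(e);
            }
            text = null;
        }
    }

    /** Parses the document, sending one {groupId, text} pair per <elem> to the queue. */
    public static void parse(String xml, BlockingQueue<String[]> out)
            throws ParserConfigurationException, SAXException, IOException {
        SAXParser parser = SAXParserFactory.newInstance().newSAXParser();
        parser.parse(new InputSource(new StringReader(xml)), new ElemSaxHandler(out));
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String[]> queue = new LinkedBlockingQueue<String[]>();
        parse("<root><elem groupId=\"A\">x</elem><elem groupId=\"B\">y</elem></root>", queue);
        for (String[] entry : queue) {
            System.out.println(entry[0] + " -> " + entry[1]);
        }
    }
}
```

                  For the real file you would parse a java.io.File instead of a String, pass a bounded queue (for example new LinkedBlockingQueue<String[]>(1000), an arbitrary capacity) and drain it from a separate consumer thread; otherwise put() would block forever once the queue fills.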

                   

                  I also added the following catch to simulate a timed, blocking send (like offer(E, long, TimeUnit) on a BlockingQueue) to the SEDA endpoint:

                   

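                  } catch (RuntimeCamelException ex) {
                      // Assumed: a full SEDA queue surfaces as a RuntimeCamelException whose cause
                      // is an IllegalStateException; instanceof also guards against a null cause
                      if (ex.getCause() instanceof IllegalStateException) {
                          logger.debug("SEDA Queue full (?), will try again", ex);
                          Thread.sleep(DEFAULT_SEDA_WAIT);
                      } else {
                          throw ex;
                      }
                  }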
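                  Outside Camel, the same wait-and-retry idea looks like this with a plain bounded queue (the FullQueueRetry class and its methods are hypothetical). LinkedBlockingQueue.add() throws IllegalStateException when the queue is full, which is presumably what reaches the catch above wrapped in a RuntimeCamelException. A BlockingQueue also provides a timed offer(e, timeout, unit) that waits for free space without any catch-and-sleep loop.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class FullQueueRetry {

    /** Mirrors the catch above: add() fails fast on a full queue, so sleep and retry. */
    public static <T> void addWithRetry(BlockingQueue<T> queue, T item, long waitMillis)
            throws InterruptedException {
        while (true) {
            try {
                queue.add(item); // throws IllegalStateException("Queue full") when full
                return;
            } catch (IllegalStateException full) {
                Thread.sleep(waitMillis);
            }
        }
    }

    /** The built-in alternative: wait up to the timeout for space, no catch needed. */
    public static <T> boolean offerWithTimeout(BlockingQueue<T> queue, T item, long timeoutMillis)
            throws InterruptedException {
        return queue.offer(item, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<String>(1);
        queue.add("first");
        // A consumer frees the single slot after a short delay
        new Thread(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(50);
                    queue.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
        addWithRetry(queue, "second", 10);
        System.out.println("Queued: " + queue.peek());
        // The queue is full again and nobody drains it, so this times out
        System.out.println("Third accepted: " + offerWithTimeout(queue, "third", 20));
    }
}
```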

                   

                  Thanks for the answers.