18 Replies Latest reply on Oct 11, 2016 12:07 PM by cfang

    ItemProcessor

    richardmoore

      I need the ItemProcessor to open a connection, use that connection while processing all items, and then close it. How would I implement this?

        • 1. Re: ItemProcessor
          cfang

          An item processor does not have an open() or close() method for resource initialization and clean-up, unlike the item reader and writer.

           

          However, you can add @PostConstruct and @PreDestroy methods to your item processor class for this purpose.  See examples:

           

          jsr352/CamelItemProcessor.java at master · jberet/jsr352 · GitHub

           

          jsr352/CamelListenerBase.java at master · jberet/jsr352 · GitHub

           

          javax.annotation.PostConstruct and javax.annotation.PreDestroy are standard Java annotations, so no extra dependency is needed.
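The open-once, reuse, close-once pattern described above can be sketched in plain Java. This is only an illustration under stated assumptions: `ItemProcessor` here is a local stand-in for `javax.batch.api.chunk.ItemProcessor`, and `FakeConnection`, `LookupItemProcessor`, `init()`, `cleanup()` and `LifecycleDemo` are hypothetical names. In a real batch artifact the two lifecycle methods would carry `@PostConstruct` and `@PreDestroy` and the container would call them; here a small driver calls them by hand so the sketch runs without the batch API jar.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Local stand-in for javax.batch.api.chunk.ItemProcessor, declared here only
// so the sketch compiles without the batch API jar.
interface ItemProcessor {
    Object processItem(Object item) throws Exception;
}

// Hypothetical resource that plays the role of the JDBC connection.
final class FakeConnection implements AutoCloseable {
    private boolean open = true;

    String lookup(String key) {
        if (!open) {
            throw new IllegalStateException("connection is closed");
        }
        return key.toUpperCase(Locale.ROOT);
    }

    boolean isOpen() {
        return open;
    }

    @Override
    public void close() {
        open = false;
    }
}

class LookupItemProcessor implements ItemProcessor {
    private FakeConnection connection;

    // In a real batch artifact this method would be annotated @PostConstruct.
    void init() {
        connection = new FakeConnection();
    }

    // In a real batch artifact this method would be annotated @PreDestroy.
    void cleanup() {
        if (connection != null) {
            connection.close();
        }
    }

    // Every item reuses the connection opened in init().
    @Override
    public Object processItem(Object item) {
        return connection.lookup(item.toString());
    }

    boolean isConnectionOpen() {
        return connection != null && connection.isOpen();
    }
}

class LifecycleDemo {
    // Hand-written driver that mimics what the container does around a step:
    // initialize once, process every item, always clean up.
    static List<Object> runStep(LookupItemProcessor processor, List<?> items) {
        List<Object> results = new ArrayList<>();
        processor.init();
        try {
            for (Object item : items) {
                results.add(processor.processItem(item));
            }
        } finally {
            processor.cleanup();
        }
        return results;
    }

    public static void main(String[] args) {
        LookupItemProcessor processor = new LookupItemProcessor();
        System.out.println(runStep(processor, Arrays.asList("kc", "sp")));
        System.out.println("open after step: " + processor.isConnectionOpen());
    }
}
```

If the initialization method is never invoked, `connection` stays null and the first `processItem` call fails with a NullPointerException, so the sketch depends on the lifecycle callback actually firing.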

          • 2. Re: ItemProcessor
            richardmoore

            Seems the PreDestroy is getting called but not the PostConstruct. What am I missing?

             

              @PostConstruct
              private void postConstruct() {
                  log.info("Creating database resources.");
                  try {
                      this.connection = this.getConnection();
                      this.stmt = this.connection.prepareStatement("select * from db2pdba.aim_fclty where dw_fclty_ltr2_cd = ?");
                  } catch (Exception e) {
                      throw new RuntimeException(e);
                  }
              }

              @PreDestroy
              private void preDestroy() {
                  log.info("Closing database resources.");
                  if (this.stmt != null) {
                      try {
                          this.stmt.close();
                      } catch (SQLException e) {
                          log.warning(e.getMessage());
                      }
                  }
                  if (this.connection != null) {
                      try {
                          this.connection.close();
                      } catch (SQLException e) {
                          log.warning(e.getMessage());
                      }
                  }
              }

             

            2016-10-06 14:45:46.214 [jberet-1    ]                    CawaStepListener - [INFO ] Starting Jberet_Adhoc_TestCase3.step2

            2016-10-06 14:45:46.315 [jberet-1    ]  VariableSubstitutionJdbcItemReader - [INFO ] SQL after transient user data variable substitution:       SELECT fclty_cd, fclty_ltr2_cd, fclty_nm, dw_fclty_ltr2_cd       FROM db2pdba.aim_fclty       where dw_fclty_ltr2_cd in ('KC','SP','GC','NO')     

            2016-10-06 14:45:53.886 [jberet-1    ]   JberetAdhocTestCase3ItemProcessor - [INFO ] {FCLTY_CD=00, DW_FCLTY_LTR2_CD=KC, FCLTY_NM=KANSAS CITY, FCLTY_LTR2_CD=KC}

            2016-10-06 14:45:53.912 [jberet-1    ]                              jberet - [ERROR] ProcessingInfo{count=1, timerExpired=false, itemState=RUNNING, chunkState=RUNNING, checkpointPosition=-1, readPosition=0, failurePoint=null}

            2016-10-06 14:45:53.935 [jberet-1    ]                              jberet - [ERROR] item-count=10, time-limit=0, skip-limit=-1, skipCount=0, retry-limit=-1, retryCount=0

            2016-10-06 14:45:53.961 [jberet-1    ]                              jberet - [ERROR] JBERET000007: Failed to run job Jberet_Adhoc_TestCase3, step2, org.jberet.job.model.Chunk@29c3dbc

            java.lang.NullPointerException

              at com.awginc.jberet.examples.JberetAdhocTestCase3ItemProcessor.processItem(JberetAdhocTestCase3ItemProcessor.java:57) ~[bin/:?]

             

            Which is the line in my processItem(Object item) method that does -

            this.stmt.clearParameters();

             

            2016-10-06 14:45:54.023 [jberet-1    ]   JberetAdhocTestCase3ItemProcessor - [INFO ] Closing database resources.

            2016-10-06 14:45:54.066 [jberet-1    ]                             support - [INFO ] JBERET060502: Closing resource V:/JavaBatchFramework/app/data/tmp/XX/Jberet_Adhoc_TestCase3_20161006_144400.out in class org.jberet.support.io.JacksonCsvItemWriter

            2016-10-06 14:45:54.088 [jberet-1    ]                    CawaStepListener - [INFO ] Ending Jberet_Adhoc_TestCase3.step2 = FAILED

            2016-10-06 14:45:54.096 [jberet-1    ]                   StepStatsListener - [INFO ] Jberet_Adhoc_TestCase3.step2 metric: ROLLBACK_COUNT=1

            2016-10-06 14:45:54.104 [jberet-1    ]                   StepStatsListener - [INFO ] Jberet_Adhoc_TestCase3.step2 metric: PROCESS_SKIP_COUNT=0

            2016-10-06 14:45:54.112 [jberet-1    ]                   StepStatsListener - [INFO ] Jberet_Adhoc_TestCase3.step2 metric: WRITE_SKIP_COUNT=0

            2016-10-06 14:45:54.120 [jberet-1    ]                   StepStatsListener - [INFO ] Jberet_Adhoc_TestCase3.step2 metric: WRITE_COUNT=0

            2016-10-06 14:45:54.129 [jberet-1    ]                   StepStatsListener - [INFO ] Jberet_Adhoc_TestCase3.step2 metric: COMMIT_COUNT=0

            2016-10-06 14:45:54.137 [jberet-1    ]                   StepStatsListener - [INFO ] Jberet_Adhoc_TestCase3.step2 metric: FILTER_COUNT=0

            2016-10-06 14:45:54.145 [jberet-1    ]                   StepStatsListener - [INFO ] Jberet_Adhoc_TestCase3.step2 metric: READ_SKIP_COUNT=0

            2016-10-06 14:45:54.153 [jberet-1    ]                   StepStatsListener - [INFO ] Jberet_Adhoc_TestCase3.step2 metric: READ_COUNT=1

            • 3. Re: ItemProcessor
              cfang

              Can you post the beginning of your item processor class, including the package line?  You can obfuscate names if needed.  I'd like to see how this class is declared.

               

              Also, do you use the bean ref name (the name in @Named("name")), or do you use batch.xml to declare artifacts?  There may be a slight difference in how the batch artifacts are created.

              • 4. Re: ItemProcessor
                richardmoore

                package com.awginc.jberet.examples;

                import java.sql.Connection;
                import java.sql.PreparedStatement;
                import java.sql.ResultSet;
                import java.sql.SQLException;
                import java.util.Map;

                import javax.annotation.PostConstruct;
                import javax.annotation.PreDestroy;
                import javax.inject.Named;

                import com.awginc.batch.system.jdbc.batchlet.JdbcItemProcessor;

                @Named
                public class JberetAdhocTestCase3ItemProcessor extends JdbcItemProcessor {

                    @PostConstruct
                    private void postConstruct() {
                        log.info("Creating database resources.");
                        try {
                            this.connection = this.getConnection();
                            this.stmt = this.connection.prepareStatement("select * from db2pdba.aim_fclty where dw_fclty_ltr2_cd = ?");
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }

                    @PreDestroy
                    private void preDestroy() {
                        log.info("Closing database resources.");
                        if (this.stmt != null) {
                            try {
                                this.stmt.close();
                            } catch (SQLException e) {
                                log.warning(e.getMessage());
                            }
                        }
                        if (this.connection != null) {
                            try {
                                this.connection.close();
                            } catch (SQLException e) {
                                log.warning(e.getMessage());
                            }
                        }
                    }

                    @Override
                    public Object processItem(Object item) throws Exception {
                        @SuppressWarnings("unchecked")
                        Map<Object, Object> map = (Map<Object, Object>) item;
                        map.put("FCLTY_NM", map.get("FCLTY_NM").toString().trim());
                        log.info(map.toString());

                        ResultSet rs = null;
                        try {
                            this.stmt.clearParameters();
                            this.stmt.setString(1, map.get("DW_FCLTY_LTR2_CD").toString());
                            rs = stmt.executeQuery();
                            while (rs.next()) {
                                this.display(rs);
                            }
                        } catch (SQLException e) {
                            throw e;
                        } finally {
                            if (rs != null) {
                                rs.close();
                            }
                        }

                        return map;
                    }

                    private Connection connection;
                    private PreparedStatement stmt;
                }

                • 5. Re: ItemProcessor
                  cfang

                  I tried a similar test but couldn't reproduce your problem.

                   

                  The test-apps/postConstruct/ module contains some tests that verify @PostConstruct and @PreDestroy methods for a batchlet step.  I just added more tests for a chunk step (@PostConstruct and @PreDestroy methods for the item reader, item processor and item writer).

                   

                  Can you create a reproducible test app?  You can create a Jira issue and attach the app to it.

                  • 6. Re: ItemProcessor
                    richardmoore

                    It is out there.

                    • 7. Re: ItemProcessor
                      cfang
                             

                      <processor ref="com.awginc.jberet.examples.TestCase4ItemProcessor"/>

                       

                       

                      Here you are using the fully qualified class name as the ref name for the item processor artifact.  This bypasses CDI bean creation.  Can you change it to

                       

                              <processor ref="testCase4ItemProcessor"/>

                       

                      which is the default CDI bean name (the short class name with its first letter de-capitalized).
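The naming rule in the post above (short class name with the first letter de-capitalized) can be sketched as a small helper. `BeanNames` and `defaultBeanName` are hypothetical names used only for illustration and are not part of CDI or JBeret. The sketch handles ordinary top-level class names and does not cover nested classes or names set explicitly with `@Named("...")`.

```java
class BeanNames {
    // Derives the default bean name from a fully qualified (or simple) class
    // name: strip the package, then lower-case the first character.
    static String defaultBeanName(String className) {
        String simple = className.substring(className.lastIndexOf('.') + 1);
        if (simple.isEmpty()) {
            return simple;
        }
        return Character.toLowerCase(simple.charAt(0)) + simple.substring(1);
    }

    public static void main(String[] args) {
        // The two processor classes mentioned in this thread.
        System.out.println(defaultBeanName("com.awginc.jberet.examples.TestCase4ItemProcessor"));
        System.out.println(defaultBeanName("com.awginc.jberet.examples.JberetAdhocTestCase3ItemProcessor"));
    }
}
```

For the two classes in this thread, the helper yields `testCase4ItemProcessor` and `jberetAdhocTestCase3ItemProcessor`, which are the values to use in the `ref` attribute.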

                      • 8. Re: ItemProcessor
                        cfang
                        JBERET-270: Chunk ItemProcessor not executing postConstruct (stand-alone)

                         

                        The @PostConstruct method should still work even if the artifact's ref name is the fully qualified class name.  I'll fix it in JBERET-270.

                        • 9. Re: ItemProcessor
                          richardmoore

                          I pulled down the new core jar from this weekend (from The Central Repository Search Engine) and put my JSL back to the class name, and I am still not getting the postConstruct to fire:

                          <chunk>
                              <reader ref="jdbcSubstitutionItemReader">
                                  <properties>
                                      <property name="dataSourceLookup" value="jdbc/db2"/>
                                      <property name="sql" value="
                                          SELECT fclty_cd, fclty_ltr2_cd, fclty_nm, dw_fclty_ltr2_cd
                                          FROM db2pdba.aim_fclty
                                          where dw_fclty_ltr2_cd in (${mylist})
                                      "/>
                                      <property name="beanType" value="java.util.Map" />
                                  </properties>
                              </reader>
                              <processor ref="com.awginc.jberet.examples.JberetAdhocTestCase3ItemProcessor">
                                  <properties>
                                      <property name="dataSourceLookup" value="jdbc/db2"/>
                                  </properties>
                              </processor>
                              <writer ref="jacksonCsvItemWriter">
                                  <properties>
                                      <property name="resource" value="#{systemProperties['APP_DATA_TMP_FCLTY']}/#{systemProperties['batchJobname']}_#{systemProperties['YYYYMMDD']}_#{systemProperties['HHMMSS']}.out" />
                                      <property name="writeMode" value="overwrite" />
                                      <property name="columnSeparator" value="|" />
                                      <property name="beanType" value="java.util.Map" />
                                      <property name="useHeader" value="false" />
                                      <property name="csvGeneratorFeatures" value="ALWAYS_QUOTE_STRINGS=false, STRICT_CHECK_FOR_QUOTING=true"/>
                                      <property name="columns" value="FCLTY_CD STRING, FCLTY_LTR2_CD STRING, FCLTY_NM STRING, DW_FCLTY_LTR2_CD STRING"/>
                                  </properties>
                              </writer>
                          </chunk>

                           

                          2016-10-10 07:59:03.844 [jberet-1    ]   JberetAdhocTestCase3ItemProcessor - [INFO ] {FCLTY_CD=00, DW_FCLTY_LTR2_CD=KC, FCLTY_NM=KANSAS CITY, FCLTY_LTR2_CD=KC}

                          2016-10-10 07:59:03.876 [jberet-1    ]                              jberet - [ERROR] ProcessingInfo{count=1, timerExpired=false, itemState=RUNNING, chunkState=RUNNING, checkpointPosition=-1, readPosition=0, failurePoint=null}

                          2016-10-10 07:59:03.907 [jberet-1    ]                              jberet - [ERROR] item-count=10, time-limit=0, skip-limit=-1, skipCount=0, retry-limit=-1, retryCount=0

                          2016-10-10 07:59:03.939 [jberet-1    ]                              jberet - [ERROR] JBERET000007: Failed to run job Jberet_Adhoc_TestCase3, step2, org.jberet.job.model.Chunk@1ca6c431

                          java.lang.NullPointerException

                            at com.awginc.jberet.examples.JberetAdhocTestCase3ItemProcessor.processItem(JberetAdhocTestCase3ItemProcessor.java:57) ~[bin/:?]

                            at org.jberet.runtime.runner.ChunkRunner.processItem(ChunkRunner.java:409) ~[jberet-core-1.3.0.Beta3.jar:1.3.0.Beta3]

                            at org.jberet.runtime.runner.ChunkRunner.readProcessWriteItems(ChunkRunner.java:309) ~[jberet-core-1.3.0.Beta3.jar:1.3.0.Beta3]

                            at org.jberet.runtime.runner.ChunkRunner.run(ChunkRunner.java:201) [jberet-core-1.3.0.Beta3.jar:1.3.0.Beta3]

                            at org.jberet.runtime.runner.StepExecutionRunner.runBatchletOrChunk(StepExecutionRunner.java:226) [jberet-core-1.3.0.Beta3.jar:1.3.0.Beta3]

                            at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:147) [jberet-core-1.3.0.Beta3.jar:1.3.0.Beta3]

                            at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:164) [jberet-core-1.3.0.Beta3.jar:1.3.0.Beta3]

                            at org.jberet.runtime.runner.CompositeExecutionRunner.runJobElement(CompositeExecutionRunner.java:128) [jberet-core-1.3.0.Beta3.jar:1.3.0.Beta3]

                            at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:203) [jberet-core-1.3.0.Beta3.jar:1.3.0.Beta3]

                            at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:164) [jberet-core-1.3.0.Beta3.jar:1.3.0.Beta3]

                            at org.jberet.runtime.runner.CompositeExecutionRunner.runFromHeadOrRestartPoint(CompositeExecutionRunner.java:88) [jberet-core-1.3.0.Beta3.jar:1.3.0.Beta3]

                            at org.jberet.runtime.runner.JobExecutionRunner.run(JobExecutionRunner.java:60) [jberet-core-1.3.0.Beta3.jar:1.3.0.Beta3]

                            at org.jberet.spi.JobExecutor$1.run(JobExecutor.java:99) [jberet-core-1.3.0.Beta3.jar:1.3.0.Beta3]

                            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_73]

                            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_73]

                            at java.lang.Thread.run(Thread.java:745) [?:1.8.0_73]

                          2016-10-10 07:59:04.017 [jberet-1    ]   JberetAdhocTestCase3ItemProcessor - [INFO ] Closing database resources.

                          2016-10-10 07:59:04.063 [jberet-1    ]                             support - [INFO ] JBERET060502: Closing resource V:/JavaBatchFramework/app/data/tmp/XX/Jberet_Adhoc_TestCase3_20161010_075645.out in class org.jberet.support.io.JacksonCsvItemWriter

                          2016-10-10 07:59:04.079 [jberet-1    ]                    CawaStepListener - [INFO ] Ending Jberet_Adhoc_TestCase3.step2 = FAILED

                          2016-10-10 07:59:04.095 [jberet-1    ]                   StepStatsListener - [INFO ] Jberet_Adhoc_TestCase3.step2 metric: READ_SKIP_COUNT=0

                          2016-10-10 07:59:04.110 [jberet-1    ]                   StepStatsListener - [INFO ] Jberet_Adhoc_TestCase3.step2 metric: COMMIT_COUNT=0

                          2016-10-10 07:59:04.110 [jberet-1    ]                   StepStatsListener - [INFO ] Jberet_Adhoc_TestCase3.step2 metric: PROCESS_SKIP_COUNT=0

                          2016-10-10 07:59:04.126 [jberet-1    ]                   StepStatsListener - [INFO ] Jberet_Adhoc_TestCase3.step2 metric: READ_COUNT=1

                          2016-10-10 07:59:04.126 [jberet-1    ]                   StepStatsListener - [INFO ] Jberet_Adhoc_TestCase3.step2 metric: WRITE_SKIP_COUNT=0

                          2016-10-10 07:59:04.141 [jberet-1    ]                   StepStatsListener - [INFO ] Jberet_Adhoc_TestCase3.step2 metric: FILTER_COUNT=0

                          2016-10-10 07:59:04.157 [jberet-1    ]                   StepStatsListener - [INFO ] Jberet_Adhoc_TestCase3.step2 metric: WRITE_COUNT=0

                          2016-10-10 07:59:04.157 [jberet-1    ]                   StepStatsListener - [INFO ] Jberet_Adhoc_TestCase3.step2 metric: ROLLBACK_COUNT=1

                          • 10. Re: ItemProcessor
                            cfang

                            The fix for JBERET-270 will be included in a future release (1.3.0.Beta4).

                             

                            The latest version of JBeret is 1.3.0.Beta3, which was published on 2016-09-11, so it does not contain the fix you need.

                             

                            If you really need it before 1.3.0.Beta4 is published, you can git clone the JBeret project and build your own jberet-core 1.3.0.Beta4-SNAPSHOT jar.

                            • 11. Re: ItemProcessor
                              richardmoore

                              I changed the item processor ref to the CDI bean name and it works on my Windows machine. When I put it on my Linux box, it fails with:

                              2016-10-10 13:29:06.509 [jberet-1    ]                              jberet - [ERROR] JBERET000007: Failed to run job Jberet_Adhoc_TestCase3, step2, org.jberet.job.model.Chunk@e5f2a67

                              java.lang.IllegalStateException: JBERET000600: Failed to create artifact with ref name jberetAdhocTestCase3ItemProcessor.  Ensure CDI beans.xml is present and batch.xml, if any, is configured properly.

                                      at org.jberet.runtime.context.JobContextImpl.createArtifact(JobContextImpl.java:194) ~[jberet-core-1.3.0.Beta3.jar:1.3.0.Beta3]

                                      at org.jberet.runtime.runner.AbstractRunner.createArtifact(AbstractRunner.java:156) ~[jberet-core-1.3.0.Beta3.jar:1.3.0.Beta3]

                               

                              I have checked that beans.xml and batch.xml are on the classpath. What am I missing?
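One way to double-check the classpath claim on each machine is to ask the class loader directly which copies of the descriptor files it can see. The following is a diagnostic sketch only, not part of JBeret; `ClasspathCheck` and `locate` are hypothetical names, and the resource paths assume the usual `META-INF/beans.xml` and `META-INF/batch.xml` locations.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

class ClasspathCheck {
    // Returns every URL under which the class loader can see the given
    // resource name. An empty list means the resource is not visible.
    static List<URL> locate(String resourceName) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ClassLoader.getSystemClassLoader();
        }
        try {
            return Collections.list(loader.getResources(resourceName));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String[] names = {"META-INF/beans.xml", "META-INF/batch.xml"};
        for (String name : names) {
            List<URL> hits = locate(name);
            System.out.println(name + " -> " + (hits.isEmpty() ? "NOT FOUND" : hits.toString()));
        }
    }
}
```

Running it with the same classpath as the batch job on both Windows and Linux shows whether the lookup differs between the two. Resource names are matched exactly, so a directory or file whose name differs only in case may be found on one file system and missed on another.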

                              • 12. Re: ItemProcessor
                                cfang

                                What is the Caused By exception?  There should be a root exception in the stacktrace.

                                • 13. Re: ItemProcessor
                                  richardmoore

                                  Here is the whole stacktrace

                                   

                                  2016-10-10 13:29:06.506 [jberet-1    ]                              jberet - [ERROR] item-count=10, time-limit=0, skip-limit=-1, skipCount=0, retry-limit=-1, retryCount=0

                                  2016-10-10 13:29:06.509 [jberet-1    ]                              jberet - [ERROR] JBERET000007: Failed to run job Jberet_Adhoc_TestCase3, step2, org.jberet.job.model.Chunk@e5f2a67

                                  java.lang.IllegalStateException: JBERET000600: Failed to create artifact with ref name jberetAdhocTestCase3ItemProcessor.  Ensure CDI beans.xml is present and batch.xml, if any, is configured properly.

                                     at org.jberet.runtime.context.JobContextImpl.createArtifact(JobContextImpl.java:194) ~[jberet-core-1.3.0.Beta3.jar:1.3.0.Beta3]

                                     at org.jberet.runtime.runner.AbstractRunner.createArtifact(AbstractRunner.java:156) ~[jberet-core-1.3.0.Beta3.jar:1.3.0.Beta3]

                                     at org.jberet.runtime.runner.ChunkRunner.run(ChunkRunner.java:139) [jberet-core-1.3.0.Beta3.jar:1.3.0.Beta3]

                                     at org.jberet.runtime.runner.StepExecutionRunner.runBatchletOrChunk(StepExecutionRunner.java:226) [jberet-core-1.3.0.Beta3.jar:1.3.0.Beta3]

                                     at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:147) [jberet-core-1.3.0.Beta3.jar:1.3.0.Beta3]

                                     at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:164) [jberet-core-1.3.0.Beta3.jar:1.3.0.Beta3]

                                     at org.jberet.runtime.runner.CompositeExecutionRunner.runJobElement(CompositeExecutionRunner.java:128) [jberet-core-1.3.0.Beta3.jar:1.3.0.Beta3]

                                     at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:203) [jberet-core-1.3.0.Beta3.jar:1.3.0.Beta3]

                                     at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:164) [jberet-core-1.3.0.Beta3.jar:1.3.0.Beta3]

                                     at org.jberet.runtime.runner.CompositeExecutionRunner.runFromHeadOrRestartPoint(CompositeExecutionRunner.java:88) [jberet-core-1.3.0.Beta3.jar:1.3.0.Beta3]

                                     at org.jberet.runtime.runner.JobExecutionRunner.run(JobExecutionRunner.java:60) [jberet-core-1.3.0.Beta3.jar:1.3.0.Beta3]

                                     at org.jberet.spi.JobExecutor$1.run(JobExecutor.java:99) [jberet-core-1.3.0.Beta3.jar:1.3.0.Beta3]

                                     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_102]

                                     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_102]

                                     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_102]

                                  Caused by: java.lang.IllegalStateException: JBERET000600: Failed to create artifact with ref name jberetAdhocTestCase3ItemProcessor.  Ensure CDI beans.xml is present and batch.xml, if any, is configured properly.

                                     at org.jberet.creation.ArtifactFactoryWrapper.getClassFromBatchXmlOrClassLoader(ArtifactFactoryWrapper.java:65) ~[classes/:?]

                                     at org.jberet.creation.ArtifactFactoryWrapper.create(ArtifactFactoryWrapper.java:41) ~[classes/:?]

                                     at org.jberet.runtime.context.JobContextImpl.createArtifact(JobContextImpl.java:192) ~[jberet-core-1.3.0.Beta3.jar:1.3.0.Beta3]

                                     ... 14 more

                                  Caused by: java.lang.ClassNotFoundException: jberetAdhocTestCase3ItemProcessor

                                     at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[?:1.8.0_102]

                                     at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_102]

                                     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) ~[?:1.8.0_102]

                                     at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_102]

                                     at org.jberet.creation.ArtifactFactoryWrapper.getClassFromBatchXmlOrClassLoader(ArtifactFactoryWrapper.java:63) ~[classes/:?]

                                     at org.jberet.creation.ArtifactFactoryWrapper.create(ArtifactFactoryWrapper.java:41) ~[classes/:?]

                                     at org.jberet.runtime.context.JobContextImpl.createArtifact(JobContextImpl.java:192) ~[jberet-core-1.3.0.Beta3.jar:1.3.0.Beta3]

                                     ... 14 more

                                  • 14. Re: ItemProcessor
                                    cfang

                                    The stacktrace shows that CDI didn't find the bean by the name jberetAdhocTestCase3ItemProcessor.

                                     

                                    Not sure why it works on Windows for you but fails on Linux.  Maybe some files or directories have mixed-case names, which causes problems on the case-sensitive Linux file system?

                                     

                                    Are you seeing the problem only with the item processor, while the item reader and item writer are both loaded correctly by CDI bean ref name?  If so, your CDI setup is correct.

                                     

                                    If it still fails, can you provide a reproducible test?  I can try it on Linux.
