7 Replies Latest reply on Jan 25, 2018 2:56 PM by cfang

    Chunk listener and errorHandler for read/write/error counts

    richardmoore

      We have spoken before about producing read/write counts with a listener in a chunk. I also need to increment a skip count or an error count when errors occur while the BeanIOReader parses a record (errorHandler) and while the item processor validates and enhances the data. Is there a way to do that? I need to report counts of records read, written, in error, and skipped; I can combine errors and skips into a single count, but I would prefer them separate.

        • 1. Re: Chunk listener and errorHandler for read/write/error counts
          cfang

          Can you post the link to the previous discussion? Can you describe your requirement with a short example (i.e., why is it not available from the current step metrics)?

          • 2. Re: Chunk listener and errorHandler for read/write/error counts
            richardmoore

            BeanIOReader

            • Valid record. Pass to ItemProcessor. Increment the read count.
            • Record with an invalid numeric field: in the test data I received, a numeric field is only 3 digits long when it is supposed to be 4-5. Increment the read count, write the record to an error file, and increment the error count.
            • Valid record. Pass to ItemProcessor. Increment the read count.

            ItemProcessor: validate item number.

            • First record is validated and reformatted successfully and handed to writer.
            • Third record fails validation because its item number is invalid; hand null to the writer, increment the error count, and write the record to an error file.

            BeanIOWriter

            • Record one is written, write count incremented.

             

            After all records have been processed write to the log -

            Records read: 3

            Records written: 1

            Records in error: 2

             

            Here is the previous discussion -

            Log chunk reader/writer record counts

            • 3. Re: Chunk listener and errorHandler for read/write/error counts
              cfang

              I think they are all available from the step metrics by calling the standard API method stepExecution.getMetrics(). The metric types are listed here:

               

              Metric.MetricType (Java(TM) EE 7 Specification APIs)

               

              Please see my comments in brackets below, at the end of each line:

               

              BeanIOReader

              • Valid record. Pass to ItemProcessor. Increment the read count. [this read count is available as READ_COUNT metric]
              • Record with invalid numeric field, in the test data I received it has a numeric field that is only 3 digits in length when it is supposed to be 4-5. Increment read count, write to an error file and increment the error count. [So you are skipping this record after reading. You can throw an application exception that includes the details of this record, and have a SkipReadListener whose onSkipReadItem(Exception ex) method writes the record to the error file. The skip count is available as the READ_SKIP_COUNT metric]
              • Valid record. Pass to ItemProcessor. Increment the read count. [same as read 1]

              ItemProcessor: validate item number.

              • First record is validated and reformatted successfully and handed to writer.
              • Third record fails validation because it is an invalid item number, hand null to writer; increment error count and write into an error file. [This error is different than read #2.  Strictly speaking, this is a filtered record: a record that is filtered out by the item processor.  It is tallied as FILTER_COUNT metric.  You can write the record to error file as part of the item processor's process() method, i.e., this record is not treated as a failure and no skip is involved.]

              BeanIOWriter

              • Record one is written, write count incremented. [write count is available as WRITE_COUNT]
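
As a rough sketch of how the three-line report requested above could be derived from these metrics: the Map below is only a stand-in for looping over stepExecution.getMetrics() and reading getType()/getValue(), and the StepSummary class is made up for illustration. It assumes READ_COUNT counts only successfully read items (so skipped reads are added back) and that the processor returns null only for invalid records (so FILTER_COUNT can be treated as an error count).

```java
import java.util.Map;

/** Derives a read/written/error summary from step metric values. */
final class StepSummary {
    final long read;
    final long written;
    final long errors;

    StepSummary(long read, long written, long errors) {
        this.read = read;
        this.written = written;
        this.errors = errors;
    }

    /**
     * @param metrics metric type name -> value; a stand-in for the
     *                Metric[] returned by stepExecution.getMetrics()
     */
    static StepSummary fromMetrics(Map<String, Long> metrics) {
        long readOk = metrics.getOrDefault("READ_COUNT", 0L);
        long readSkipped = metrics.getOrDefault("READ_SKIP_COUNT", 0L);
        long processSkipped = metrics.getOrDefault("PROCESS_SKIP_COUNT", 0L);
        long writeSkipped = metrics.getOrDefault("WRITE_SKIP_COUNT", 0L);
        long filtered = metrics.getOrDefault("FILTER_COUNT", 0L);
        long written = metrics.getOrDefault("WRITE_COUNT", 0L);

        // Assumption: READ_COUNT excludes skipped reads, so add them back.
        long read = readOk + readSkipped;
        // Assumption: every skip and every filtered item is an error record.
        long errors = readSkipped + processSkipped + writeSkipped + filtered;
        return new StepSummary(read, written, errors);
    }

    @Override
    public String toString() {
        return "Records read: " + read
                + ", Records written: " + written
                + ", Records in error: " + errors;
    }
}
```

With the three-record example (two good reads, one read skip, one filtered item, one write) this yields read 3, written 1, in error 2. Keeping skips and filtered items as separate fields instead of summing them would give the separate counts asked for in the first post.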
              • 4. Re: Chunk listener and errorHandler for read/write/error counts
                richardmoore

                Couldn't remember how to attach my code.

                 

                I have 2 records: the first is a good record, the second has a problem with the date. Without the errorHandler on the reader the job fails; with it the job completes, but I do not get a read skip count. What am I missing or misunderstanding?

                 

                <?xml version="1.0" encoding="UTF-8"?>
                <job id="AIS_Daily_Tpa" version="1.0" xmlns="http://xmlns.jcp.org/xml/ns/javaee">

                    <listeners></listeners>

                    <properties>
                        <property name="readerResource" value="app/data/input.txt" />
                        <property name="readerStreamName" value="input" />
                        <property name="readerStreamMapping" value="org/richard/test/mapping.xml" />

                        <property name="writerResource" value="app/data/output.txt" />
                        <property name="writerStreamName" value="output" />
                        <property name="writerStreamMapping" value="#{jobProperties['readerStreamMapping']}" />
                    </properties>

                    <step id="step1">

                        <listeners>
                            <listener ref="org.richard.test.InputRecordSkipListener" />
                            <listener ref="org.richard.test.StepStatsListener" />
                        </listeners>

                        <chunk item-count="200">
                            <reader ref="beanIOItemReader">
                                <properties>
                                    <property name="resource" value="#{jobProperties['readerResource']}" />
                                    <property name="streamName" value="#{jobProperties['readerStreamName']}" />
                                    <property name="streamMapping" value="#{jobProperties['readerStreamMapping']}" />
                                    <property name="errorHandler" value="org.richard.test.InputRecordErrorHandler" />
                                </properties>
                            </reader>

                            <processor ref="org.richard.test.InputRecordProcessor" />

                            <writer ref="beanIOItemWriter">
                                <properties>
                                    <property name="resource" value="#{jobProperties['writerResource']}" />
                                    <property name="streamName" value="#{jobProperties['writerStreamName']}" />
                                    <property name="streamMapping" value="#{jobProperties['writerStreamMapping']}" />
                                </properties>
                            </writer>
                        </chunk>

                    </step>

                </job>

                 

                 

                 

                 

                org.richard.test.InputRecordErrorHandler.handleError: org.beanio.InvalidRecordException = Invalid 'inputRecord' record at line 2
                Invalid 'inputRecord' record at line 2
                allowanceEndDate = '2015-04-2x 00:00:00' - Type conversion error: Invalid date
                AIS_Daily_Tpa.step1 metric: READ_SKIP_COUNT=0
                AIS_Daily_Tpa.step1 metric: WRITE_SKIP_COUNT=0
                AIS_Daily_Tpa.step1 metric: COMMIT_COUNT=1
                AIS_Daily_Tpa.step1 metric: WRITE_COUNT=1
                AIS_Daily_Tpa.step1 metric: PROCESS_SKIP_COUNT=0
                AIS_Daily_Tpa.step1 metric: ROLLBACK_COUNT=0
                AIS_Daily_Tpa.step1 metric: READ_COUNT=1
                AIS_Daily_Tpa.step1 metric: FILTER_COUNT=0
                Completed: COMPLETED

                 

                 

                 

                package org.richard.test;

                import java.util.Iterator;

                import org.beanio.BeanReaderErrorHandler;
                import org.beanio.BeanReaderException;

                public class InputRecordErrorHandler implements BeanReaderErrorHandler {

                    @Override
                    public void handleError(BeanReaderException arg0) throws Exception {
                        System.out.println(this.getClass().getName() + ".handleError: " + arg0.getClass().getName() + " = " + arg0.getMessage());

                        for (int i = 0, j = arg0.getRecordCount(); i < j; i++) {
                            System.out.println(arg0.getMessage());
                            Iterator<String> fields = arg0.getRecordContext(i).getFieldErrors().keySet().iterator();
                            while (fields.hasNext()) {
                                String field = fields.next();
                                for (String error : arg0.getRecordContext(i).getFieldErrors(field)) {
                                    System.out.println(field + " = '" + arg0.getRecordContext(i).getFieldText(field) + "' - " + error);
                                }
                            }
                        }
                    }

                }

                 

                 

                 

                 

                package org.richard.test;

                import java.util.Iterator;

                import javax.batch.api.chunk.listener.SkipReadListener;

                import org.beanio.BeanReaderException;

                public class InputRecordSkipListener implements SkipReadListener {

                    @Override
                    public void onSkipReadItem(Exception arg0) throws Exception {
                        BeanReaderException ex = (BeanReaderException) arg0;

                        System.out.println(this.getClass().getName() + ".onSkipReadItem: " + ex.getClass().getName() + " = " + ex.getMessage());

                        for (int i = 0, j = ex.getRecordCount(); i < j; i++) {
                            System.out.println(ex.getMessage());
                            Iterator<String> fields = ex.getRecordContext(i).getFieldErrors().keySet().iterator();
                            while (fields.hasNext()) {
                                String field = fields.next();
                                for (String error : ex.getRecordContext(i).getFieldErrors(field)) {
                                    System.out.println(field + " = '" + ex.getRecordContext(i).getFieldText(field) + "' - " + error);
                                }
                            }
                        }
                    }

                }

                 

                 

                 

                 

                 

                package org.richard.test;

                import javax.batch.api.listener.AbstractStepListener;
                import javax.batch.api.listener.StepListener;
                import javax.batch.runtime.Metric;
                import javax.batch.runtime.context.JobContext;
                import javax.batch.runtime.context.StepContext;
                import javax.inject.Inject;

                public class StepStatsListener extends AbstractStepListener implements StepListener {

                    @Inject
                    private JobContext jobContext;

                    @Inject
                    private StepContext stepContext;

                    @Override
                    public void beforeStep() throws Exception {}

                    @Override
                    public void afterStep() throws Exception {
                        long count = 0;

                        for (Metric metric : stepContext.getMetrics()) {
                            count += metric.getValue();
                        }

                        if (count > 0) {
                            for (Metric metric : stepContext.getMetrics()) {
                                System.out.println(jobContext.getJobName() + "." + stepContext.getStepName() + " metric: " + metric.getType() + "=" + metric.getValue());
                            }
                        }
                    }

                }

                • 5. Re: Chunk listener and errorHandler for read/write/error counts
                  cfang

                  For skip/retry to take effect, you need to declare the skippable and retryable exception classes in your job.xml, via

                   

                  <chunk item-count="200">
                      <skippable-exception-classes>
                          <include class="org.beanio.BeanReaderException"/>
                          <exclude class="..."/>
                      </skippable-exception-classes>
                      <retryable-exception-classes>
                          <include class="..."/>
                          <exclude class="..."/>
                      </retryable-exception-classes>
                      <reader ref="beanIOItemReader">

                  In your example, if you don't have a BeanIO reader errorHandler and you don't declare skippable exceptions in job.xml, any invalid record will cause the BeanIO reader to throw a BeanReaderException, which will cause the step and the job to fail.

                  If you don't have a BeanIO error handler, but you declare a skippable exception matching BeanReaderException, any such matching exception will cause JBeret to activate the skip mechanism: the skip count is recorded and the read skip listener is called (to log to the error file, etc.). After that the reader continues with the next record.

                   

                  If you have bean io reader error handler whose handleError(...) method does not rethrow any exception, JBeret doesn't know there is an exception in the reader, and skip will not be triggered.
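
The three cases above can be summarised as a small decision table. This is only a toy model of the behaviour described in this reply, not JBeret or BeanIO code; the class, enum, and parameter names are made up for illustration.

```java
/** Toy model: what happens to an invalid record found by the reader. */
final class SkipModel {

    enum Outcome { JOB_FAILS, SKIPPED_AND_COUNTED, SWALLOWED_NOT_COUNTED }

    /**
     * @param handlerSwallows   an errorHandler is configured and its
     *                          handleError(...) returns without rethrowing
     * @param skippableDeclared job.xml declares a skippable exception class
     *                          that matches the exception thrown by the reader
     */
    static Outcome onInvalidRecord(boolean handlerSwallows, boolean skippableDeclared) {
        if (handlerSwallows) {
            // The exception never leaves the reader, so the batch runtime
            // sees nothing and READ_SKIP_COUNT stays at 0.
            return Outcome.SWALLOWED_NOT_COUNTED;
        }
        // No handler, or a handler that rethrows: the exception reaches
        // the batch runtime, which either skips the record or fails the step.
        return skippableDeclared ? Outcome.SKIPPED_AND_COUNTED : Outcome.JOB_FAILS;
    }

    public static void main(String[] args) {
        for (boolean swallows : new boolean[] {false, true}) {
            for (boolean skippable : new boolean[] {false, true}) {
                System.out.println("handlerSwallows=" + swallows
                        + " skippableDeclared=" + skippable
                        + " -> " + onInvalidRecord(swallows, skippable));
            }
        }
    }
}
```

The job.xml posted in reply 4 corresponds to handlerSwallows=true, which is consistent with the READ_SKIP_COUNT=0 in its log output.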

                   

                  In job.xml you can only declare skip/retry based on exception types and their subtypes, by including and excluding certain types. You need to make sure these exception types really match what you intend to skip/retry. See BeanReaderException types and subtypes:
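
To make the include/exclude idea concrete, here is a simplified sketch of type-based matching. It is an assumption-laden model, not the runtime's actual algorithm: it treats any matching exclude as winning over any matching include, whereas a real batch runtime may resolve overlapping entries differently (for example by preferring the closest matching class). SkipMatcher is a made-up name, and the test values use JDK exception classes as stand-ins for BeanReaderException and its subtypes.

```java
import java.util.List;

/** Simplified model of skippable-exception-classes include/exclude matching. */
final class SkipMatcher {

    /** True if {@code thrown} is one of the listed classes or a subtype of one. */
    private static boolean matchesAny(Class<?> thrown, List<Class<?>> classes) {
        for (Class<?> candidate : classes) {
            if (candidate.isAssignableFrom(thrown)) {
                return true;
            }
        }
        return false;
    }

    /**
     * @param thrown   class of the exception raised by the reader
     * @param includes classes listed in include elements
     * @param excludes classes listed in exclude elements
     */
    static boolean isSkippable(Class<?> thrown,
                               List<Class<?>> includes,
                               List<Class<?>> excludes) {
        // Simplification: an exclude match always overrides an include match.
        return matchesAny(thrown, includes) && !matchesAny(thrown, excludes);
    }
}
```

For instance, including RuntimeException and excluding IllegalStateException makes an IllegalArgumentException skippable, but not an IllegalStateException or an unrelated checked exception. In the same way, including org.beanio.BeanReaderException also covers subtypes such as the InvalidRecordException seen in the log in reply 4.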

                   

                  • 6. Re: Chunk listener and errorHandler for read/write/error counts
                    richardmoore

                    I set it up as follows and it is working as I hoped. Is there a way to supply a reader property such as errorResource (a file) that the errorHandler can pick up and write the error records to? Will the file have to be opened and closed for each write, or is there a way to take a preDestroy-type approach?

                     

                    <skippable-exception-classes>
                        <include class="org.beanio.BeanReaderException" />
                    </skippable-exception-classes>

                    <reader ref="beanIOItemReader">
                        <properties>
                            <property name="resource" value="#{jobProperties['readerResource']}" />
                            <property name="streamName" value="#{jobProperties['readerStreamName']}" />
                            <property name="streamMapping" value="#{jobProperties['readerStreamMapping']}" />
                            <property name="errorHandler" value="org.richard.test.InputRecordErrorHandler" />
                        </properties>
                    </reader>

                    • 7. Re: Chunk listener and errorHandler for read/write/error counts
                      cfang

                      Since your error handler is a BeanIO-defined class, not a batch artifact, there is no clean way to pass batch context information to it. An errorResource file for logging invalid records belongs to the error handler and doesn't quite fit in BeanIOItemReader.

                       

                      If you use a read skip listener instead to do what you currently do in the errorHandler, you can inject JobContext and/or StepContext into the read skip listener to read configuration specified as job properties or step properties.

                       

                      As for closing the error file, I would open it and close it every time you use it, since you don't know when you will next need to log to it. There may be no more skips, in which case you will never need to access it again in the step.
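
A minimal sketch of the open-and-close-per-write approach, under stated assumptions: the Properties argument stands in for what an injected JobContext or StepContext returns from getProperties(), the property name errorResource is borrowed from the question above and is not an existing reader property, and ErrorRecordLog is a made-up helper class.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Properties;

/** Appends rejected records to an error file, one open/close per write. */
final class ErrorRecordLog {
    private final Path file;

    /**
     * @param props stand-in for jobContext.getProperties() or
     *              stepContext.getProperties(); "errorResource" is a
     *              hypothetical property name holding the file path
     */
    ErrorRecordLog(Properties props) {
        String resource = props.getProperty("errorResource");
        if (resource == null) {
            throw new IllegalArgumentException("errorResource property is not set");
        }
        this.file = Paths.get(resource);
    }

    /** Opens the file in append mode, writes one line, and closes it again. */
    void append(String record) throws IOException {
        try (BufferedWriter out = Files.newBufferedWriter(file, StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
            out.write(record);
            out.newLine();
        }
    }
}
```

Inside a read skip listener, onSkipReadItem could build the text for the rejected record and pass it to append(...). Because try-with-resources closes the writer after every call, no preDestroy-style cleanup is needed, at the cost of reopening the file for each skipped record.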