Chapter 8. Item Processors

In the previous chapter, you learned how to read various types of input using the components of Spring Batch. Obviously, obtaining the input for any piece of software is an important aspect of the project; however, it doesn't mean much if you don't do something with it. Item processors are the component within Spring Batch where you do something with your input. In this chapter, you will look at the ItemProcessor interface and see how you can use it to develop your own processing of batch items.

  • In the "Introduction to ItemProcessors" section, you'll start with a quick overview of what an ItemProcessor is and how it fits into the flow of a step.

  • Spring Batch provides utility ItemProcessors like the ItemProcessorAdapter, which uses existing services as your ItemProcessor implementation. In the "Using Spring Batch's ItemProcessors" section, you'll take an in-depth look at each of the processors the framework provides.

  • In many cases, you will want to develop your own ItemProcessor implementation. In the "Writing Your Own ItemProcessors" section, you will look at different considerations as you implement an example ItemProcessor.

  • A common use of an ItemProcessor is to filter out items that were read in by an ItemReader from being written by the step's ItemWriter. In the "Filtering Items" section, you'll look at an example of how this is accomplished.

Introduction to ItemProcessors

In Chapter 7, you looked at ItemReaders, the input facility you use within Spring Batch. Once you have received your input, you have two options. The first is to just write it back out as you did in the examples in the last chapter. There are many times when that will make sense. Migrating data from one system to another or loading data into a database initially are two examples of where reading input and writing it directly without any additional processing makes sense.

However, in most scenarios, you are going to need to do something with the data you read in. Spring Batch has broken up the pieces of a step to allow a good separation of concerns between reading, processing, and writing. Doing this gives you the opportunity to do a couple of unique things, such as the following:

  • Validate input: In the original version of Spring Batch, validation occurred at the ItemReader by subclassing the ValidatingItemReader class. The issue with this approach is that none of the provided readers subclassed the ValidatingItemReader class so if you wanted validation, you couldn't use any of the included readers. Moving the validation step to the ItemProcessor allows validation to occur on an object before processing, regardless of the input method. This makes much more sense from a division-of-concerns perspective.

  • Reuse existing services: Just like the ItemReaderAdapter you looked at in Chapter 7 to reuse services for your input, Spring Batch provides an ItemProcessorAdapter for the same reason.

  • Chain ItemProcessors: There are situations where you will want to perform multiple actions on a single item within the same transaction. Although you could write your own custom ItemProcessor to do all of the logic in a single class, that couples your logic to the framework, which is something you want to avoid. Instead, Spring Batch allows you to create a list of ItemProcessors that will be executed in order against each item.

To accomplish this, the org.springframework.batch.item.ItemProcessor interface consists of a single method process shown in Listing 8-1. It takes an item as read from your ItemReader and returns another item.

Example 8.1. ItemProcessor Interface

package org.springframework.batch.item;

public interface ItemProcessor<I, O> {

    O process(I item) throws Exception;
}

It's important to note that the type the ItemProcessor receives as input does not need to be the same type it returns. The framework allows you to read in one type of object, pass it to an ItemProcessor, and have the ItemProcessor return a different type for writing. With this feature, you should note that the type the final ItemProcessor returns is required to be the type the ItemWriter takes as input. You should also be aware that if an ItemProcessor returns null, all processing of that item will stop. In other words, no further ItemProcessors will be called for that item, nor will the step's ItemWriter. However, unlike returning null from an ItemReader, which indicates to Spring Batch that all input has been exhausted, processing of other items will continue when an ItemProcessor returns null.

Note

The type an ItemProcessor returns doesn't need to be the same as it takes in as input.
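As a minimal sketch of the two behaviors just described (a different output type, and null to drop a single item), the following uses a local copy of the interface so that it runs without Spring Batch on the classpath. The names ProcessorContractSketch, ZipToIntegerProcessor, and run are hypothetical, and the loop only imitates what a step does with each item; it is not framework code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ProcessorContractSketch {

    // Local stand-in with the same shape as org.springframework.batch.item.ItemProcessor
    interface ItemProcessor<I, O> {
        O process(I item) throws Exception;
    }

    // Hypothetical processor: receives a String and returns an Integer.
    // Returning null drops this one item; it does not end the input.
    static class ZipToIntegerProcessor implements ItemProcessor<String, Integer> {
        public Integer process(String zip) {
            if (zip == null || !zip.matches("\\d{5}")) {
                return null; // filtered: never reaches the writer
            }
            return Integer.valueOf(zip);
        }
    }

    // Imitates the step: process every item and keep only the non-null results
    static List<Integer> run(List<String> input) {
        ItemProcessor<String, Integer> processor = new ZipToIntegerProcessor();
        List<Integer> toWrite = new ArrayList<Integer>();
        try {
            for (String item : input) {
                Integer result = processor.process(item);
                if (result != null) {
                    toWrite.add(result);
                }
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return toWrite;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("58540", "9S", "04119")));
    }
}
```

The second input value is dropped, yet the third is still processed, which is the difference between a null from an ItemProcessor and a null from an ItemReader.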

Let's take a look at how to use ItemProcessors for your jobs. To start, you'll dig into the ones provided by the framework.

Using Spring Batch's ItemProcessors

When you looked at the ItemReaders previously, there was a lot of ground to cover regarding what was provided from Spring Batch because input and output are two relatively standard things. Reading from a file is the same in most cases. Writing to a database works the same with most databases. However, what you do to each item is different based upon your business requirements. This is really what makes each job different. Because of this, the framework can only provide you with the facility to either implement your own or wrap existing logic. This section will cover the ItemProcessors that are included in the Spring Batch framework.

ValidatingItemProcessor

You'll start your look at Spring Batch's ItemProcessor implementations where you left off in Chapter 7. Previously, you handled obtaining input for your jobs; however, just because you can read it doesn't mean it's valid. Data validation with regard to types and format can occur within an ItemReader; however, validation via business rules is best left until the item has been constructed. That's why Spring Batch provides an ItemProcessor implementation for validating input called the ValidatingItemProcessor. In this section, you will look at how to use it to validate your input.

Input Validation

The org.springframework.batch.item.validator.ValidatingItemProcessor is an implementation of the ItemProcessor interface that allows you to set an implementation of Spring Batch's Validator interface[21] to be used to validate the incoming item prior to processing. If the item passes validation, it will be processed. If not, an org.springframework.batch.item.validator.ValidationException is thrown, causing normal Spring Batch error handling to kick in.
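The validate-then-pass-through behavior can be pictured with a small sketch. Everything in it is a stand-in written for illustration: the nested Validator and ValidationException only mirror the shape of the Spring Batch types, and ValidatingProcessor and ONE_CHARACTER are hypothetical names, not the framework's implementation.

```java
class ValidatingSketch {

    // Stand-in with the same shape as Spring Batch's Validator (not the real interface)
    interface Validator {
        void validate(Object value) throws ValidationException;
    }

    // Stand-in for Spring Batch's unchecked ValidationException
    static class ValidationException extends RuntimeException {
        ValidationException(String message) {
            super(message);
        }
    }

    // Hypothetical processor: validate first, then hand the untouched item on
    static class ValidatingProcessor<T> {
        private final Validator validator;

        ValidatingProcessor(Validator validator) {
            this.validator = validator;
        }

        T process(T item) {
            validator.validate(item); // throws if the item is invalid
            return item;
        }
    }

    // Hypothetical rule: the value must be exactly one character long
    static final Validator ONE_CHARACTER = new Validator() {
        public void validate(Object value) {
            if (value.toString().length() != 1) {
                throw new ValidationException("size must be between 1 and 1");
            }
        }
    };

    public static void main(String[] args) {
        ValidatingProcessor<String> processor =
                new ValidatingProcessor<String>(ONE_CHARACTER);
        System.out.println(processor.process("N"));
        try {
            processor.process("9S");
        } catch (ValidationException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```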

JSR 303 is the Java specification for bean validation. Because it only came out in late 2009, it hasn't been as widely integrated as I would like; however, I consider it a better alternative to the Spring Modules[22] examples shown in most Spring Batch documentation. The validation performed via the javax.validation.* code is configured via annotations. There is a collection of annotations that predefine validation functions out of the box; you also have the ability to create your own validation functions. Let's start by looking at how you would validate a Customer class like the one in Listing 8-2.

Example 8.2. Customer Class

package com.apress.springbatch.chapter8;

public class Customer {
    private String firstName;
    private String middleInitial;
    private String lastName;
    private String address;
    private String city;
    private String state;
    private String zip;

    // Getters & setters go here
    ...
}

If you look at the Customer class in Listing 8-2, you can quickly determine some basic validation rules.

  • Not null: firstName, lastName, address, city, state, zip.

  • Alphabetic: firstName, middleInitial, lastName, city, state.

  • Numeric: zip.

  • Size: middleInitial should be no longer than one character; state should be no longer than two characters; and zip should be no longer than five characters.

There are further validations you can perform on the data provided, such as verifying that zip is a valid ZIP code for the city and state. However, this provides you with a good start. Now that you have identified the things you want to validate, you can describe them to your validator via annotations on the Customer object. Specifically, you will be using the @NotNull, @Size, and @Pattern annotations for these rules. To use these, you will need to update your pom to reference a new library. You will use the Hibernate implementation of the JSR 303 annotations, so you will need to add it to your project. Listing 8-3 shows the dependency you need to add.

Example 8.3. Hibernate Implementation of JSR 303 Dependency

<dependency>
    <groupId>org.hibernate</groupId>
    <artifactId>hibernate-validator</artifactId>
    <version>4.0.2.GA</version>
</dependency>

With the dependency in place, you can actually update your code to use the annotations. Listing 8-4 shows their use on the Customer object.

Example 8.4. Customer Object with Validation Annotations

package com.apress.springbatch.chapter8;

import javax.validation.constraints.NotNull;
import javax.validation.constraints.Pattern;
import javax.validation.constraints.Size;

public class Customer {

    @NotNull
    @Pattern(regexp="[a-zA-Z]+")
    private String firstName;

    @Size(min=1, max=1)
    private String middleInitial;

    @NotNull
    @Pattern(regexp="[a-zA-Z]+")
    private String lastName;

    @NotNull
    @Pattern(regexp="[0-9a-zA-Z\\. ]+")
    private String address;

    @NotNull
    @Pattern(regexp="[a-zA-Z\\. ]+")
    private String city;

    @NotNull
    @Size(min=2,max=2)
    @Pattern(regexp="[A-Z]{2}")
    private String state;

    @NotNull
    @Size(min=5,max=5)
    @Pattern(regexp="\\d{5}")
    private String zip;

    // Accessors go here
...
}

A quick look at the rules defined in Listing 8-4 may make you ask why use both the @Size annotation and the @Pattern one when the regular expression defined in the @Pattern would satisfy both. You are correct. However, each annotation allows you to specify a unique error message (if you want); moreover, being able to identify if the field was the wrong size vs. the wrong format may be helpful in the future.
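To make the size-versus-format distinction concrete, here is a minimal sketch in plain Java that applies the two state rules separately. It deliberately avoids the javax.validation API; the class SizeVersusPatternSketch, the method checkState, and the message texts are hypothetical and only illustrate why two checks can report two different problems.

```java
import java.util.ArrayList;
import java.util.List;

class SizeVersusPatternSketch {

    // Hypothetical helper: checks a state code with two separate rules so that
    // each kind of failure produces its own message
    static List<String> checkState(String state) {
        List<String> problems = new ArrayList<String>();

        // Counterpart of @Size(min=2, max=2)
        if (state.length() != 2) {
            problems.add("state must be exactly two characters");
        }

        // Counterpart of @Pattern(regexp="[A-Z]{2}")
        if (!state.matches("[A-Z]{2}")) {
            problems.add("state must be two uppercase letters");
        }

        return problems;
    }

    public static void main(String[] args) {
        System.out.println(checkState("IL"));  // passes both rules
        System.out.println(checkState("il"));  // right size, wrong format
        System.out.println(checkState("ILL")); // wrong size and wrong format
    }
}
```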

At this point, you have defined the validation rules you will use for your Customer item. However, there is no Validator implementation within Spring yet that handles the execution of these rules. Because of this, you will have to create your own. Fortunately, it only requires a couple of lines of code to create a universal validator for the basic JSR 303 validations. To do this, you will implement Spring Batch's org.springframework.batch.item.validator.Validator interface and use Hibernate's implementation of the javax.validation.Validator to validate your item. Listing 8-5 shows the code for the validator.

Example 8.5. BeanValidator

package com.apress.springbatch.chapter8;

import java.util.Set;

import javax.validation.ConstraintViolation;
import javax.validation.Validation;
import javax.validation.ValidatorFactory;
import org.springframework.batch.item.validator.ValidationException;
import org.springframework.batch.item.validator.Validator;
import org.springframework.beans.factory.InitializingBean;

@SuppressWarnings("rawtypes")
public class BeanValidator implements Validator, InitializingBean {

    private javax.validation.Validator validator;

    public void afterPropertiesSet() throws Exception {
        ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory();
        validator = validatorFactory.usingContext().getValidator();
    }

    public void validate(Object target) throws ValidationException {

        Set<ConstraintViolation<Object>> constraintViolations = validator.validate(target);

        if(constraintViolations.size() > 0) {
            buildValidationException(constraintViolations);
        }
    }

    private void buildValidationException(
            Set<ConstraintViolation<Object>> constraintViolations) {
        StringBuilder message = new StringBuilder();

        for (ConstraintViolation<Object> constraintViolation : constraintViolations) {
            message.append(constraintViolation.getMessage() + "\n");
        }

        throw new ValidationException(message.toString());
    }
}

Implementing Spring Batch's Validator interface as well as Spring's org.springframework.beans.factory.InitializingBean interface allows you to obtain an instance of the Java validator in the afterPropertiesSet method and execute the validation within the validate method. Once you have validated the object, you can construct a ValidationException out of the messages you received if any attributes failed validation.

Note

The Validator interface included in the Spring Batch framework is not the same as the Validator interface that is part of the core Spring framework. Spring Batch provides an adapter class, SpringValidator, to handle the differences.

Let's see how all of this works together by creating a job to put them to use. Your job will read a comma-delimited file into your Customer object, which will then be validated as part of the ValidatingItemProcessor and written out to a CSV file, as you did in Chapter 7. To start, Listing 8-6 shows an example of the input you will process.

Example 8.6. customer.csv

Richard,N,Darrow,5570 Isabella Ave,St. Louis,IL,58540
Warren,L,Darrow,4686 Mt. Lee Drive,St. Louis,NY,94935
Barack,G,Donnelly,7844 S. Greenwood Ave,Houston,CA,38635
Ann,Z,Benes,2447 S. Greenwood Ave,Las Vegas,NY,55366
Laura,9S,Minella,8177 4th Street,Dallas,FL,04119
Erica,Z,Gates,3141 Farnam Street,Omaha,CA,57640
Warren,M,Williams,6670 S. Greenwood Ave,Hollywood,FL,37288
Harry,T,Darrow,3273 Isabella Ave,Houston,FL,97261
Steve,O,Darrow,8407 Infinite Loop Drive,Las Vegas,WA,90520
Erica,Z,Minella,513 S. Greenwood Ave,Miami,IL,12778

Note that on line 5 of your input the middle initial field is 9S, which is invalid. This should cause your validation to fail at that record. With your input file defined, you can configure the job. The job you will be running will consist of a single step that reads in the input, passes it to an instance of the ValidatingItemProcessor, and then writes it to an output file. Listing 8-7 shows the configuration for the job.

Example 8.7. copyJob.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/batch"
    xmlns:beans="http://www.springframework.org/schema/beans"
    xmlns:util="http://www.springframework.org/schema/util"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util.xsd
        http://www.springframework.org/schema/batch
        http://www.springframework.org/schema/batch/spring-batch-2.1.xsd">

    <beans:import resource="../launch-context.xml"/>

    <beans:bean id="customerFile" class="org.springframework.core.io.FileSystemResource" scope="step">
        <beans:constructor-arg value="#{jobParameters[customerFile]}"/>
    </beans:bean>

    <beans:bean id="customerFileReader" class="org.springframework.batch.item.file.FlatFileItemReader">
        <beans:property name="resource" ref="customerFile" />
        <beans:property name="lineMapper">
            <beans:bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <beans:property name="lineTokenizer">
                    <beans:bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                        <beans:property name="names"
                            value="firstName,middleInitial,lastName,address,city,state,zip"/>
                        <beans:property name="delimiter" value=","/>
                    </beans:bean>
                </beans:property>
                <beans:property name="fieldSetMapper">
                    <beans:bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
                        <beans:property name="prototypeBeanName" value="customer"/>
                    </beans:bean>
                </beans:property>
            </beans:bean>
        </beans:property>
    </beans:bean>

    <beans:bean id="customer" class="com.apress.springbatch.chapter8.Customer" scope="prototype"/>

    <beans:bean id="outputFile" class="org.springframework.core.io.FileSystemResource" scope="step">
        <beans:constructor-arg value="#{jobParameters[outputFile]}"/>
    </beans:bean>

    <beans:bean id="outputWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
        <beans:property name="resource" ref="outputFile" />
        <beans:property name="lineAggregator">
            <beans:bean class="org.springframework.batch.item.file.transform.PassThroughLineAggregator"/>
        </beans:property>
    </beans:bean>

    <beans:bean id="customerValidatingProcessor"
        class="org.springframework.batch.item.validator.ValidatingItemProcessor">
        <beans:property name="validator">
            <beans:bean class="com.apress.springbatch.chapter8.BeanValidator"/>
        </beans:property>
    </beans:bean>

    <step id="copyFileStep">
        <tasklet>
            <chunk reader="customerFileReader" processor="customerValidatingProcessor"
                writer="outputWriter" commit-interval="10"/>
        </tasklet>
    </step>

    <job id="copyJob">
        <step id="step1" parent="copyFileStep"/>
    </job>
</beans:beans>

To walk through the copyJob.xml file listed in Listing 8-7, let's start with definitions of the input file and the reader. This reader is a simple delimited file reader that maps the fields of the file to your Customer object. Next is the output configuration, which consists of defining the file and its writer. With the input and output defined, the bean customerValidatingProcessor will serve as your ItemProcessor. By default, the ValidatingItemProcessor just passes the item through from the ItemReader to the ItemWriter, which will work for this example. The only dependency you inject for your ItemProcessor is the reference to the BeanValidator you wrote in Listing 8-5. With all of the beans defined, you can build your step, which is the next piece of the file. All you need for your step is to define the reader, processor, and writer. With your step defined, you finish the file by configuring the job itself.

To run the job, use the command in Listing 8-8 from the target directory of your project.

Example 8.8. Running the copyJob

java -jar itemProcessors-0.0.1-SNAPSHOT.jar jobs/copyJob.xml copyJob customerFile=/tmp/customer.csv outputFile=/tmp/output.csv

As mentioned, you have some bad input that will not pass validation. When you run the job, it fails due to the ValidationException that is thrown. To get the job to complete successfully, you have to fix your input to pass validation. Listing 8-9 shows the results of your job when the input fails validation.

Example 8.9. copyJob Output

2011-02-13 16:31:11,030 DEBUG main [org.springframework.batch.core.step.tasklet.TaskletStep] - <Applying contribution: [StepContribution: read=10, written=0, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING]>
2011-02-13 16:31:11,031 DEBUG main [org.springframework.batch.core.step.tasklet.TaskletStep] - <Rollback for RuntimeException: org.springframework.batch.item.validator.ValidationException: size must be between 1 and 1
>
2011-02-13 16:31:11,031 DEBUG main [org.springframework.batch.repeat.support.RepeatTemplate] - <Handling exception: org.springframework.batch.item.validator.ValidationException, caused by: org.springframework.batch.item.validator.ValidationException: size must be between 1 and 1
>
2011-02-13 16:31:11,031 DEBUG main [org.springframework.batch.repeat.support.RepeatTemplate] - <Handling fatal exception explicitly (rethrowing first of 1): org.springframework.batch.item.validator.ValidationException: size must be between 1 and 1
>
2011-02-13 16:31:11,032 ERROR main [org.springframework.batch.core.step.AbstractStep] - <Encountered an error executing the step>
org.springframework.batch.item.validator.ValidationException: size must be between 1 and 1

        at com.apress.springbatch.chapter8.BeanValidator.buildValidationException(BeanValidator.java:40)
        at com.apress.springbatch.chapter8.BeanValidator.validate(BeanValidator.java:28)
        at org.springframework.batch.item.validator.ValidatingItemProcessor.process(ValidatingItemProcessor.java:77)

That is all that is required to add item validation to your jobs in Spring Batch. JSR 303 provides the ability to add custom checks as well as a number of additional annotations out of the box to be able to create even more robust validation. To read more about JSR 303 and validation using it within a Spring application, visit the Spring documentation on validation at http://static.springsource.org/spring/docs/current/spring-framework-reference/html/validation.html.

Before you move on, however, the previous example only applied the validation itself to the input. It did not apply any processing to the item once it did pass validation. This next section will look at how to apply business logic once an item has passed validation.

Subclassing the ValidatingItemProcessor

Although in the previous section you were able to perform item validation, you didn't actually process the item once it did pass validation. In this section, you will look at how to subclass the ValidatingItemProcessor to apply logic to each item as it passes validation.

By subclassing the ValidatingItemProcessor class, you can override the process method to apply your logic to each item. If you use the same example as you did for the validation, you can add the ability to output the customer's name and the position of the record in the file. Listing 8-10 shows the code for the CustomerValidatingItemProcessor.

Example 8.10. CustomerValidatingItemProcessor

package com.apress.springbatch.chapter8;

import org.springframework.batch.item.validator.ValidatingItemProcessor;

public class CustomerValidatingItemProcessor extends ValidatingItemProcessor<Customer> {

    private int recordCount = 0;

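    @Override
    public Customer process(Customer customer) {
        // Run the configured Validator first; the parent throws
        // ValidationException if the item is invalid
        super.process(customer);

        recordCount++;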

        System.out.println(customer.getFirstName() + " " +
                                       customer.getLastName() + " was record number " +
                                       recordCount + " in your file.");

        return customer;
    }
}

With the validation logic already addressed with your BeanValidator class and the annotations on the Customer class, the CustomerValidatingItemProcessor only needs to concern itself with the actual logic required for this step. In this case, you keep a running count of the number of items you receive and print it to standard out with each item. To use your implementation of the ValidatingItemProcessor, the only configuration change you need to make is to update the class identified in the customerValidatingProcessor bean. Listing 8-11 shows the updated configuration.

Example 8.11. Updated Configuration for the customerValidatingProcessor Bean

...
<beans:bean id="customerValidatingProcessor"
    class="com.apress.springbatch.chapter8.CustomerValidatingItemProcessor">
    <beans:property name="validator">
        <beans:bean class="com.apress.springbatch.chapter8.BeanValidator"/>
    </beans:property>
</beans:bean>
...

When you run the job with your new configuration and your input is updated to pass validation by removing the 9 from the middleInitial field of the fifth record, Listing 8-12 shows the results you get.

Example 8.12. Results of New Logic Applied to the Customer Item

2011-02-13 17:35:00,234 DEBUG main [org.springframework.batch.repeat.support.RepeatTemplate] - <Repeat operation about to start at count=9>
2011-02-13 17:35:00,234 DEBUG main [org.springframework.batch.repeat.support.RepeatTemplate] - <Repeat operation about to start at count=10>
2011-02-13 17:35:00,234 DEBUG main [org.springframework.batch.repeat.support.RepeatTemplate] - <Repeat is complete according to policy and result value.>
Richard Darrow was record number 1 in your file.
Warren Darrow was record number 2 in your file.
Barack Donnelly was record number 3 in your file.
Ann Benes was record number 4 in your file.
Laura Minella was record number 5 in your file.
Erica Gates was record number 6 in your file.
Warren Williams was record number 7 in your file.
Harry Darrow was record number 8 in your file.
Steve Darrow was record number 9 in your file.
Erica Minella was record number 10 in your file.
2011-02-13 17:35:00,235 DEBUG main [org.springframework.batch.item.file.FlatFileItemWriter] - <Writing to flat file with 10 items.>
2011-02-13 17:35:00,236 DEBUG main [org.springframework.batch.core.step.item.ChunkOrientedTasklet] - <Inputs not busy, ended: false>

The ValidatingItemProcessor is useful for being able to apply validation to your items as they are processed. However, it is only one of the three implementations of the ItemProcessor interface provided by Spring Batch. In the next section you will look at the ItemProcessorAdapter and how it allows you to use existing services as ItemProcessors.

ItemProcessorAdapter

In Chapter 7, you looked at the ItemReaderAdapter as a way to use existing services to provide input to your jobs. Spring Batch also allows you to put to use the various services you already have developed as ItemProcessors as well by using the org.springframework.batch.item.adapter.ItemProcessorAdapter. In this section, you will look at the ItemProcessorAdapter and see how it lets you use existing services as processors for your batch job items.

Let's use an example where you read in customers, use the ItemProcessor to look up their account executive, and pass the AccountExecutive object to the ItemWriter. Before you get into the code itself, let's take a look at the updated data model showing the relationship between the AccountExecutive and the Customer. Figure 8-1 shows the updated data model.

Figure 8.1. Data model for the Customer-AccountExecutive relationship

Each of these tables will require a domain object. While the Customer object you previously used will work fine with the addition of the ID field and the accountExecutive reference, you will need to create a new AccountExecutive domain object. Listing 8-13 shows the code for both domain objects.

Example 8.13. Customer and AccountExecutive Domain Objects

Customer
package com.apress.springbatch.chapter8;

import javax.validation.constraints.NotNull;
import javax.validation.constraints.Pattern;
import javax.validation.constraints.Size;

public class Customer {

    private long id;

    @NotNull
    @Pattern(regexp="[a-zA-Z]+")
    private String firstName;

    @Size(min=1, max=1)
    private String middleInitial;

    @NotNull
    @Pattern(regexp="[a-zA-Z]+")
    private String lastName;

    @NotNull
    @Pattern(regexp="[0-9a-zA-Z\\. ]+")
    private String address;

    @NotNull
    @Pattern(regexp="[a-zA-Z\\. ]+")
    private String city;

    @NotNull
    @Size(min=2,max=2)
    @Pattern(regexp="[A-Z]{2}")
    private String state;

    @NotNull
    @Size(min=5,max=5)
    @Pattern(regexp="\\d{5}")
    private String zip;

    private AccountExecutive accountExecutive;

    // Accessors go here
    ...

    @Override
    public String toString() {
        StringBuilder output = new StringBuilder();
        output.append(firstName);
        output.append(" ");
        output.append(middleInitial);
        output.append(" ");
        output.append(lastName);
        output.append(" lives at ");
        output.append(address);
        output.append(" ");
        output.append(city);
        output.append(", ");
        output.append(state);
        output.append(" ");
        output.append(zip);
        output.append(" and has ");

        if(accountExecutive != null) {
            output.append(accountExecutive.getFirstName());
            output.append(" ");
            output.append(accountExecutive.getLastName());
            output.append(" as their account exec");
        } else {
            output.append("no account exec");
        }

        return output.toString();
    }
}

AccountExecutive
package com.apress.springbatch.chapter8;

public class AccountExecutive {

    private long id;
    private String firstName;
    private String lastName;

    // Accessors go here
    ...
}

To support the need to be able to read from this table, you will implement a new DAO that extends Spring's JdbcTemplate. This DAO will have a single method used to get an AccountExecutive object from the database based upon the Customer provided. Listing 8-14 shows the implementation of the AccountExecutiveDaoImpl.

Example 8.14. AccountExecutiveDaoImpl

package com.apress.springbatch.chapter8;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;

public class AccountExecutiveDaoImpl extends JdbcTemplate implements
        AccountExecutiveDao {

    private static final String BY_CUSTOMER =
        "select a.* from accountExecutive a inner join " +
        "customer c on a.id = c.accountExecutiveId where c.id = ?";

    public AccountExecutive getAccountExecutiveByCustomer(Customer customer) {
        return queryForObject(BY_CUSTOMER,
                new Object[] {customer.getId()},
                new RowMapper<AccountExecutive>() {
                    // Maps the single row returned by the query to an AccountExecutive
                    public AccountExecutive mapRow(ResultSet rs, int rowNum) throws SQLException {
                        AccountExecutive result = new AccountExecutive();

                        result.setFirstName(rs.getString("firstName"));
                        result.setLastName(rs.getString("lastName"));
                        result.setId(rs.getLong("id"));

                        return result;
                    }
                });
    }
}

As Listing 8-14 shows, the AccountExecutiveDaoImpl consists of a single method to look up the AccountExecutive by the Customer's information. The ResultSet you receive back is mapped by the RowMapper implementation you coded inline.

To put this DAO to use, you could do one of two things. The first would be to implement the ItemProcessor interface and perform the logic of looking up the AccountExecutive there. However, this doesn't provide any portability when it comes to reusing this code outside of your batch jobs. Instead, you'll implement a service that you can wrap with the ItemProcessorAdapter as well as use in non-Spring Batch applications. Listing 8-15 shows the code for the service you will use.

Example 8.15. CustomerServiceImpl

package com.apress.springbatch.chapter8;

public class CustomerServiceImpl {

    private AccountExecutiveDao acctExecDao;

    public AccountExecutive getAccountExecutiveForCustomer(Customer customer) {
        return acctExecDao.getAccountExecutiveByCustomer(customer);
    }

    public void setAcctExecDao(AccountExecutiveDao execDao) {
        acctExecDao = execDao;
    }
}

In order to use this service, you can configure the ItemProcessorAdapter to call the getAccountExecutiveForCustomer method. By default, Spring Batch will use the item the ItemProcessor receives when its process method is called as the parameter to the method on the service you call. In this case, the ItemProcessor you configure will receive a Customer object as the parameter to the process method. Spring Batch will take that Customer object and call your service with that object as the parameter. To make this happen, you configure your ItemProcessor to use the ItemProcessorAdapter as the class and satisfy two required dependencies:

  • targetObject: The object that contains the method to be called.

  • targetMethod: The name of the method to be called (as a String).

The configuration for this is shown in Listing 8-16.

Example 8.16. ItemProcessor Configuration

...
<beans:bean id="accountExecutiveDao" class="com.apress.springbatch.chapter8.AccountExecutiveDaoImpl">
    <beans:property name="dataSource" ref="dataSource"/>
</beans:bean>

<beans:bean id="customerService" class="com.apress.springbatch.chapter8.CustomerServiceImpl">
    <beans:property name="acctExecDao" ref="accountExecutiveDao"/>
</beans:bean>

<beans:bean id="customerProcessor"
    class="org.springframework.batch.item.adapter.ItemProcessorAdapter">
    <beans:property name="targetObject" ref="customerService"/>
    <beans:property name="targetMethod" value="getAccountExecutiveForCustomer"/>
</beans:bean>

<step id="copyFileStep">
    <tasklet>
        <chunk reader="customerFileReader" processor="customerProcessor" writer="outputWriter"
            commit-interval="10"/>
    </tasklet>
</step>

<job id="copyJob">
    <step id="step1" parent="copyFileStep"/>
</job>
...

With the job configured to use your new ItemProcessor, the ItemProcessorAdapter, your job will call the CustomerServiceImpl with each Customer item you read in and pass the returned AccountExecutive object to the ItemWriter. As mentioned, the framework allows for an ItemProcessor to accept one type as input and return another as output.
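To see what an adapter of this kind does on your behalf, consider the following minimal sketch. It is not Spring Batch's implementation. It assumes a simplified stand-in Processor interface and a hypothetical GreetingService (neither is part of this chapter's code) and uses plain Java reflection to call a method by name, passing the item as the only argument.

```java
import java.lang.reflect.Method;

public class AdapterSketch {

    // Stand-in for the framework's processor contract (hypothetical, simplified).
    public interface Processor<I, O> {
        O process(I item);
    }

    // Finds a public one-argument method by name and calls it with the item.
    public static class ReflectiveAdapter<I, O> implements Processor<I, O> {
        private final Object targetObject;
        private final String targetMethod;

        public ReflectiveAdapter(Object targetObject, String targetMethod) {
            this.targetObject = targetObject;
            this.targetMethod = targetMethod;
        }

        @Override
        @SuppressWarnings("unchecked")
        public O process(I item) {
            for (Method m : targetObject.getClass().getMethods()) {
                if (m.getName().equals(targetMethod)
                        && m.getParameterCount() == 1
                        && m.getParameterTypes()[0].isInstance(item)) {
                    try {
                        return (O) m.invoke(targetObject, item);
                    } catch (ReflectiveOperationException e) {
                        throw new IllegalStateException(e);
                    }
                }
            }
            throw new IllegalArgumentException("No method named " + targetMethod);
        }
    }

    // Hypothetical service with no dependency on any batch framework.
    public static class GreetingService {
        public String greet(String name) {
            return "Hello, " + name;
        }
    }

    public static void main(String[] args) {
        Processor<String, String> processor =
            new ReflectiveAdapter<>(new GreetingService(), "greet");
        System.out.println(processor.process("Ann")); // prints "Hello, Ann"
    }
}
```

The point of the design is that GreetingService knows nothing about batch processing, which is the same property that lets CustomerServiceImpl be reused outside of your jobs.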

The idea of applying a single action to an item within a transaction can be limiting in certain situations. For example, if you have a set of calculations that need to be done on some of the items, you may want to filter out the ones that don't need to be processed. In the next section, you will look at how to configure Spring Batch to execute a list of ItemProcessors on each item within a step.

CompositeItemProcessor

You break up a step into three phases (reading, processing, and writing) to divide responsibilities between components. However, the business logic that needs to be applied to a given item may not make sense to couple into a single ItemProcessor. Spring Batch allows you to maintain that same division of responsibilities within your business logic by chaining ItemProcessors within a step. In this section, you will look at how to chain ItemProcessors within a single step using Spring Batch's CompositeItemProcessor.

The org.springframework.batch.item.support.CompositeItemProcessor is an implementation of the ItemProcessor interface that delegates processing to each of a list of ItemProcessor implementations in order. As each processor returns its result, that result is passed on to the next processor until they all have been called. This pattern occurs regardless of the types returned, so if the first ItemProcessor takes a String as input, it can return a Product object as output as long as the next ItemProcessor takes a Product as input. At the end, the result is passed to the ItemWriter configured for the step. It is important to note that, just like any other ItemProcessor, if any of the processors this one delegates to returns null, the item will not be processed further. Figure 8-2 shows how the processing within the CompositeItemProcessor occurs.

Figure 8.2. CompositeItemProcessor processing

As Figure 8-2 shows, the CompositeItemProcessor serves as a wrapper for multiple ItemProcessors, calling them in order. As one completes, the next one is called with the item returned from the previous one. Let's take a look at how this looks in practice.
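Before moving to the real example, the chaining contract just described can be sketched in a few lines of plain Java. This is only an illustration under stated assumptions: the Processor interface and the chain helper below are hypothetical stand-ins, not Spring Batch classes. The sketch shows the two rules that matter: each delegate's output type must match the next delegate's input type, and a null result stops the chain.

```java
public class ChainSketch {

    // Stand-in for the processor contract (hypothetical, simplified).
    public interface Processor<I, O> {
        O process(I item);
    }

    // Runs first, then second. A null from first filters the item, so second is never called.
    public static <A, B, C> Processor<A, C> chain(Processor<A, B> first,
                                                  Processor<B, C> second) {
        return item -> {
            B intermediate = first.process(item);
            return intermediate == null ? null : second.process(intermediate);
        };
    }

    public static void main(String[] args) {
        // String in, Integer out
        Processor<String, Integer> parse = text -> Integer.parseInt(text.trim());
        // Integer in, Integer out; returns null to filter non-positive values
        Processor<Integer, Integer> positiveOnly = n -> n > 0 ? n : null;
        // Integer in, String out
        Processor<Integer, String> describe = n -> "value=" + n;

        Processor<String, String> all = chain(chain(parse, positiveOnly), describe);

        System.out.println(all.process(" 42 ")); // prints "value=42"
        System.out.println(all.process("-1"));   // prints "null" (item was filtered)
    }
}
```

Because the types line up at compile time here, a mismatch between delegates is caught early. With a list of delegates configured in XML, that check is deferred to runtime, so it is worth confirming the input and output types of each delegate by hand.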

In this example, you are going to take a Customer item that was read in from an input file, look it up in the database to get its database ID in the first ItemProcessor, and then pass it on to a second ItemProcessor to look up its AccountExecutive. You will update the Customer object with its AccountExecutive reference and pass that to the writer to be written to a file.

The data model for this example will be the same as the one you used in the ItemProcessorAdapter example, consisting of two tables: a customer table containing all of the basic customer information (name and address) as well as a reference to an account executive, and an account executive table. For this example, the account executive table contains only the executive's name.

As mentioned, this step will first look up the customer and set the ID on the customer, then look up the customer's account executive and update the item with that as well. In both cases, the ItemProcessor really does nothing more than do a database lookup and update the item appropriately. Let's look at the first ItemProcessor, the CustomerItemProcessor. Listing 8-17 shows the code involved.

Example 8.17. CustomerItemProcessor

package com.apress.springbatch.chapter8;

import org.springframework.batch.item.ItemProcessor;

public class CustomerItemProcessor implements ItemProcessor<Customer, Customer> {

    private CustomerDao customerDao;

    public Customer process(Customer customer) {
        Customer currentCustomer =
          customerDao.getCustomerByNameAndZip(customer.getFirstName(),
                                                 customer.getLastName(),
                                                 customer.getZip());

        customer.setId(currentCustomer.getId());

        return customer;
    }

    public void setCustomerDao(CustomerDao customerDao) {
        this.customerDao = customerDao;
    }
}

As you can see in Listing 8-17, the CustomerItemProcessor implements the ItemProcessor interface, both accepting and returning a Customer object as the item. When the processor receives the item, it looks up the customer by name and ZIP code and updates the item it received with the ID from the database. The Customer is then returned to be processed by the next ItemProcessor, which in this case is the AccountExecutiveItemProcessor shown in Listing 8-18.

Example 8.18. AccountExecutiveItemProcessor

package com.apress.springbatch.chapter8;

import org.springframework.batch.item.ItemProcessor;

public class AccountExecutiveItemProcessor implements ItemProcessor<Customer, Customer> {

    private AccountExecutiveDao accountExecutiveDao;

    public Customer process(Customer customer) {
      customer.setAccountExecutive(
         accountExecutiveDao.getAccountExecutiveByCustomer(customer));

      return customer;
    }

    public void setAccountExecutiveDao(AccountExecutiveDao accountExecutiveDao) {
        this.accountExecutiveDao = accountExecutiveDao;
    }
}

Same process, different domain. In the AccountExecutiveItemProcessor, you again take a Customer object as input. However, this time you look up which AccountExecutive it's associated with and update the item with the correct association. You then return the same Customer object to be written to your output file.

The last piece of this puzzle from a code perspective is the two DAOs you used in the ItemProcessors: the CustomerDao and the AccountExecutiveDao. In each case, you extend Spring's JdbcTemplate to make accessing the database easier. All you need to do is define your query, inject the parameters, and build a RowMapper implementation. Listing 8-19 has the CustomerDao's implementation.

Example 8.19. CustomerDao

package com.apress.springbatch.chapter8;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;

public class CustomerDaoImpl extends JdbcTemplate implements CustomerDao {

    private static final String BY_ATTRIBUTES =
       "select * from customer where firstName = ? " +
       "and lastName = ? and zip = ?";

    @SuppressWarnings("unchecked")
    public Customer getCustomerByNameAndZip(String firstName, String lastName, String zip) {
        List<Customer> customers = query(BY_ATTRIBUTES,
                                           new Object []{
                                             firstName,
                                             lastName,
                                             zip},
                                           new RowMapper() {
            public Object mapRow(ResultSet rs, int arg1) throws SQLException {
                Customer result = new Customer();

                result.setFirstName(rs.getString("firstName"));
                result.setLastName(rs.getString("lastName"));
                result.setAddress(rs.getString("address"));
                result.setCity(rs.getString("city"));
                result.setState(rs.getString("state"));
                result.setZip(rs.getString("zip"));
                result.setId(rs.getLong("id"));

                return result;
            }
        });

        if(customers != null && customers.size() > 0) {
            return customers.get(0);
        } else {
            return null;
        }
    }
}

The CustomerDao queries the Customer table via first name, last name, and ZIP code to find the customer you received. From there, it uses Spring's RowMapper to create a new Customer object containing the results of the query. Although you probably don't need the full object returned in this scenario, since the Customer object is not a very large object, passing the entire object back allows this method to be a little more reusable.

The AccountExecutiveDao is the other DAO you are using and is shown in Listing 8-20.

Example 8.20. AccountExecutiveDao

package com.apress.springbatch.chapter8;

import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;

public class AccountExecutiveDaoImpl extends JdbcTemplate implements
        AccountExecutiveDao {

    private static final String BY_CUSTOMER =
        "select a.* from accountExecutive a inner join customer c " +
        "on a.id = c.accountExecutiveId where c.id = ?";

    public AccountExecutive getAccountExecutiveByCustomer(Customer customer) {
        return (AccountExecutive) queryForObject(BY_CUSTOMER,
                                                    new Object [] {
                                                      customer.getId()},
                                                    new RowMapper() {
            public Object mapRow(ResultSet rs, int arg1) throws SQLException {
                AccountExecutive result = new AccountExecutive();

                result.setFirstName(rs.getString("firstName"));
                result.setLastName(rs.getString("lastName"));
                result.setId(rs.getLong("id"));

                return result;
            }
        });
    }
}

As you did in the CustomerDaoImpl in Listing 8-19, the AccountExecutiveDaoImpl queries the database using Spring's JdbcTemplate. Using Spring's RowMapper facilities, you are able to map the results of the query to the new AccountExecutive object and return it to your ItemProcessor.

With your code written, you can wire up this job and see how it runs. The configuration for this job—including the two DAOs, the two ItemProcessors, one reader, one writer, the step, and the job—can all be found in Listing 8-21.

Example 8.21. Configuring a Step with a CompositeItemProcessor

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/batch"
    xmlns:beans="http://www.springframework.org/schema/beans"
    xmlns:util="http://www.springframework.org/schema/util"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util.xsd
        http://www.springframework.org/schema/batch
        http://www.springframework.org/schema/batch/spring-batch-2.1.xsd">

    <beans:import resource="../launch-context.xml"/>

    <beans:bean id="customerFile" class="org.springframework.core.io.FileSystemResource" scope="step">
        <beans:constructor-arg value="#{jobParameters[customerFile]}"/>
    </beans:bean>

    <beans:bean id="customerFileReader" class="org.springframework.batch.item.file.FlatFileItemReader">
        <beans:property name="resource" ref="customerFile" />
        <beans:property name="lineMapper">
            <beans:bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <beans:property name="lineTokenizer">
                    <beans:bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                        <beans:property name="names"
                            value="firstName,middleInitial,lastName,address,city,state,zip"/>
                        <beans:property name="delimiter" value=","/>
                    </beans:bean>
                </beans:property>
                <beans:property name="fieldSetMapper">
                    <beans:bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
                        <beans:property name="prototypeBeanName" value="customer"/>
                    </beans:bean>
                </beans:property>
            </beans:bean>
        </beans:property>
    </beans:bean>

    <beans:bean id="customer" class="com.apress.springbatch.chapter8.Customer" scope="prototype"/>

    <beans:bean id="outputFile" class="org.springframework.core.io.FileSystemResource" scope="step">
        <beans:constructor-arg value="#{jobParameters[outputFile]}"/>
    </beans:bean>

    <beans:bean id="outputWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
        <beans:property name="resource" ref="outputFile" />
        <beans:property name="lineAggregator">
            <beans:bean class="org.springframework.batch.item.file.transform.PassThroughLineAggregator"/>
        </beans:property>
    </beans:bean>

    <beans:bean id="accountExecutiveDao"
        class="com.apress.springbatch.chapter8.AccountExecutiveDaoImpl">
        <beans:property name="dataSource" ref="dataSource"/>
    </beans:bean>

    <beans:bean id="customerDao" class="com.apress.springbatch.chapter8.CustomerDaoImpl">
        <beans:property name="dataSource" ref="dataSource"/>
    </beans:bean>

    <beans:bean id="customerIdItemProcessor"
        class="com.apress.springbatch.chapter8.CustomerItemProcessor">
        <beans:property name="customerDao" ref="customerDao"/>
    </beans:bean>

    <beans:bean id="accountExecutiveItemProcessor"
        class="com.apress.springbatch.chapter8.AccountExecutiveItemProcessor">
        <beans:property name="accountExecutiveDao" ref="accountExecutiveDao"/>
    </beans:bean>

    <beans:bean id="completeItemProcessor"
        class="org.springframework.batch.item.support.CompositeItemProcessor">
        <beans:property name="delegates">
            <util:list>
                <beans:ref bean="customerIdItemProcessor"/>
                <beans:ref bean="accountExecutiveItemProcessor"/>
            </util:list>
        </beans:property>
    </beans:bean>

    <step id="copyFileStep">
        <tasklet>
            <chunk reader="customerFileReader" processor="completeItemProcessor" writer="outputWriter"
                commit-interval="10"/>
        </tasklet>
    </step>

    <job id="copyJob">
        <step id="step1" parent="copyFileStep"/>
    </job>
</beans:beans>

There is a lot of XML here, so let's start at the top. Beginning with the normal Spring namespace declarations and the import of your normal launch-context.xml file, this file contains 10 uniquely configured beans (not including nested beans or the actual step and job). Table 8-1 walks through each of the beans configured in this file.

Table 8.1. Beans Configured for the CompositeItemProcessor Example

| Bean | Description | Dependencies |
|---|---|---|
| customerFile | The input file to be read (the actual file name will be passed in as a parameter of the job). | None |
| customerFileReader | The FlatFileItemReader used for the step in this job. | customerFile (the file to be read by this reader); a LineMapper implementation |
| customer | The bean that the ItemReader will return for each record read. | None |
| outputFile | The output file to be written to (the actual file name will be passed in as a parameter of the job). | None |
| outputWriter | The FlatFileItemWriter used for the step in this job. | outputFile (the file to be written to by this writer); a LineAggregator implementation |
| accountExecutiveDao | The DAO implementation used to look up AccountExecutives from the database. | A dataSource to be used by the JdbcTemplate |
| customerDao | The DAO implementation used to look up Customers from the database. | A dataSource to be used by the JdbcTemplate |
| customerIdItemProcessor | The implementation of the ItemProcessor interface that you will use to populate the Customer's database ID field. | customerDao |
| accountExecutiveItemProcessor | The implementation of the ItemProcessor interface that you will use to associate a Customer with their AccountExecutive. | accountExecutiveDao |
| completeItemProcessor | This will execute each of the ItemProcessor implementations provided to it in order on the item it receives. | A list of ItemProcessor implementations to be executed in order |
| copyFileStep | The Spring Batch step used to configure the ItemReader, ItemProcessor, and ItemWriter to be executed within a single transaction. | customerFileReader, completeItemProcessor, and outputWriter |

The configuration for even a simple CompositeItemProcessor job is not short. However, the amount of code you need to develop—and even more importantly the amount of code you need to write that depends on Spring Batch—is minimal to none[23].

When you execute this job, Spring Batch will read each of your customer records into a Customer object, apply the logic of both ItemProcessors, and write the Customer object to your output file. An example of the output generated by this batch job can be found in Listing 8-22.

Example 8.22. Sample Output of the CompositeItemProcessor Example

Richard N Darrow lives at 5570 Isabella Ave St. Louis, IL 58540 and has Manuel Castro as their account exec
Warren L Darrow lives at 4686 Mt. Lee Drive St. Louis, NY 94935 and has Manuel Castro as their account exec
Ann Z Benes lives at 2447 S. Greenwood Ave Las Vegas, NY 55366 and has Manuel Castro as their account exec
Laura S Johnson lives at 8177 4th Street Dallas, FL 04119 and has Manuel Castro as their account exec
Erica Z Gates lives at 3141 Farnam Street Omaha, CA 57640 and has Manuel Castro as their account exec
Harry T Darrow lives at 3273 Isabella Ave Houston, FL 97261 and has Anita Jordan as their account exec

The CompositeItemProcessor allows you to apply multiple flows of logic to each item within a transaction. This approach gives you the opportunity to keep your logical concerns separate for maintainability and reuse.

In the next section, you will look at writing your own ItemProcessor to filter items from the ItemWriter. Although you have written your own ItemProcessors in this section, you have passed all of the records you received to the writer up to this point. In the next section, you will look at how to change that.

Writing Your Own ItemProcessor

The ItemProcessor is really the easiest piece of the Spring Batch framework to implement yourself. This is by design. Input and output are standard across environments and business cases. Reading a file is the same regardless of whether it contains financial data or scientific data. Writing to a database works the same regardless of what the object looks like. However, the ItemProcessor is where the business logic of your process exists. Because of this, you will virtually always need to create custom implementations of it. In this section, you will look at how to create a custom ItemProcessor implementation that filters certain items that were read from being written.

Filtering Items

In the previous section, you created two of your own ItemProcessors: a CustomerItemProcessor that updated the item it received with the corresponding database ID and the AccountExecutiveItemProcessor that associates the customer's AccountExecutive with the Customer item so that information about the customer and the account executive can be written in the output file.

However, you didn't do a good job with error handling in the previous example. What happens if the Customer is not found and the ID is not updated in the CustomerItemProcessor? In this scenario, you probably want to filter the item out so the job does not try the account executive lookup. So how do you tell Spring Batch not to process the item anymore?

Spring Batch has you covered. It is actually very easy to tell Spring Batch not to continue processing an item. To do so, instead of returning an item, the ItemProcessor returns null. So in this case, if you can't find a customer in your database, you will want the CustomerItemProcessor to return null so that the AccountExecutiveItemProcessor doesn't throw an exception for lack of a customer ID to look up by. The updated code for this is shown in Listing 8-23.

Example 8.23. CustomerItemProcessor that Handles Nonexistent Customers

package com.apress.springbatch.chapter8;

import org.springframework.batch.item.ItemProcessor;

public class CustomerItemProcessor implements ItemProcessor<Customer, Customer> {

    private CustomerDao customerDao;

    public Customer process(Customer customer) {
        Customer currentCustomer =
          customerDao.getCustomerByNameAndZip(customer.getFirstName(),
                                                 customer.getLastName(),
                                                 customer.getZip());
        if(currentCustomer != null) {
            customer.setId(currentCustomer.getId());
            return customer;
        } else {
            return null;
        }
    }

    public void setCustomerDao(CustomerDao customerDao) {
        this.customerDao = customerDao;
    }
}

With just this small change to your job, you can now run it without fear that the AccountExecutiveItemProcessor will fail because it doesn't have a customer number to look up. If you run this job with an input file that has customers that are not in the database, Spring Batch will keep track (as always) of the items read and written as well as the items filtered by your ItemProcessor. Looking at the results of your job via Spring Batch Admin in Figure 8-3, you can see just that.

Figure 8.3. Results from a job that filtered three items
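To make the bookkeeping behind those numbers concrete, here is a minimal sketch of a processing loop that treats a null result as a filtered item and counts it separately from the items read and written. It is an illustration only and not how Spring Batch is implemented: the Counts class and the run method are hypothetical names, and the processor is modeled with java.util.function.Function rather than the ItemProcessor interface.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class FilterCountSketch {

    // Hypothetical counters, loosely modeled on the read/filter/write counts a step reports.
    public static class Counts {
        public int read;
        public int filtered;
        public int written;
    }

    // Applies the processor to every item; a null result is counted as filtered, not written.
    public static <I, O> Counts run(List<I> input, Function<I, O> processor, List<O> output) {
        Counts counts = new Counts();
        for (I item : input) {
            counts.read++;
            O result = processor.apply(item);
            if (result == null) {
                counts.filtered++;
                continue;
            }
            output.add(result);
            counts.written++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Names "found in the database" for the purposes of this sketch.
        List<String> known = Arrays.asList("Ann", "Harry");
        List<String> written = new ArrayList<>();

        Counts counts = run(Arrays.asList("Ann", "Bob", "Harry", "Zed"),
                            name -> known.contains(name) ? name.toUpperCase() : null,
                            written);

        // prints "4 read, 2 filtered, 2 written"
        System.out.println(counts.read + " read, " + counts.filtered
                + " filtered, " + counts.written + " written");
    }
}
```

Note that no exception is involved anywhere in this loop: a filtered item is an ordinary outcome of processing, so the read count always equals the filtered count plus the written count.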

In Chapter 4, you learned about skipping items, which used exceptions to identify records that were not to be processed. The difference between the two approaches is that filtering is intended for records that are technically valid. Your customer file had no records that the customerFileReader could not parse into an object. Instead, your business rules prevented you from being able to process certain records, so you decided to filter them out of the step's results.

Although a simple concept, ItemProcessors are a piece of the Spring Batch framework that any batch developer will spend large amounts of time in. This is where the business logic lives and is applied to the items being processed.

Summary

ItemProcessors are where business logic can be applied to the items being processed in your jobs. Spring Batch, instead of trying to help you, does what it should do for this piece of the framework: it gets out of your way and lets you determine how to apply the logic of your business as needed. In the next chapter, you will finish your look at the core components of Spring Batch by taking a deep dive into ItemWriters.



[21] Although Spring does have a Validator interface of its own, the ValidatingItemProcessor uses one from Spring Batch instead.

[22] The Spring Modules project was retired as of late 2010 in favor of the Spring Extensions project.

[23] You could have implemented the CustomerItemProcessor and the AccountExecutiveItemProcessor as services and used the ItemProcessorAdapter to reference them. This approach would isolate your code completely from the Spring Batch framework.
