This is a Java-configured exact mirror of the SB flow XML configuration from the previous example (Listing 9-63). The main class of this example is also similar; we excluded the annotations @EnableBatchProcessing (because this annotation is located in BatchConfiguration) and @ImportResource (because in this case we don’t have any XML configuration files). The output of this example is again the same as in the previous example (Listing 9-65) and shows that batch scopes work as expected when transferring state.

Batch-Processing Listeners

Most Spring abstractions provide powerful mechanisms for intercepting their operations. SB is no exception, and we can intercept as follows:

  • Before and after Job with the JobExecutionListener interface or @BeforeJob and @AfterJob annotations
  • Before and after Step with the StepExecutionListener interface or @BeforeStep and @AfterStep annotations
  • Before and after a chunk, and after a chunk error, with the ChunkListener interface or @BeforeChunk, @AfterChunk, and @AfterChunkError annotations
  • Before, after, and on item read error with the ItemReadListener<T> interface or @BeforeRead, @AfterRead, and @OnReadError annotations
  • Before, after, and on item process error with the ItemProcessListener<T, S> interface or @BeforeProcess, @AfterProcess, and @OnProcessError annotations
  • Before, after, and on item write error with the ItemWriteListener<S> interface or @BeforeWrite, @AfterWrite, and @OnWriteError annotations
  • On skip in read, process, and write via SkipListener<T,S> or @OnSkipInRead, @OnSkipInProcess, and @OnSkipInWrite (Skip with listener is covered later in this chapter.)
  • On error, open, close in retry mode via the Spring Core interface org.springframework.retry.RetryListener (Retry with listener is covered later in this chapter.)

If we don’t need to use all the interception methods of a particular listener, SB also provides empty convenience implementations of listeners with the suffix Support. This way, we can override only the interception methods we are interested in. For example, if we want to intercept only after a chunk operation, we can extend ChunkListenerSupport and override only the afterChunk() method.
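The Support idea can be sketched in plain Java. The types below (DemoChunkListener, DemoChunkListenerSupport, AfterChunkRecorder) are hypothetical stand-ins invented for this illustration, not the Spring Batch classes; they only show why extending an empty implementation lets us override a single interception point.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener contract with three interception points (not the SB interface).
interface DemoChunkListener {
    void beforeChunk(String context);
    void afterChunk(String context);
    void afterChunkError(String context);
}

// Hypothetical "Support" class: every interception method is an empty no-op.
class DemoChunkListenerSupport implements DemoChunkListener {
    @Override public void beforeChunk(String context) { }
    @Override public void afterChunk(String context) { }
    @Override public void afterChunkError(String context) { }
}

// Overrides only the interception point we are interested in.
class AfterChunkRecorder extends DemoChunkListenerSupport {
    final List<String> events = new ArrayList<>();

    @Override public void afterChunk(String context) {
        events.add("after " + context);
    }
}

class SupportListenerDemo {
    static List<String> run() {
        AfterChunkRecorder listener = new AfterChunkRecorder();
        listener.beforeChunk("chunk-1");      // inherited no-op
        listener.afterChunk("chunk-1");       // recorded
        listener.afterChunkError("chunk-1");  // inherited no-op
        return listener.events;
    }
}
```

Only the afterChunk() call leaves a trace; the other two calls fall through to the empty inherited methods.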

XML Configuration of Job Interception Example

Listing 9-69 shows the job listener.

We again use our simulation bean SimpleExecutablePoint to execute actions before and after the Job. In this case, we use the @BeforeJob and @AfterJob annotations to define the interception methods. Notice that we can pick the names of the interception methods, because the annotations don’t restrict them. Listing 9-70 shows the XML batch configuration.

This XML configuration is similar to prepareTeaJob we’ve already seen in the previous example. In this case, we register the teaJobListener bean as a listener for the job. We also reuse the BatchConfiguration class from Listing 9-7, which includes the batch-config.xml file and turns on batch processing via the @EnableBatchProcessing annotation. The main class of this example is the common Spring Boot main class in Listing 9-8. Notice that these classes are used a lot in batch listener examples. After running this main class, we can see the output in Listing 9-71.

Java Configuration of Job Interception Example

Listing 9-72 shows a different style of job listener implementation.

This listener implements the JobExecutionListener interface, so the interception methods have to conform to its contract. We can’t change the method names in this case. The jobExecution parameter is also enforced by the interface contract. This instance can be handy for accessing SB metadata. Listing 9-73 shows the Java configuration of the batch flow.

Again, this Java configuration is similar to that of prepareTeaJob in the previous examples. Notice how the listener is registered into the Job configuration while building its instance. The main class is again the common Spring Boot main class, and when we run it, we get the same output as for the previous example (Listing 9-71).

XML Configuration of Step Interception Example

Listing 9-74 shows the step listener implementation.

As we implement StepExecutionListener, we can’t change the names of the interception methods, because they need to match signatures from this interface. Signatures also include the parameter of StepExecution type. We don’t use it in our example, but it can be used to access SB metadata in listeners. Listing 9-75 shows the XML configuration where this listener is used.

Our listener is configured in the <step> tag and is reused for two steps. We did that to highlight the fact that we can easily reuse listeners to intercept various SB steps. BatchConfiguration and the Spring Boot main class are again the same as for the previous XML examples. After running the main class, we get the output in Listing 9-76.

Both steps are intercepted as expected.

Java Configuration of Step Interception Example

Listing 9-77 presents the step listener implementation.

In this case, we use annotations to define the interception points. Notice that we don’t need to use StepExecution parameters and we are allowed to pick the interception method names. But if we needed to, we could inject any SB metadata instance(s) as parameter(s) into these interception methods.

The @AfterStep method can also change the ExitStatus of the StepExecution, which is handy when the listener needs to influence the outcome of the Step. Listing 9-78 shows the Java batch configuration.

Notice that this is the exact Java configuration mirror of the XML configuration from the previous example in Listing 9-75. The main class is again the common Spring Boot main class. After running it, we get the same output as in Listing 9-76.

XML Configuration of Chunk-Oriented Processing Listeners

Get ready for a listener-heavy example, as chunk-oriented processing can be intercepted quite extensively. Listing 9-79 shows the chunk listener.

The chunk can be intercepted before it starts, after it completes, or after a chunk error occurs. Each time, SB passes us an instance of ChunkContext. Because we are not using annotations, we need to name the interception methods according to the ChunkListener interface. Listing 9-80 shows the item reader listener.

We implement the ItemReadListener<String> interface, which has three interception methods. Obviously, we don’t know which item we’re going to read before we read it. Therefore, the beforeRead() method doesn’t have any parameters. After reading, SB injects the item instance. If an error occurs, SB injects the exception that occurred. Listing 9-81 shows the item process listener.

In a before interception, we get an instance of the item before processing via the parameter. In an after process interception, SB injects the original item and the result of the processing operation (the processed/converted item). In an after process error interception method, we get the item and the exception that occurred during its processing. Obviously, we can’t pick the names of the methods, as we aren’t using annotations. Listing 9-82 shows the interceptor for the item writer.

The injections in the item writer are slightly different, as the writer is handling a List of items instead of items one by one. Otherwise, the injected values and interception points are similar to the reader and processor. Listing 9-83 shows the XML SB flow configuration.

All the listeners are registered as a subtag of <chunk>. The order of listeners is not relevant in this case, as they are used for different processing phases. If we registered various listeners for the same batch-processing phase, they would be executed in the order that they were registered.

We again use BatchConfiguration and the common main Spring Boot class in this example. Listing 9-84 shows the output if we run the main class.

As you can see, this configuration-heavy example produces an equally output-heavy listing. But if you step through it carefully, you will see that the interceptions happen as expected.

Java Configuration of Chunk-Oriented Processing Listeners

Let’s skip the verbose listener implementations for this example. The listeners for Chunk, ItemReader, ItemProcessor, and ItemWriter don’t implement interfaces, but rather use annotations. The difference in this approach is that we are not forced to inject any parameters and are allowed to name interception methods differently. You can take a look at the 0922-chunk-listener-javaconfig example if you’re interested in how these listeners are defined. Listing 9-85 shows the Java configuration.

The number of injected beans for the Step definition is far beyond the horizon of a wise parameter count. But as we wanted to highlight how to configure all listener types, please ignore that in this case. The order of registering doesn’t matter, because listeners are for different phases of batch processing. If we registered various listeners for the same batch-processing phase, they would be executed in the order that they were registered.

The main class is again reused from previous examples, and when we run it, we get the same output as in the previous example (Listing 9-84).

ItemStream

Chunk-oriented processing often requires acquiring, updating, and releasing resources. Of course, Spring tries to handle these operations for us, for the most part. Examples can include opening and closing JMS or DB connections.

But for some technologies out there, Spring doesn’t cover opening and closing their resources for us. These resources are often in the form of a stream, so SB came up with the name ItemStream. If we need to open, update, or close such resources in chunk-oriented processing, we can rely on this SB feature. We can think of it as a special interceptor executed at the right time to open or close the resource.

This support comes in three forms:

  • ItemStream: Stand-alone interface defining open, update, and close actions as interception points. The advantage is that we can use this implementation separately from ItemReader and ItemWriter.
  • ItemStreamReader<T>: A convenience subinterface for ItemReader<T> and ItemStream. The advantage is to define the ItemReader as a stream-aware SB component out of the box.
  • ItemStreamWriter<S>: A convenience subinterface for ItemWriter<S> and ItemStream. The advantage is to define ItemWriter as a stream-aware SB component out of the box.

Notice that ItemStreamWriter<S> uses a different generic placeholder S than ItemStreamReader<T>. This is because we follow the notion of ItemProcessor<T, S>, where T is the type before the processing is passed into the processor, and S is the type after the processing is returned from the processor. Therefore, generic types after processing are marked with the placeholder S in this book.
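To make the open, update, and close life cycle more concrete, here is a minimal plain-Java model. DemoStreamWriter and ItemStreamLifecycleDemo are hypothetical names invented for this sketch and do not use the SB interfaces; the model assumes update is called once after each written chunk, which simplifies when SB actually invokes it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stream-aware writer that records its life-cycle calls (not the SB interface).
class DemoStreamWriter {
    final List<String> events = new ArrayList<>();

    void open()   { events.add("open"); }    // acquire the resource
    void update() { events.add("update"); }  // save the current state of the resource
    void close()  { events.add("close"); }   // release the resource

    void write(List<String> chunk) { events.add("write " + chunk); }
}

class ItemStreamLifecycleDemo {
    // Simplified step loop: open once, write and update per chunk, always close.
    static List<String> run(List<String> items, int chunkSize) {
        DemoStreamWriter writer = new DemoStreamWriter();
        writer.open();
        try {
            for (int i = 0; i < items.size(); i += chunkSize) {
                int end = Math.min(i + chunkSize, items.size());
                writer.write(items.subList(i, end));
                writer.update();
            }
        } finally {
            writer.close();
        }
        return writer.events;
    }
}
```

The try/finally mirrors the idea that the resource is released even when a chunk fails.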

XML Example of ItemStream Usage

Listing 9-86 shows a stand-alone implementation of the ItemStream interface.

With this interface, we need to implement all three possible actions needed against the stream. For simplicity, we don’t use a real resource, but rather simulate executions against our common execution point. In real life, we would open, update, or close the real resource. Listing 9-87 shows the ItemStreamWriter<String> implementation.

As the ItemStreamWriter<String> implementation combines ItemWriter<String> with the ItemStream interface, we need to implement the write(), open(), update(), and close() methods. To simulate the ItemStream life-cycle actions, we need to inject the SimpleExecutablePoint instance. To write items in the write() method, we need the WriteRepository instance. Listing 9-88 shows the XML configuration of these stream handlers.

As a writer, we use the streamRecordWriter bean, which is aware of the resource and can handle its simulated life cycle. For other chunk handlers, we use the common simpleRecordReader and simpleRecordProcessor beans. The stand-alone recordsReaderItemStream needs to be registered via the <streams> XML tag. We again use the common BatchConfiguration class to import this XML configuration, scan the components, and turn on batch processing via @EnableBatchProcessing. When we run this example with the common Spring Boot main class, we get the output in Listing 9-89.

The simulated resource open(), update(), and close() actions were handled for RecordsReaderItemStream and StreamRecordWriter as expected.

Java Configuration Example of ItemStream Usage

For a Java configuration example of ItemStream, we use all the same classes as for the previous example, except BatchConfiguration, as shown in Listing 9-90.

Notice that this is the mirror of the XML configuration from the previous example. Instead of the <stream> tag, we use the stream() method in step building. The output is the same as for the previous example (Listing 9-89).

Job and Step Inheritance

Sometimes we need to define a common configuration for Step or Job, which can be reused to define various Steps or Jobs. With a Java configuration, we use composition or inheritance for this purpose. But XML doesn’t provide an easy composition out of the box. Therefore, the SB XML configuration support introduced the concept of Step or Job inheritance.

This example reuses TeaJobListener from Listing 9-69. Listing 9-91 shows the XML configuration.

We define two jobs here. One prepares mild tea, and the other prepares strong tea. We also want to register the same listener for both Jobs, so the listener definition is an abstract parent job, prepareTeaJob. Similarly, we specify the abstract parent Steps, where we use the common prepare tea Tasklets. This way, we can define the child Steps based on the parent Steps configuration. Notice that we also reuse one parent in two steps of the same job.

We again use the common BatchConfiguration and the Spring Boot main class. After running it, we can observe the output in Listing 9-92.

Configuring Restart

When SB executes the Step with or without the same JobParameters and it completes with RepeatStatus.FINISHED, SB doesn’t allow it to start again. This is because the work represented by the Step is treated as done and shouldn’t be performed again by default. This behavior, of course, can be changed by setting the XML attribute allow-start-if-complete of <tasklet> to true or via the allowStartIfComplete(true) call, when we build Step. These steps might be useful for validating or releasing resources after processing.

We can also define the number of times that a Step can be executed. This can be configured via the start-limit XML attribute of <tasklet> or via the startLimit() method during the Java-configured Step construction. This can be handy when, for example, a Step pre-populates the database with initial metadata and we want to ensure that it runs only once by setting the start limit to 1.

We can also configure the restarting behavior of the Job. By default, a Job is restartable. If we want to disable restarting of the job, we need to set the restartable attribute of the XML tag <job> to false or use the preventRestart() method while building the Job.
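The Step-level rules described above can be summarized as a small decision function. This is a simplified plain-Java model under stated assumptions, not SB's implementation: StepRestartPolicyDemo, Decision, and the parameter names are hypothetical, and the model assumes the "already complete" check is applied before the start-limit check.

```java
class StepRestartPolicyDemo {
    // Possible outcomes when a Step is about to be started again.
    enum Decision { RUN, SKIP_ALREADY_COMPLETE, FAIL_START_LIMIT_EXCEEDED }

    static Decision decide(boolean completedBefore, boolean allowStartIfComplete,
                           int previousStarts, int startLimit) {
        // A completed Step is not executed again unless explicitly allowed.
        if (completedBefore && !allowStartIfComplete) {
            return Decision.SKIP_ALREADY_COMPLETE;
        }
        // The Step may start only while it is under its start limit.
        if (previousStarts < startLimit) {
            return Decision.RUN;
        }
        // Otherwise the start attempt fails.
        return Decision.FAIL_START_LIMIT_EXCEEDED;
    }
}
```

For example, decide(true, false, 1, Integer.MAX_VALUE) models the default behavior and yields SKIP_ALREADY_COMPLETE, while decide(true, true, 2, 2) models a Step with a start limit of 2 being started a third time.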

XML Configuration Example of Step Restart

Listing 9-93 shows the XML configuration.

We have two Jobs configured here. The second one can’t be restarted. If we try to run this job twice, this attempt will throw an exception.

Both Jobs use steps inherited from the parent Steps that are using our common Tasklets. The first Step definition, boilWaterStepParent, can be restarted anytime we need. The second Step definition, addTeaStepParent, can be started only twice. If we try to run it a third time, it will throw an exception and fail the JobExecution.

The last Step definition, addWaterStepParent, uses the default restarting behavior, which does not execute the Step logic but allows the JobExecution to succeed.

This example again uses the common BatchConfiguration. The main Spring Boot class is shown in Listing 9-94.

We run prepareTeaJob three times and prepareTeaJobNotRestartable twice to highlight the restart features. After we run it, we get the output in Listing 9-95.

The first execution prepares tea without any problems, as in the previous example. The second execution of prepareTeaJob finishes successfully, but the addWaterStep was already executed before, so SB doesn’t execute it again. This means that the second tea is without water. Notice that this is the default behavior for Step with the same parameters.

The third execution of prepareTeaJob results in FAILED JobExecution, because we allow addTeaStepParent to run only twice. When we execute prepareTeaJobNotRestartable, it runs fine the first time, but the second time it throws an exception into the main thread, because we don’t allow this job to run twice.

Java Configuration Example of Step Restart

Listing 9-96 shows the Java configuration of the same restarting behavior as in the previous example.

Allowing the restart and defining the restart limit is done while building Steps. Disabling Job from restarting is done via calling preventRestart(). This example also uses the common Spring Boot main class, and the output looks the same as for the previous example (Listing 9-95).

Control Repeat of Step Execution

At times we need to decide programmatically whether Step needs to be repeated. We can control this aspect based on RepeatStatus returned from Tasklet. Listing 9-97 shows the bean that will be driving the Step repetition logic.

As a Spring component with the @JobScope annotation, its instance will live only for one JobExecution. JobScope also allows us to use late binding and retrieve the job parameter with the name sugarAmount. We store it and initialize currentSugarCount to 0. Listing 9-98 shows the implementation of the AddSugar step.

We inject our common SimpleExecutablePoint to simulate the action. The second injection is the SugarCounter that we defined in the previous listing. This bean is used to decide whether we add a spoonful of sugar to our tea and also whether we should repeat the step. If we return RepeatStatus.CONTINUABLE, this step will be executed again. When the logic returns RepeatStatus.FINISHED, this step won’t be executed again.
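The repetition logic can be modeled without any SB classes. In the sketch below, DemoRepeatStatus stands in for RepeatStatus, and the SugarCounter shown here is a hypothetical reconstruction based only on the description above (a target sugarAmount and a currentSugarCount starting at 0); the real listings may differ.

```java
class SugarRepeatDemo {
    // Stand-in for the two repeat outcomes discussed in the text.
    enum DemoRepeatStatus { CONTINUABLE, FINISHED }

    // Hypothetical counter: tracks how many spoonfuls were added so far.
    static class SugarCounter {
        private final int sugarAmount;
        private int currentSugarCount = 0;

        SugarCounter(int sugarAmount) { this.sugarAmount = sugarAmount; }

        boolean needsMoreSugar() { return currentSugarCount < sugarAmount; }
        void addSpoon() { currentSugarCount++; }
        int getCurrentSugarCount() { return currentSugarCount; }
    }

    // Tasklet-like logic: add one spoonful and ask to be called again, or finish.
    static DemoRepeatStatus execute(SugarCounter counter) {
        if (counter.needsMoreSugar()) {
            counter.addSpoon();
            return DemoRepeatStatus.CONTINUABLE;
        }
        return DemoRepeatStatus.FINISHED;
    }

    // Simplified driver: repeats the step while it reports CONTINUABLE.
    // Returns {spoonfuls added, number of execute() invocations}.
    static int[] run(int sugarAmount) {
        SugarCounter counter = new SugarCounter(sugarAmount);
        int invocations = 0;
        DemoRepeatStatus status;
        do {
            status = execute(counter);
            invocations++;
        } while (status == DemoRepeatStatus.CONTINUABLE);
        return new int[] { counter.getCurrentSugarCount(), invocations };
    }
}
```

In this model, the logic is invoked once more than the number of spoonfuls, because the final invocation only reports that nothing is left to do.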

BatchConfiguration in this example is a simple flow of the tasklets BoilWater, AddTea, AddWater, and AddSugar. Because this configuration is straightforward and we’ve seen similar ones previously in this chapter, we’ll skip it. Listing 9-99 shows the main class of this example.

We run the prepareTeaJob and specify that we want two spoonfuls of sugar. We can specify any amount of sugar desired. After running it, we get the output in Listing 9-100.

Reacting to Failures

When batch processing contains steps, chunks, readers, processors, writers, and tasklets, often something goes wrong and the logic throws an exception. By default, SB fails the Step and the Job in such a situation. But some errors may be only temporary or not serious enough to fail all the processing. SB therefore provides mechanisms that let us:

  • Skip the item and configure the org.springframework.batch.core.SkipListener<T,S> listener.
  • Retry the item and configure the org.springframework.retry.RetryListener listener. Notice that RetryListener is part of the common Spring module spring-retry.

SB also provides convenience abstract classes with the Support suffix for these interfaces (org.springframework.batch.core.SkipListenerSupport<T,S> and org.springframework.retry.RetryListenerSupport), where all methods have empty implementations. If we use abstract classes instead of interfaces, we don’t need to implement all the interception methods. These mechanisms apply only to chunk-oriented processing.

XML Configuration of Skip and SkipListener

This example uses error simulation in the processor and writer. An error in the reader would have a similar effect as an error in the processor. For a reader, we reuse our common reader. Listing 9-101 shows the processor simulating an error.

Before we process the first record, we simulate the error by throwing IllegalArgumentException. Subsequent processing attempts will succeed. Listing 9-102 shows the writer simulating an error.

This writer simulates an error twice and then succeeds. Listing 9-103 shows the SkipListener implementation.

When the read item is skipped, SB can’t inject its instance into the interception method, because the item was never read. Therefore, it injects only an instance of Throwable as the root cause of the skip. But as we mentioned, there will be no error simulator in the reader for this example. We needed to implement this method because of the interface SkipListener<T, S>.

During the processor and writer interception, we also can use the instance of the item being skipped. SkipListener<T, S> uses two generic types, where T is the type before processing and S is the type after processing. In this case, both of these types are String, so the interface is used as SkipListener<String, String>. Listing 9-104 shows the XML configuration.

For a reader, we use our common SimpleRecordReader without error simulation. The processor and writer simulate errors as we’ve defined in Listings 9-101 and 9-102. The skip-limit attribute specifies the maximum number of items that can be skipped in the Step before the JobExecution fails. With the <skippable-exception-classes> tag, we define which exceptions are considered reasons to skip. SkipListener is registered in the standard <listeners> block, which is also used by all other listener types.

To define an exception list to skip, we can combine the <include> and <exclude> definitions of the exception types. Such an example is beyond the scope of this book, and so is left for your experimentation.

BatchConfiguration and the main classes are again reused. Listing 9-105 shows the output.

As we can see, record 0 was skipped because of an error in the processor. The writer behavior may be a surprise, though, because we simulated two errors in the writer. When the write of a whole chunk fails in chunk-oriented processing, SB subsequently tries to process the records of that chunk one by one, so that it can write as many items as possible. This way, we are sure that one corrupted record won’t cause a write failure for other valid records belonging to the same chunk.

In this case, the first failure in the writer failed the whole chunk, so SB tried to write each record one by one afterward. The second simulated error then failed record 1, which was skipped. The rest of the records from the chunk, as well as all other chunks, were processed normally.
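The writer behavior just described can be approximated in plain Java. This is a deliberately simplified model, not SB's implementation: it ignores transactions and rollback, and ChunkScanSkipDemo, FlakyWriter, and writeWithSkip are hypothetical names. It assumes a writer that fails on its first two calls, as in the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ChunkScanSkipDemo {
    // Hypothetical writer that fails on its first two write attempts.
    static class FlakyWriter {
        int failuresLeft = 2;
        final List<String> written = new ArrayList<>();

        void write(List<String> items) {
            if (failuresLeft > 0) {
                failuresLeft--;
                throw new IllegalArgumentException("simulated write error");
            }
            written.addAll(items);
        }
    }

    // Tries the whole chunk first; on failure, writes items one by one and skips failing ones.
    // Returns the skipped items.
    static List<String> writeWithSkip(List<String> chunk, FlakyWriter writer, int skipLimit) {
        List<String> skipped = new ArrayList<>();
        try {
            writer.write(chunk);
            return skipped; // whole chunk written, nothing skipped
        } catch (IllegalArgumentException wholeChunkFailure) {
            // fall through to item-by-item writing
        }
        for (String item : chunk) {
            try {
                writer.write(Collections.singletonList(item));
            } catch (IllegalArgumentException itemFailure) {
                if (skipped.size() >= skipLimit) {
                    throw new IllegalStateException("skip limit exceeded", itemFailure);
                }
                skipped.add(item);
            }
        }
        return skipped;
    }
}
```

With a chunk of records 1, 2, and 3, the first failure sends the model into item-by-item mode, the second failure skips record 1, and records 2 and 3 are written.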

Java Configuration of Skip and SkipListener

This example uses the same classes as the previous one. The only difference is use of a Java configuration instead of XML. Listing 9-106 shows this configuration.

When we define the step, we again use the common reader and the error-simulating processor and writer. To configure the skip, we need to include the faultTolerant() call in the Step creation chain, so that we can configure skipLimit() and the exception to skip via skip(). SkipListener is configured last, in the faultTolerant() section.

When we run this example, the behavior is exactly the same as for the previous example in Listing 9-105.

XML Configuration of Retry

This example uses the common reader and processor. The writer is reused from the previous example (Listing 9-102), simulating two errors for the first two write attempts. Listing 9-107 shows the retry listener.

We are interested only in the onError interception, so we extended org.springframework.retry.RetryListenerSupport as a convenience empty implementation of org.springframework.retry.RetryListener. If we implemented org.springframework.retry.RetryListener, we would need to also implement the close() and open() interception methods. Listing 9-108 shows the XML configuration.

We allow retrying each item three times, which is specified by the retry-limit attribute of <chunk>. If this retry limit is exceeded, an exception is thrown. StepExecution and JobExecution would fail in this case.
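A retry limit can be pictured as a bounded loop around the failing operation. The following plain-Java sketch is a simplified model rather than the spring-retry implementation; RetryLimitDemo, retry(), and demo() are hypothetical names, and the model assumes that the limit counts the total number of attempts and that only IllegalArgumentException is retryable.

```java
import java.util.function.Supplier;

class RetryLimitDemo {
    // Calls the action until it succeeds or the attempt limit is used up.
    static <T> T retry(Supplier<T> action, int retryLimit) {
        int attempts = Math.max(1, retryLimit); // always try at least once
        RuntimeException lastError = null;
        for (int attempt = 1; attempt <= attempts; attempt++) {
            try {
                return action.get();
            } catch (IllegalArgumentException retryable) {
                lastError = retryable; // remember the error and try again
            }
        }
        throw lastError; // limit exhausted: propagate the last error
    }

    // Simulates a write that fails a given number of times before succeeding.
    static String demo(int simulatedFailures, int retryLimit) {
        int[] calls = { 0 };
        Supplier<String> flakyWrite = () -> {
            calls[0]++;
            if (calls[0] <= simulatedFailures) {
                throw new IllegalArgumentException("simulated write error");
            }
            return "written on attempt " + calls[0];
        };
        try {
            return retry(flakyWrite, retryLimit);
        } catch (IllegalArgumentException exhausted) {
            return "failed after " + calls[0] + " attempts";
        }
    }
}
```

Under these assumptions, two simulated errors with a limit of 3 still end in a successful write, while three simulated errors exhaust the limit.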

The <retryable-exception-classes> subelement defines a list of exceptions that we allow a retry for. The <retry-listeners> tag defines a list of retry listeners, which is in this case only our simpleRetryListener. Similar to the Skip feature, we can combine the <include> and <exclude> tags to fine-tune exceptions we are interested in. And as with the Skip example, we leave this feature for your experimentation.

BatchConfiguration and the main classes are our common implementations. After running this example, we can observe the output in Listing 9-109.

All the records were processed correctly, regardless of the two simulated errors, because we didn’t reach the retry limit.

Java Configuration of Retry

We will use the same classes for the Java configuration example of Retry and replace only the XML configuration with the BatchConfiguration in Listing 9-110.

The retry configuration needs to be initiated by the faultTolerant() method, similar to Skip. With retry(), we define the exception that can be retried, which in this case is IllegalArgumentException. The retry limit is configured to 3, and we also configure simpleRetryListener as the retry listener. Job uses only simpleRecordsStep. When we run this example, we get the same output as for the previous example (Listing 9-109).

Conditional Execution of Step

SB provides features to programmatically decide whether Step needs to be executed. This programmatic decision is done in the bean implementing JobExecutionDecider, which is then used in the <decision> XML element or by combination of the methods next(), on(), to(), and from() when we are building a Job with a Java configuration.

XML Configuration of Step Decision

Listing 9-111 shows the implementation of JobExecutionDecider.

The component acting as JobExecutionDecider has to implement the decide() method, which has to return the FlowExecutionStatus type. We can use standard SB constants—for example, FlowExecutionStatus.COMPLETED or FlowExecutionStatus.FAILED. Notice that there are more out-of-the-box status constants in FlowExecutionStatus.

We can also wrap any string value into the FlowExecutionStatus type, which gives us an indefinite number of flow control opportunities in addition to the out-of-the-box status constants. In this case, we retrieve the teaIngredient job parameter from the injected jobExecution instance, and if milk is the desired ingredient, we indicate it in the return value; otherwise, the return status will be COMPLETED.
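The decision logic can be sketched in plain Java, with strings standing in for FlowExecutionStatus values. TeaDeciderDemo, its methods, and the step names in the list are hypothetical and serve only this illustration; the sketch assumes a flow in which the milk status leads to one extra step and any other status ends the flow.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

class TeaDeciderDemo {
    // Decider-like logic: a custom status for milk, otherwise the standard COMPLETED.
    static String decide(Map<String, String> jobParameters) {
        return "milk".equals(jobParameters.get("teaIngredient")) ? "milk" : "COMPLETED";
    }

    // Simplified flow: the steps before the decision always run,
    // and the decision result selects whether the milk step follows.
    static List<String> runFlow(Map<String, String> jobParameters) {
        List<String> executed = new ArrayList<>(
                Arrays.asList("boilWaterStep", "addTeaStep", "addWaterStep"));
        if ("milk".equals(decide(jobParameters))) {
            executed.add("addMilkStep");
        }
        return executed;
    }
}
```

Because the status is just a wrapped string, further branches (for example, a hypothetical "lemon" status) could be added in the same way.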

For this example we also introduce a new tasklet step called AddMilk. Because it is similar to all other tea tasklets, we skip its listing. It simulates the Add Milk action. Listing 9-112 shows the XML configuration.

The AddWater step defines teaIngredientDecision as the next Step. This decision block uses the TeaIngredientDecider bean as the decider logic. If the decider returns the milk value, we execute addMilkStep. If any other value is returned, we end the Job flow. Listing 9-113 shows the main class of this example.

The job prepareTeaJob is executed first with the teaIngredient parameter set to milk and the second time without any tea ingredient. After running this example, we can observe the output in Listing 9-114.

The first execution occurs with addMilkStep, and the second without it, depending on whether we pass milk as the teaIngredient parameter. It is also interesting to take a look at the batch graph of this example, shown in Figure 9-7.


Figure 9-7. Batch graph for decision step

Java Configuration of Step Decision

Following our tradition, this Java example reuses all the classes from the previous XML-based example and replaces the XML configuration. Listing 9-115 shows the conditional Java-based configuration.

The definition of steps is straightforward; we use our common tea tasklets with the AddMilk tasklet and inject them into the Job bean construction method. This is where the interesting constructs start. addWaterStep goes to teaIngredientDecider.

The following execution path depends on the decider’s return value. If the decider returns the milk value, the branch with on("milk") is executed. It defines addMilkStep as the next step to execute. We also have to define the from() Step; otherwise, the Job will fail. If the decider returns FlowExecutionStatus.COMPLETED, the second branch of execution is applied. This value ends the execution of the batch flow.
