The build() method must be called twice because the first call builds the Job subflow initiated by the decider, and the second build() call constructs the main Job instance. After we run this example, we can observe the same output as for the previous example (Listing 9-114).

Scaling and Parallel Processing

All the executions so far have been single-threaded. Sequential processing is often required by the nature of the data being processed, or is simply fast enough not to represent a bottleneck in an enterprise system.

But often data doesn’t have to be processed in sequence. In these cases, we can consider whether distributing the batch processing across separate threads or processes would boost overall performance. Fortunately, SB’s creators anticipated such use cases and implemented support for the following:

  • Multithreaded step (executed in a single process)
  • Parallel steps (executed in a single process)
  • Remote chunking of steps (executed in multiple processes)
  • Partitioning a step (executed in multiple processes)

Multithreaded Step

The idea of using multiple threads for one Step applies only to chunk-oriented processing. Each chunk is processed in a separate thread, and the threads are handled by a thread pool configured for the Step.

XML Configuration of Multithreaded Step

This example reuses the common chunk-oriented reader, processor, and writer as well as the common BatchConfiguration and Spring Boot main class. The only new part is the XML configuration introduced in Listing 9-116.

Multiple threads are handled by the TaskExecutor bean with a thread pool of size 10. When we define <tasklet> for our chunk-oriented step, we configure the taskExecutor bean via the task-executor attribute. Listing 9-117 shows the output when we execute this example.

Two facts are worth noticing. First, the items are processed in random order. Second, one chunk is handled in a single thread. If you pair the item numbers with the names of the threads (for example, simple record 4 processed, simple record 5 processed, simple record 6 processed, and simple record 7 processed were handled by the thread named taskExecutor-2), you can see that each chunk is processed in a separate thread. This allows us to tune the processing throughput according to our requirements.

Java Configuration of Multithreaded Step

The Java configuration is shown in Listing 9-118.

Configuring TaskExecutor for the chunk-oriented step is done by calling the taskExecutor() method while building the Step. The output for this example is similar to that of the previous example (Listing 9-117).
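
In case Listing 9-118 isn’t at hand, the following minimal sketch shows the idea. It is an illustration only: it assumes the chapter’s common SimpleRecord item type, the common reader/processor/writer beans, and a chunk size of 4 (matching the output discussed earlier), with the bean methods living inside a @Configuration class annotated with @EnableBatchProcessing.

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10); // thread pool of size 10, as in the XML example
    return executor;
}

@Bean
public Step multiThreadedStep(StepBuilderFactory steps,
        ItemReader<SimpleRecord> reader,
        ItemProcessor<SimpleRecord, SimpleRecord> processor,
        ItemWriter<SimpleRecord> writer) {
    return steps.get("multiThreadedStep")
            .<SimpleRecord, SimpleRecord>chunk(4)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .taskExecutor(taskExecutor()) // each chunk runs on a pool thread
            .build();
}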

Parallel Steps

Chunks are not the only units that can be executed in parallel. Sometimes we might want to execute several steps in parallel.

XML Configuration of Parallel Steps

This example again reuses the common BatchConfiguration with the Spring Boot main class and all common chunk-oriented tasklets. We add one more, called AddSugar, which simulates the action named Add Sugar, similar to our common tasklets. Therefore, we show only the XML configuration in Listing 9-119.

The definition of parallel Steps starts with the <split> element named addIngredientsSplit. With task-executor, we define which thread pool will be used for parallel processing. Each <flow> subelement of <split> creates a boundary for execution in a single thread. In this case, each flow has only one step, so these two steps will be executed in parallel. Listing 9-120 shows the output of this example.

Notice that addTeaStep and addSugarStep are executed in separate threads in parallel. Figure 9-8 shows the batch graph.


Figure 9-8. Bean graph of parallel steps example

Java Configuration of Parallel Steps

Again, we are building on the previous example. Listing 9-121 shows the Java configuration for the same behavior as in the previous example.

The definition of the thread pool bean and the creation of Step beans aren’t new to us. When we create the Job definition, the split() method allows us to specify the thread pool instance to use for parallel execution. In the add() method, we define the parallel branches of execution. As the use of split() and add() converts SimpleJob to FlowJob, we need to use the end() method to close the parallel subflow and the main job. The output after execution of this example is similar to Listing 9-120 shown previously.
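
As a hedged illustration of this builder chain (not the book’s exact Listing 9-121), the Job definition could look like the following sketch inside a @Configuration class; the flow names and the injected addTeaStep/addSugarStep beans are assumptions.

@Bean
public Job parallelJob(JobBuilderFactory jobs, TaskExecutor taskExecutor,
        Step addTeaStep, Step addSugarStep) {
    Flow addTeaFlow = new FlowBuilder<Flow>("addTeaFlow").start(addTeaStep).build();
    Flow addSugarFlow = new FlowBuilder<Flow>("addSugarFlow").start(addSugarStep).build();
    return jobs.get("parallelJob")
            .start(addTeaFlow)
            .split(taskExecutor)  // thread pool used for the parallel branches
            .add(addSugarFlow)    // each flow executes in its own thread
            .end()                // closes the parallel subflow and the FlowJob
            .build();
}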

Remote Chunking of Step

Remote chunking is a concept that can help when we are dealing with chunk-oriented processing and the processing or writing of items is resource intensive. The idea is to delegate the processing and writing parts of the chunk to one or more remote JVM processes. These processes can, of course, be hosted on remote machines.

Reading of chunks is done in the master process. The master communicates with the remote slave(s) via a message queue, so communication between the master and the slaves is asynchronous. The SB framework takes advantage of the Spring Integration framework to configure this communication. Two separate message queues are needed for the following:

  • Sending commands from the master to the slave(s)
  • Sending responses from the slave(s) back to the master

As communication is done via the messaging interface, work is divided dynamically among the remote slaves, and load balancing across multiple slaves is automatic. Let’s explain this natural load balancing. The master sends a command to the message queue, where the slaves are listening. As the message queue is configured for point-to-point messaging, only one slave can read each command. The first slave that picks up the message does the processing and writing. Naturally, the least busy slave will be the first to read this command message from the master.

SB uses the standard ItemReader, ItemProcessor, and ItemWriter interfaces for performing the work. ChunkProcessor<I> is the interface that helps with work delegation to the slave process(es). SimpleChunkProcessor is the out-of-the-box implementation that the SB framework provides.

Notice that our example for this feature is covered only by an XML configuration. The example combines a Spring Integration flow with an SB flow, and the XML configuration is more concise in this case, so the Java configuration was skipped.

JMS Configuration Used for Master-Slave Communication

We will use a HornetQ server as the messaging middleware for this communication. Chapter 5 showed how to download and easily start a HornetQ server instance. Now we need to configure message queues for master-slave communication. Listing 9-122 shows the HornetQ definition of these queues.

We need to add these two top-level elements to the HornetQ configuration. After we start the server, the queues will be available. The queue named MasterCommands will be used for sending messages from the master to the slave. The SlaveResponses queue will be used for replies from the slave. Listing 9-123 shows the JMS communication, which will be present in the master and the slave projects.

The JMS connection via JNDI, as well as the JNDI lookup for the connectionFactory bean, is the same as in the JMS examples. JNDI lookups for the master-slave communication queues follow, registering them as JMS destinations in the Spring context. Again, the same JMS access configuration is used in the master and the slave projects.

Configuration of Slave for Remote Chunking

This example reuses our common writer and processor to work on the slave piece of remote chunking. Listing 9-124 shows the XML slave configuration.

First, we import the JMS configuration. Next we use a Spring Integration construct to define the inbound channel adapter, which will be receiving messages from the master. The items to process will arrive via the JMS queue masterCommands, and this inbound channel adapter will place them onto the local Spring Integration channel inSlaveChannel, where they will be picked up by the service activator named slaveServiceActivator.

This service activator delegates the items to process to ChunkProcessorChunkHandler, which is a wrapper for SimpleChunkProcessor. SimpleChunkProcessor is the bean where we define the ItemProcessor and ItemWriter, in this case our common SimpleRecordProcessor and SimpleRecordWriter.
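
Although the book’s example uses XML here, a minimal Java sketch of this slave-side wiring may clarify the relationship between the handler and the processor; the SimpleRecord item type and the bean names are assumptions carried over from the chapter’s common beans.

@Bean
public ChunkProcessorChunkHandler<SimpleRecord> simpleChunkHandler(
        ItemProcessor<SimpleRecord, SimpleRecord> simpleRecordProcessor,
        ItemWriter<SimpleRecord> simpleRecordWriter) {
    ChunkProcessorChunkHandler<SimpleRecord> handler = new ChunkProcessorChunkHandler<>();
    // SimpleChunkProcessor performs the processing and writing on the slave side
    handler.setChunkProcessor(
            new SimpleChunkProcessor<>(simpleRecordProcessor, simpleRecordWriter));
    return handler;
}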

Responses from the simpleChunkHandler bean will be sent to the Spring Integration channel outSlaveChannel as a reply message from slaveServiceActivator. Last, outSlaveChannel is connected to outSlaveAdapter, which will send responses to the JMS queue slaveResponses.

Figure 9-9 shows the Spring Integration graph of processing messages from the master.


Figure 9-9. Spring Integration graph of slave message flow

Configuration of Master for Remote Chunking

Listing 9-125 shows the remote chunking of the master configuration.

The JMS configuration is imported. Next we define the StepScope bean. When we use XML configuration, we need to enable batch scopes by configuring them as beans with proxyTargetClass enabled. If we didn’t configure these beans, Spring wouldn’t know about SB scopes at all.

The MessagingTemplate bean is used for communication with the slave application. Its defaultChannel is connected to outMasterAdapter, which is the JMS outbound channel adapter connected to the JMS queue masterCommands. This route is used for sending items for remote processing and writing.

Responses from the slave are received via the JMS inbound channel adapter inMasterAdapter and gathered in inMasterChannel, which is of type QueueChannel. So it stores responses in memory for master job processing.

The chunk-oriented processing Job is defined with the local reader bean simpleRecordReader, but for the writer we use the special bean named remoteRecordWriter.

This bean is defined with the step scope and is of type ChunkMessageChannelItemWriter, which delegates the writing of messages to messagingTemplate. The replyChannel attribute is configured to inMasterChannel, so this SB component polls inMasterChannel and waits for responses from the slave.
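
For illustration, a hedged Java-config sketch of such a remoteRecordWriter bean could look like the following (the book configures it in XML; the SimpleRecord item type is an assumption):

@Bean
@StepScope
public ChunkMessageChannelItemWriter<SimpleRecord> remoteRecordWriter(
        MessagingTemplate messagingTemplate, PollableChannel inMasterChannel) {
    ChunkMessageChannelItemWriter<SimpleRecord> writer = new ChunkMessageChannelItemWriter<>();
    writer.setMessagingOperations(messagingTemplate); // sends chunks toward the masterCommands queue
    writer.setReplyChannel(inMasterChannel);          // polls here for responses from the slave
    return writer;
}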

Running the Remote Chunking Example

To execute this example, we need to start three processes. First, the HornetQ server needs to be running; otherwise, the master and slave couldn’t communicate at all. We can start the HornetQ server via the command bin/run.sh or bin/run.bat.

The second process is started by the main class shown in Listing 9-126.

This imports the slave XML configuration, enables batch processing, and executes this configuration as a Spring Boot application. We need to execute the slave process first, because it will be listening for items to process from the master.

Finally, we execute the master process via the main class in Listing 9-127.

This configuration class imports the master XML configuration, enables batch processing, and runs this configuration as the Spring Boot main class. When we execute this third process, we see the output in Listing 9-128.

We can see that the Job in the master process was executed and finished successfully. But where are our write log entries? Remember that remote chunking delegated the processing and writing work to the slave process. So when we switch to the slave log entries, we can see the output in Listing 9-129.

Notice that if we wanted to use multiple slave processes/machines, we would start all the slaves first. The master should be started last in this example.

Partitioning a Step

Step partitioning is an SB feature that completely delegates the batch processing load to remote processes. It also applies only to chunk-oriented processing, and the processing is fully done in the slave processes. This mechanism is handy when we are dealing with I/O-bound requirements.

We also have to emphasize that an in-process mode of partitioning is possible, in which work is delegated to local threads instead of remote processes. But this execution is handled in a single process and can easily be covered by the parallel steps feature, which is easier to configure. Therefore, we will focus purely on partitioning across remote processes.

The idea is to divide the data set being read into partitions so that each partition can be processed in a separate remote process. We again have master and slave processes, and the master defines the partitions. Partition boundaries are usually defined by one dimension of the data set. For example, if we have a data set of user interactions, we can divide this data set based on the interaction date and time into various time buckets. The data belonging to each bucket can represent the data set for one partition.

For each partition, the master creates a separate ExecutionContext instance in which the partition’s boundary information is stored. This is done in the component defined by the Partitioner interface. Most of the time, this component is implemented by application developers.

After the partitions are defined and wrapped into ExecutionContext instances, the master sends commands via the messaging interface to the slave processes. Each slave retrieves the information about its partition boundaries from the ExecutionContext instance it receives. The slave then performs chunk-oriented processing on the defined partition data set and, after the work is done, sends the results to the master.

For handling partitions, SB provides three implementations of PartitionHandler:

  • TaskExecutorPartitionHandler: Uses TaskExecutor to delegate work to local slave threads.
  • JsrPartitionHandler: Uses ThreadPoolTaskExecutor to delegate work to local slave threads. Conforms to the JSR-352 partitioning standard.
  • MessageChannelPartitionHandler: This is the most interesting implementation in terms of true distributed scaling; MessagingTemplate is used to delegate work to remote slave processes. This scaling again depends on the least busy slave reading the message from the master first.

If we are using MessageChannelPartitionHandler for remote partitioning, the master and slaves need to share a database for storing SB metadata. This DB instance should be used by the master and all slave processes directly via JDBC.

Partitioning Step Example

The example for this feature uses only an XML configuration, for the same reasons as the remote chunking example. To cover this feature, we will use three projects and will run several processes:

  • HornetQ server for handling messaging between master and slaves
  • Shared database server
  • Master project
  • Slave project, which we will run as several processes

Shared Database Project

For the shared database, we will use a Spring Boot application running an instance of an H2 database stored in a file. Listing 9-130 shows the script used when we start this project.

Because we are using the H2 mode that persists into a file, this script wipes out all the objects in the database so that we always start our example with a clean sheet. In the real world, we wouldn’t want to do this, but for example purposes, it is good to start with an empty DB. Listing 9-131 shows the XML configuration for this example project.

The first bean creates an H2 database instance, which is exposed via TCP and allows other processes to connect to it. The TCP port used for the DB connection is configured to 8043. The last configuration flag specifies the location of the file the H2 DB instance is using to store the data.

The XML tag <jdbc:initialize-database> will run the h2-purge.sql script from Listing 9-130, which ensures a clean sheet in the database after we start this project. The dataSource bean is needed to execute this script. The URL uses an H2 convention to access the TCP-hosted database located in the file ~/partition-test. The tilde character (~) indicates the user’s home directory.
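
A minimal Java sketch of a matching client-side dataSource, assuming the database runs on localhost, could use H2’s TCP URL convention directly:

// DriverManagerDataSource is from org.springframework.jdbc.datasource
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName("org.h2.Driver");
// jdbc:h2:tcp://<host>:<port>/<file>; ~ expands to the user home directory
dataSource.setUrl("jdbc:h2:tcp://localhost:8043/~/partition-test");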

Listing 9-132 shows the main class of this project.

This imports the XML configuration we described previously in Listing 9-131. When Spring Boot autoconfiguration finds the H2 database dependency on the classpath, it configures an in-memory database by default. But we want to avoid this feature, because we have our special TCP configuration that we want to expose to other processes. So we exclude DataSourceAutoConfiguration from the Spring Boot autoconfiguration so that it doesn’t conflict with our configuration.
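
A hedged sketch of such a main class follows; the imported XML file name is an assumption, while the class name SqlServerApplication is the one used later to start this project.

@Configuration
@EnableAutoConfiguration(exclude = DataSourceAutoConfiguration.class)
@ImportResource("classpath:sql-server-configuration.xml") // hypothetical file name
public class SqlServerApplication {
    public static void main(String[] args) {
        SpringApplication.run(SqlServerApplication.class, args);
    }
}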

Slave Partitioning Project

Listing 9-133 shows the repository we will use in our example.

This class represents the data set to process. As we will need to separate the data set into partitions, we allow reading the example records via indexes, so that each slave can read the records belonging to the partition it should process. Each record is exposed via the getRecord() method. Listing 9-134 shows the ItemReader for this slave project.

This item reader injects IndexedReadRepository to read from. It also receives the ExecutionContext instance via a method annotated with @BeforeStep. Notice that this ExecutionContext instance is sent to the slave process by the master and defines the partition boundaries that this slave should process.

In this case, the partition boundaries are stored in the currentIndex and partitionEnd properties of the ExecutionContext. On each read() execution, this reader checks whether we have reached the end of the partition. If not, it reads the record and increases currentIndex in the execution context. When the reader returns null, it signals the end of work for this slave.
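
A minimal sketch of such a reader follows; the String record type, the inclusive meaning of partitionEnd, and the field wiring are assumptions based on the description above.

public class PartitionedRecordReader implements ItemReader<String> {

    @Autowired
    private IndexedReadRepository indexedReadRepository;

    private ExecutionContext executionContext;

    @BeforeStep
    public void beforeStep(StepExecution stepExecution) {
        // ExecutionContext sent by the master, carrying the partition boundaries
        this.executionContext = stepExecution.getExecutionContext();
    }

    @Override
    public String read() {
        int currentIndex = executionContext.getInt("currentIndex");
        if (currentIndex > executionContext.getInt("partitionEnd")) {
            return null; // end of the partition means end of work for this slave
        }
        executionContext.putInt("currentIndex", currentIndex + 1);
        return indexedReadRepository.getRecord(currentIndex);
    }
}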

This example uses the same JMS configuration from Listing 9-123 of the previous example, where we define access to the JMS destinations MasterCommands and SlaveResponses. These will be used for communication with the master. The slave Spring configuration is presented in Listing 9-135.

First, we import the JMS configuration, so that the slave process can access the JMS messaging server in order to receive commands from the master. The dataSource bean is used to access the shared database hosted by the example project 0940-remote-partitioning-sql-server.

Next, the StepExecutionRequestHandler bean is the messaging endpoint used to handle commands from the master process. These commands arrive as StepExecutionRequest instances, and this handler responds to the master with a StepExecution instance. The jobExplorer property of this bean is created by Spring Boot autoconfiguration and is used to access the SB metadata stored in the shared database. The stepLocator property is a bean created by the next bean definition and is of type BeanFactoryStepLocator; it is used to scan the Spring context for SB Step instances. The Step located in this Spring context will be used to perform the SB work.

This Step instance is defined by the <batch:step> XML tag and configured to use chunk-oriented processing for this slave. For the ItemReader, we use PartitionedRecordReader shown in the previous listing. The processor and writer instances are our common beans used across the chapter.

The following beans define the JMS communication with the master. Commands are received from the JMS queue masterCommands and forwarded to the local Spring Integration channel inSlaveChannel. This channel is connected to stepExecutionRequestHandler, which performs the batch processing. Its reply is forwarded to the Spring Integration channel outSlaveChannel and passed on to the JMS queue slaveResponses.

Figure 9-10 shows the Spring Integration graph for this slave configuration.


Figure 9-10. Spring Integration graph for slave configuration of remote partitioning example (file batch-slave-configuration.xml in folder src/main/resources of example project 0940-remote-partitioning-slave)

Listing 9-136 shows the main class of the slave project.

We include the slave XML configuration and run the Spring Boot application with batch processing enabled.

Master Partitioning Project

Listing 9-137 shows the Partitioner implementation.

This SB component divides the data set into partitions represented by separate ExecutionContext instances, based on the given gridSize (the number of partitions). Each ExecutionContext instance contains the start and end index of the partition it belongs to. The size of the data set (records to process) is hard-coded to 16, because IndexedReadRepository in this example has only 16 records to process. We hope you will excuse this shortcut, which keeps the example simple. So, for example, with a gridSize of 2 partitions, we would create two ExecutionContexts with the partition boundaries 0..7 and 8..15.

Note  This algorithm as well as other constructs in this partitioning example are highly influenced by Michael Minella’s remote partitioning example at https://github.com/mminella/Spring-Batch-Talk-2.0.
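
A minimal sketch of such a Partitioner follows, using the currentIndex/partitionEnd keys expected by the reader shown earlier; the even division of records and the partition key names are assumptions.

public class RecordsPartitioner implements Partitioner {

    private static final int RECORD_COUNT = 16; // size of IndexedReadRepository

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> contexts = new HashMap<>();
        int partitionSize = RECORD_COUNT / gridSize;
        for (int partition = 0; partition < gridSize; partition++) {
            ExecutionContext context = new ExecutionContext();
            context.putInt("currentIndex", partition * partitionSize);
            context.putInt("partitionEnd", (partition + 1) * partitionSize - 1);
            contexts.put("partition" + partition, context);
        }
        return contexts; // for gridSize 2: boundaries 0..7 and 8..15
    }
}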

Listing 9-138 shows the XML configuration of the master.

The master configuration also imports the JMS configuration to be able to send commands and receive responses from slaves. The dataSource bean accesses a shared database hosted in the 0940-remote-partitioning-sql-server example project.

The SB job definition follows. For the Step, we use the <batch:partition> construct to define the remote execution of this processing. We need to define two properties here. The first is a reference to the partitioner, which is covered by our work-distribution algorithm in RecordsPartitioner from the previous listing.

The second bean used in the job is partitionHandler. This is the main bean of the master processing, of type MessageChannelPartitionHandler, because it coordinates the slave processing via the messaging middleware. We define gridSize (the number of partitions to process), stepName (the name of the step to execute in the slave processes), and replyChannel (the Spring Integration channel where the master expects replies from the slave processes). The last parameter of partitionHandler specifies the MessagingTemplate instance that will be used for communication with the slaves.
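
For illustration, these four properties map onto the following hedged Java-config sketch (the book configures the bean in XML; the step name here is hypothetical):

@Bean
public MessageChannelPartitionHandler partitionHandler(
        MessagingTemplate messagingTemplate, PollableChannel inMasterAggregatedChannel) {
    MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
    handler.setGridSize(2);                             // number of partitions to process
    handler.setStepName("partitionedStep");             // hypothetical Step name in the slaves
    handler.setReplyChannel(inMasterAggregatedChannel); // where slave replies are aggregated
    handler.setMessagingOperations(messagingTemplate);  // sends partition commands to slaves
    return handler;
}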

This communication uses Spring Integration’s JMS outbound channel adapter outMasterAdapter, connected to the JMS queue masterCommands. Responses from slaves are received from the JMS queue slaveResponses and temporarily stored in the Spring Integration QueueChannel named inMasterAggregatedChannel, so this channel is asynchronous. The last component is a Spring Integration aggregator, which again uses partitionHandler to aggregate the slave responses.

Listing 9-139 shows the main class of the master application.

Alongside importing the master Spring configuration, this Spring Boot main class needs to enable batch processing and enable scheduling for asynchronous polling of inMasterAggregatedChannel by the partitionHandler bean.
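
A hedged sketch of this main class follows; the class and XML file names are assumptions (the book starts the master via a class called BatchConfiguration).

@Configuration
@EnableBatchProcessing
@EnableScheduling // needed for asynchronous polling of inMasterAggregatedChannel
@EnableAutoConfiguration
@ImportResource("classpath:batch-master-configuration.xml") // hypothetical file name
public class BatchMasterApplication {
    public static void main(String[] args) {
        SpringApplication.run(BatchMasterApplication.class, args);
    }
}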

Executing Remote Partitioning Example

This last example is arguably the most complicated in the entire book. When we execute it, the load balancing between slaves is natural: the least busy slave reads the messages from the masterCommands JMS queue first, so this least busy slave will perform the processing. In real life, this load balancing is fine.

But my machine has four cores, and when I use two slave processes with two partitions, both partitions are mostly processed by the same process, and the second slave sits there unused. Therefore, we will run three slave processes to increase the chance of dividing the load across separate slave processes.

To execute this example we need to do the following:

  • Start a HornetQ server with the same configuration as in the previous example (Listing 9-122).
  • Start a SQL server via the SqlServerApplication class from the example project 0940-remote-partitioning-sql-server.
  • Start three slave processes via BatchSlaveApplication from example project 0940-remote-partitioning-slave. Just run the same project three times.
  • Start the master process via BatchConfiguration from example project 0940-remote-partitioning-master.

After running this sequence, we can observe master process output similar to Listing 9-140.

The job completed successfully, with partitions split as we expected. Listing 9-141 shows the output of the first slave.

Listing 9-142 shows the output of the second slave.

As we can see, processing was successfully distributed across the various slave processes.

Spring Batch Admin

Often we need to monitor and manually control batch processing. For example, we might need to restart a certain JobExecution because previously corrupted data was manually fixed. For this purpose, SB provides a project called Spring Batch Admin (http://docs.spring.io/spring-batch-admin).

This is a web application that can be embedded into our batch processing application to control and monitor batch processing. Because it is a web component, it needs to be hosted in a servlet container. We can use spring-batch-admin-manager and spring-batch-admin-resources and place our Jobs’ XML configuration into the directory META-INF/spring/batch/jobs.

To cover the features of Spring Batch Admin, we will use the sample application that can be downloaded from the Spring Batch Admin site (http://docs.spring.io/downloads/nightly/release-download.php?project=BATCHADM). We used version 1.3.1.RELEASE in this example. When we run this application on a local servlet container listening on port 8080 (for example, the default Pivotal tc Server in STS), we can enter the address http://localhost:8080/spring-batch-admin-sample into the browser to control batch processing.

Note  All of the following figures were downloaded from the spring.io site at http://docs.spring.io/spring-batch-admin/screenshots.html.

Figure 9-11 shows how we can browse jobs that are available in the batch application, along with their main attributes and statistics. Figure 9-12 shows a screen for a single job.


Figure 9-11. Jobs view of Spring Batch Admin


Figure 9-12. Single job view of Spring Batch Admin

From this screen, we can run the job and specify the parameters for JobInstance. We can also see past or running job instances, including their status and main statistics.

Figure 9-13 shows the Job Executions screen.


Figure 9-13. Job Executions screen of Spring Batch Admin

From this screen, we can observe the past and current JobExecution instances and their statuses and statistics. We can also terminate all executions by clicking the Stop All button.

Figure 9-14 shows the screen for a single JobExecution.


Figure 9-14. Single JobExecution without error screen of Spring Batch Admin

On this screen, we can see statistics for a concrete JobExecution instance. We also can terminate the instance by clicking the Stop button.

Figure 9-15 shows the screen of a single JobExecution with an error.


Figure 9-15. Single JobExecution with error screen of Spring Batch Admin

Here Spring Batch Admin can provide us a stack trace of the error that occurred in JobExecution.

Figure 9-16 shows a screen of the chunk-oriented StepExecution. Here we can see all the stats for StepExecution of the chunk-oriented processing Step.


Figure 9-16. Chunk-oriented StepExecution screen of Spring Batch Admin

Summary

This chapter focused on background processing in the enterprise, where the Spring Batch framework has a unique position as one of the first batch abstractions in the Java world and a reference implementation for the JEE 7 standard for batch processing.

We covered the Spring Batch domain, which consists of Job and Step as its main pillars. Chunk-oriented processing is the most common mechanism for handling batch requirements, but we also have the option to use Tasklet, a one-time task to perform batching work.

Various options for running the job programmatically and from the command line were shown, alongside possibilities for executing Job with parameters. To allow for state sharing, we showed how to pass ExecutionContext between Spring Batch constructs. We also covered special Spring Batch scopes as an alternative to the ExecutionContext approach.

Next we explored the rich possibilities for intercepting processing of the Spring Batch framework and how to open/close/update third-party resources. Job and Step inheritance allows us to consolidate XML configuration for Spring Batch. The following sections dived into the mechanisms for controlling Spring Batch flow to repeat certain tasks, perform conditional Steps, and react to errors.

The remaining parts focused on scaling and parallel processing, so that we can boost our application’s performance.
