Chapter 13. Scaling and parallel processing

This chapter covers

  • Introducing scaling concepts
  • Deciding when and where to use scaling
  • Learning how to scale batch jobs
  • Exploring scaling patterns and techniques

Now that you have some real batch jobs under your belt, you can test them for performance in a development or testing environment. But what do you do when performance isn’t good enough?

You implement scaling and partitioning! Spring Batch provides several scaling techniques that improve performance without code changes: you implement scaling by reconfiguring jobs, not by changing code. For partitioning, you write code to divide work between a master node and slave nodes.

In this chapter, we discuss general scaling concepts for batch processing and, in particular, the Spring Batch model for scaling and partitioning. We look at the different ways to scale applications à la Spring Batch and describe various solutions. We finish with guidelines for choosing the most efficient techniques to improve the performance of your batch job.

13.1. Scaling concepts

Before tackling scaling in Spring Batch, we describe what scaling is and how it can generally help improve the performance of your applications. We then see how to apply scaling concepts in the context of batch jobs.

Spring Batch provides a scaling framework and various implementations to improve the performance of jobs and steps through configuration changes without modifying code.

13.1.1. Enhancing performance by scaling

Batch jobs are a bit particular with regard to scaling because they run in the background and don’t require user interaction. For this reason, measuring the response time for user requests isn’t an applicable performance metric. Batch jobs do have constraints on the time it takes to process an entire job. Batch applications usually run at night and have a limited time window to complete. The goal of scaling a batch job is to meet execution time requirements.

As with any application, you can tune the step and application algorithms. This is the first step to consider, but processing can still take too much time even after such improvements. You must then consider scaling your batch applications.

Scaling is the capability of a system to increase total throughput under an increased load when you add resources (typically hardware). You can consider several approaches to implement scaling for your applications:

  • Vertical scaling (scale up): Getting a bigger, better, and faster machine that hosts the application to reach the desired performance.
  • Horizontal scaling (scale out): Adding more processing nodes (or machines) to a system to handle load. This approach aims to distribute processing remotely on several nodes.

With vertical scaling, you work at the computer and system levels to achieve what is also called local scaling. Such an approach is particularly interesting if you want to leverage multicore or multiprocessor hardware, as illustrated in figure 13.1. Local scaling is suitable if processing implies a lot of I/O.

Figure 13.1. Vertical scaling (scaling up) migrates an application to more powerful hardware.

Horizontal scaling uses another approach by distributing processing over several nodes, as shown in figure 13.2. Each node supports a portion of the processing load. In this scenario, computers don’t necessarily need to be as powerful as in the vertical approach. Horizontal scaling commonly integrates mechanisms like load balancing, replication, and remote scaling.

Figure 13.2. Horizontal scaling splits application processing on different nodes and requires load balancing.

Horizontal scaling can leverage grid and cloud computing in order to implement remote processing.

This concludes our brief overview of scaling concepts. We now have a high-level view of the two techniques we use to improve batch job performance: horizontal and vertical scaling. Next, we see how to implement horizontal and vertical scaling with minimum impact on applications.

13.1.2. The Spring Batch scaling model

As described in the early chapters of this book, Spring Batch offers a generic framework to support batch job concepts. Job, Step, Tasklet, and Chunk are all domain objects in the batch job world. These types define a job and its parts. By default, Spring Batch executes a job's steps sequentially, in a single thread.

Scaling in Spring Batch defines how to execute processing in parallel, locally, or on other machines. Scaling takes place mainly at the step level, and you can use different strategies to define at which level you want to split processing. You can choose to parallelize whole steps or only parts of their processing. You can also define datasets and process them in parallel locally or remotely. The best technique (or combination of techniques) is the one that allows your application to meet your performance expectations. Figure 13.3 depicts Spring Batch local scaling, and figure 13.4 shows Spring Batch remote scaling.

Figure 13.3. Local scaling in a single process executes batch job steps in parallel.

Figure 13.4. Remote scaling in more than one process executes batch job steps in parallel.

You can implement batch job scaling through configuration by using the Spring Batch XML vocabulary for multithreading and parallel step execution. For more advanced uses, you must configure steps with additional specialized objects.

Table 13.1 lists all scaling strategies provided by Spring Batch, shows if the strategy supports local or remote scaling, and describes its main feature.

Table 13.1. Scaling strategies provided by Spring Batch

Strategy             Local/Remote       Description
Multithreaded step   Local              Processes a step's chunks using several threads.
Parallel step        Local              Executes steps in parallel using multithreading.
Remote chunking      Remote             Distributes chunk processing to remote nodes.
Partitioning step    Local and remote   Partitions data and splits up processing.

Before exploring each scaling strategy provided by Spring Batch, let’s look at the features of local and remote scaling and the use of the Spring task executor.

Local and Remote Scaling

As noted in table 13.1, Spring Batch supports both local and remote scaling. Implementing scaling on a single machine uses multithreading through the Spring task executor abstraction that we describe in the next section. Spring Batch natively supports this feature without any advanced configuration: when you specify a task executor, Spring Batch automatically uses it to execute steps with multiple threads.

Remote scaling is more complex: it requires a remoting technology like Java Message Service (JMS) or GridGain, and you must plug scaling into batch processing using Spring Batch hooks. This allows you to execute a step or process a chunk remotely. In exchange for this extra configuration effort, remote scaling provides higher scalability.

Spring Batch doesn't provide implementations for remoting; it provides only the generic framework to plug in different service providers. Spring Batch Integration, a module of the Spring Batch Admin project, aims to fill this void using Spring Integration facilities. We look at Spring Batch Integration in the remote chunking and partitioning sections.

The Spring Task Executor Abstraction

The Spring framework provides a Java 5–independent abstraction for using thread pools called the task executor. This abstraction is identical to the concept of the executor introduced in Java 5 and uses the same contract.

 

Concurrency and Java 5

Java 5 introduced the java.util.concurrent package, which includes classes commonly useful in concurrent programming. The package uses hardware-level constructs to allow efficient use of concurrency in Java programs without resorting to native code. The package provides classes and interfaces for collections (map, queue, list, and so on), executors (threads), synchronizers (semaphore), and timing.

 

The Spring task executor lets you execute a task, implemented as a java.lang.Runnable, according to a strategy defined by the executor implementation. The following snippet lists the Spring TaskExecutor interface:

public interface TaskExecutor {
  void execute(Runnable task);
}

This interface is used internally by Spring and its portfolio projects, but you can also use it for your own needs. It specifies execution of Runnable code in a multithreaded environment; the implementation is responsible for applying the appropriate execution strategy. The following snippet shows how to use the TaskExecutor interface in an application: the first line creates the task executor, and the loop submits 25 tasks to it:

TaskExecutor taskExecutor = createTaskExecutor();
for (int i = 0; i < 25; i++) {
  String message = "Execution " + i;
  taskExecutor.execute(new SampleTask(message));
}

The following listing shows the SampleTask class that implements the Runnable interface and prints a message to the console from its run method.

Listing 13.1. Implementing the Runnable interface
public class SampleTask implements Runnable {
  private String message;

  public SampleTask(String message) {
    this.message = message;
  }

  public void run() {
    System.out.println(message);
  }
}
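Because TaskExecutor shares the contract of Java 5's java.util.concurrent.Executor (a single execute(Runnable) method), the loop above can be tried without Spring. The sketch below makes two assumptions: a fixed JDK thread pool of five threads stands in for createTaskExecutor(), and a hypothetical CollectingTask records each message in a thread-safe queue instead of printing it, so that the result can be checked.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Like SampleTask, but records its message (prefixed with the thread name) instead of printing it.
class CollectingTask implements Runnable {
    private final String message;
    private final Queue<String> sink;

    CollectingTask(String message, Queue<String> sink) {
        this.message = message;
        this.sink = sink;
    }

    @Override
    public void run() {
        sink.add(Thread.currentThread().getName() + ": " + message);
    }
}

class TaskExecutorDemo {
    // Submits 25 tasks to a 5-thread JDK pool (standing in for createTaskExecutor())
    // and returns the recorded messages once every task has run.
    static Queue<String> runTasks() {
        Queue<String> sink = new ConcurrentLinkedQueue<>();
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 25; i++) {
            executor.execute(new CollectingTask("Execution " + i, sink));
        }
        executor.shutdown(); // accept no new tasks; already-submitted tasks still run
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sink;
    }
}
```

The messages arrive in no particular order, because the pool's threads pick up tasks concurrently.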

This technique simplifies multithreading usage in an application. It provides a simple contract and hides complexity in the implementations. Table 13.2 lists the main Spring TaskExecutor implementations.

Table 13.2. Main Spring TaskExecutor implementations

Implementation            Description
SimpleAsyncTaskExecutor   Starts a new thread for each invocation. Supports a concurrency limit, which blocks any invocations that are over the limit until a slot is free.
ThreadPoolTaskExecutor    Wraps and configures a Java 5 ThreadPoolExecutor, which manages the thread pool.
WorkManagerTaskExecutor   Wraps and configures a CommonJ WorkManager, which provides support for executing concurrent tasks.

Each of these TaskExecutor implementations can be configured as a bean in the Spring configuration and injected in other Spring-powered plain old Java objects (POJOs). The following snippet describes how to configure a ThreadPoolTaskExecutor. It first defines a task executor bean and then specifies task executor properties:

<bean id="taskExecutor"
  class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
  <property name="corePoolSize" value="5"/>
  <property name="maxPoolSize" value="10"/>
  <property name="queueCapacity" value="25"/>
</bean>
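The three properties interact in a way that often surprises people: a ThreadPoolExecutor creates threads beyond corePoolSize only once its queue is full. The sketch below reproduces the bean's sizing with the plain JDK ThreadPoolExecutor that ThreadPoolTaskExecutor wraps (the 60-second keep-alive is an arbitrary choice here) and submits blocking tasks to show where each one ends up. Note that Spring's wrapper reports rejections as its own TaskRejectedException rather than the JDK's RejectedExecutionException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSizingDemo {
    /** Returns {threads created, tasks waiting in the queue, tasks rejected}. */
    static int[] submitBlockingTasks(int tasks) {
        // Same sizing as the taskExecutor bean: core 5, max 10, queue capacity 25.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                5, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(25));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                // Each task blocks until released, so no thread frees up during submission.
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++; // default AbortPolicy: pool at maximum size and queue full
            }
        }
        int[] result = {pool.getPoolSize(), pool.getQueue().size(), rejected};
        release.countDown(); // let every blocked and queued task finish
        pool.shutdown();
        return result;
    }
}
```

With 40 tasks, the first 5 start core threads, the next 25 wait in the queue, the next 5 trigger extra threads up to maxPoolSize, and the last 5 are rejected. With 20 tasks, only the 5 core threads exist and 15 tasks wait, which is why maxPoolSize has no effect unless the queue can fill up.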

The Spring TaskExecutor interface provides a uniform way to add concurrent processing to Spring applications. Spring Batch uses a TaskExecutor to enable multithreading in batch jobs. This feature is particularly useful to scale applications locally and enable parallel processing. Practically speaking, when scaling locally, you declare a TaskExecutor bean and plug it into Spring Batch.

Now that you know the core concepts behind scaling, let’s see the Spring Batch techniques for implementing it. For each technique, we describe its features, configuration, and when it applies. We start by adding multithreading to an application.

13.2. Multithreaded steps

By default, Spring Batch uses the same thread to execute a batch job from start to finish, meaning that everything runs sequentially. Spring Batch also allows multithreading at the step level. This makes it possible to process chunks using several threads.

 

Spring Batch entities and thread safety

Make sure you check the documentation of the readers and writers you use before configuring a step for multithreading. Most of the built-in Spring Batch readers and writers aren't thread-safe and therefore are unsafe for use in a multithreaded step. If the Javadoc for a class doesn't document thread safety, you need to look at the implementation to determine thread safety and make sure the code is stateless. You can still run a multithreaded step that reads from a database by tracking processed rows yourself; see the Spring Batch parallelJobs example, which demonstrates using a process indicator for reading items from a database.

 

You can use multithreading to avoid waiting for one object (reader, processor, or writer) to finish processing one chunk before processing another. Reading, processing, and writing chunks can take place in separate execution threads. This technique doesn't always improve performance, and it's useful only if the hardware supports multithreading. Your mileage may vary. For example, performance wouldn't increase on a machine with one processor core running a job that does a huge amount of processing, but the technique would be more efficient for a job performing a lot of I/O.

Figure 13.5 illustrates how a step handles reading and writing using multiple threads.

One consequence of this approach is that the step doesn’t necessarily process items in order. There’s no guarantee as to the item processing order, so you should consider the order random or undefined. We look at this aspect of multithreading later in this section with an example.

Figure 13.5. A step reading and writing using multiple threads

We’re done with multithreaded step concepts, so let’s dive into configuration and usage.

13.2.1. Configuring a multithreaded step

Configuring a multithreaded step in Spring Batch is simple because it involves only specifying a task executor for the step’s tasklet. Spring Batch then automatically enables multithreading for the step and uses the task executor to process chunks.

The following listing describes how to configure and add multithreading to our readWriteProductsStep used to import products. For this example, we rename it readWriteProductsMultiThreadedStep.

Listing 13.2. Configuring a multithreaded step
<batch:job id="importProductsMultiThreadedJob">
  <batch:step id="readWriteProductsMultiThreadedStep">
    <batch:tasklet task-executor="taskExecutor">
      <batch:chunk reader="reader" writer="writer" commit-interval="10"/>
    </batch:tasklet>
  </batch:step>
</batch:job>

<bean id="taskExecutor"
  class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
  <property name="corePoolSize" value="5"/>
  <property name="maxPoolSize" value="5"/>
</bean>

The XML tasklet element sets the task-executor attribute, which references a TaskExecutor implementation configured as a Spring bean. Setting this attribute automatically enables multithreading for the step.

Because understanding what happens when multithreading is involved is a bit difficult, let’s see how it works by running an import of 100 products. You add trace statements in the reader and writer to see which thread executes read and write operations. The following listing shows a portion of the console output.

Listing 13.3. Console output when importing products using threads
(...)
thread #5 – read product with product id #51
thread #5 – read product with product id #52
thread #5 – read product with product id #53
thread #3 – read product with product id #54
thread #5 – read product with product id #55
thread #3 – read product with product id #56
thread #5 – read product with product id #57
thread #3 – read product with product id #58
thread #5 – read product with product id #59
thread #3 – read product with product id #60
thread #5 – read product with product id #61
thread #3 – read product with product id #62
thread #5 – read product with product id #63
thread #3 – read product with product id #64
thread #5 – read product with product id #65
thread #3 – read product with product id #66
thread #5 – read product with product id #67
thread #3 – read product with product id #68
thread #3 – read product with product id #69
thread #5 – write products with product ids #51, #52, #53,
    #55, #57, #59, #61, #63, #65, #67
thread #3 – read product with product id #70
thread #3 – write products with product ids #54, #56, #58,
    #60, #62, #64, #66, #68, #69, #70
(...)

Listing 13.3 shows items processed in separate execution threads. The main consequence of this approach is that Spring Batch doesn't read items sequentially: chunks may contain items that aren't consecutive because threads read input data progressively and concurrently. Each thread builds its own chunk using the reader and passes this chunk to the writer. When a thread reaches the commit interval for a chunk, Spring Batch creates a new chunk. Because you're using stock Spring Batch readers and writers that aren't thread-safe, each item must be read, processed, and written on the same thread. Furthermore, your application must tolerate out-of-order item processing if you want to use this technique. Listing 13.3 also shows that each chunk a thread builds contains the number of items specified in the commit interval, except possibly the last one.
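To see why chunks end up holding non-consecutive items, the following plain-Java simulation mimics the behavior in listing 13.3. It is not Spring Batch code: the IdReader class and the run method are hypothetical. Each worker thread builds its own chunk by calling a shared, synchronized reader until it reaches the commit interval, and then "writes" the chunk by recording it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class MultiThreadedChunkDemo {
    // Hypothetical shared reader: hands out ids 1..max, then null once exhausted.
    static final class IdReader {
        private final int max;
        private int next = 1;

        IdReader(int max) {
            this.max = max;
        }

        synchronized Integer read() {
            return next <= max ? Integer.valueOf(next++) : null;
        }
    }

    /** Runs `threads` workers over `items` ids; returns every chunk that was written. */
    static List<List<Integer>> run(int items, int commitInterval, int threads) {
        IdReader reader = new IdReader(items);
        List<List<Integer>> written = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                while (true) {
                    // Each thread builds its own chunk, so ids taken by other threads are skipped.
                    List<Integer> chunk = new ArrayList<>();
                    Integer item;
                    while (chunk.size() < commitInterval && (item = reader.read()) != null) {
                        chunk.add(item);
                    }
                    if (chunk.isEmpty()) {
                        return; // reader exhausted
                    }
                    written.add(chunk); // stands in for the item writer
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return written;
    }
}
```

Calling run(100, 10, 2) writes every id exactly once, but a given chunk can contain, say, 51, 52, 53, 55, 57, just as in the console output. Each thread's final chunk may be partial.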

When configured for multithreading, the tasklet element also accepts an additional attribute called throttle-limit. This attribute configures the level of thread concurrency, that is, the maximum number of chunk-processing tasks that can run at the same time, and has a default value of 4. Tuning it is particularly useful to ensure that Spring Batch fully utilizes the thread pool. You must check that this value is consistent with other pooling resources such as a data source or thread pool: a thread pool that's too small can prevent the throttle limit from being reached, so make sure the core pool size is at least as large as this limit.
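Conceptually, a throttle limit caps how many tasks may be in flight at once, independently of the pool size. The sketch below illustrates the idea with a java.util.concurrent.Semaphore; it is a plain-Java analogy, not how Spring Batch implements throttle-limit internally, and the ThrottleDemo class is hypothetical. It reports the highest number of tasks it observed running concurrently.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ThrottleDemo {
    /** Runs short tasks on `poolSize` threads with at most `throttleLimit` in flight. */
    static int maxObservedConcurrency(int tasks, int poolSize, int throttleLimit) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Semaphore permits = new Semaphore(throttleLimit);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            for (int i = 0; i < tasks; i++) {
                permits.acquire(); // the submitter waits once throttleLimit tasks are in flight
                pool.execute(() -> {
                    try {
                        int now = running.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        Thread.sleep(5); // simulate chunk work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                        permits.release();
                    }
                });
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }
}
```

With a 10-thread pool and a limit of 4, at most 4 tasks run together. With a 2-thread pool and the same limit, at most 2 do: the pool, not the limit, is then the bottleneck.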

The following listing uses the throttle-limit attribute to configure a multithreaded step.

Listing 13.4. Setting the throttle limit of a multithreaded step

This approach is particularly interesting to get several threads to process chunks in parallel and save execution time. Multithreading also has its drawbacks, because it implies concurrent access of readers, processors, and writers. Such issues can be problematic when the implementations aren’t thread-safe. The next section focuses on these multithreading issues.

13.2.2. Multithreading issues

Spring Batch frees you from thread management in your code, but the nature of operating in a multithreaded environment means that you must be aware of its limitations and requirements. The situation is similar to that of servlets in Java EE environments: all objects shared by threads must be thread-safe to ensure correct behavior. The bad news here is that most Spring Batch readers and writers are stateful and therefore aren't thread-safe.

The most problematic classes regarding thread safety in Spring Batch are ItemReader implementations because they commonly manage the state of processed data to make jobs restartable. To understand this better, take the example of a non-thread-safe ItemReader implementation, the JdbcCursorItemReader class. This class uses a JDBC ResultSet to read data and carries no thread-safety guarantee. For this reason and because the class doesn’t implement concurrency management, you can’t use it from multiple threads.

 

Thread safety

Thread safety describes the behavior of code when several threads access it concurrently. We say code (like a class) is thread-safe if you can use it in a multithreaded environment and it still produces correct results. This mainly means that conflicts don't arise when threads use its static and instance variables. Reading and modifying shared static and instance variables from several threads without coordination can cause problems, so code that does this usually isn't thread-safe.

For example, a method that updates a static variable without any synchronization can produce inconsistent results when several threads call it at the same time.

Instance variables aren’t free from concurrent access problems either. If one thread sets an instance variable, it can cause problems for another thread reading or writing it.

Classes can support multithreading using facilities provided by the Java platform, such as the synchronized keyword, the ThreadLocal class, and the Java 5 java.util.concurrent package.

In general, ensuring thread safety is challenging.
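The point about instance variables can be made concrete with a minimal sketch; the CounterDemo class is hypothetical. Several threads increment three counters: a plain int field, an AtomicInteger from java.util.concurrent, and a field guarded by a synchronized method. Only the last two are guaranteed to reach the exact total, because count++ on a plain field is a read-modify-write sequence rather than a single atomic step. How many updates the plain field loses, if any, varies from run to run.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CounterDemo {
    private int unsafeCount;                                     // plain instance variable
    private final AtomicInteger atomicCount = new AtomicInteger(); // lock-free atomic updates
    private int syncCount;                                       // guarded by this object's monitor

    private synchronized void incrementSync() {
        syncCount++;
    }

    private synchronized int getSyncCount() {
        return syncCount;
    }

    /** Returns {plain count, atomic count, synchronized count}. */
    static int[] run(int threads, int perThread) {
        CounterDemo demo = new CounterDemo();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    demo.unsafeCount++;               // race: concurrent updates can be lost
                    demo.atomicCount.incrementAndGet();
                    demo.incrementSync();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {demo.unsafeCount, demo.atomicCount.get(), demo.getSyncCount()};
    }
}
```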

 

Implementing a Thread-Safe Item Reader

We have solutions to work around these thread safety issues. The first one is to implement a synchronizing delegator for the ItemReader interface that adds the synchronized keyword to the read method. Reading is usually cheaper than writing, so synchronizing the reading isn’t that bad: one thread reads (quickly) a chunk and hands it off to another thread that handles the (time-consuming) writing. The writing thread is busy for a while, at least long enough for the reading thread to read another chunk and for another thread to write the new chunk. To summarize, threads won’t fight for reading, because they’re busy writing. The following listing shows how to implement a synchronized reader.

Listing 13.5. Implementation of a synchronized reader

First, you mark your product item reader's read method with the synchronized keyword and delegate processing to the delegate item reader. Because the target reader can potentially implement the ItemStream interface to manage state, you also need to implement this interface and delegate to its corresponding methods.
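The following self-contained sketch shows one possible shape for such a delegator. To stay runnable without Spring Batch on the classpath, it uses hypothetical, simplified stand-ins (SimpleItemReader, SimpleItemStream, and a deliberately unsafe UnsafeListReader) instead of the real ItemReader and ItemStream interfaces, which declare more specific exceptions and use an ExecutionContext. Treat it as an illustration of the pattern, not as the SynchronizingItemReader class used in this chapter.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-ins for Spring Batch's ItemReader and ItemStream.
interface SimpleItemReader<T> {
    T read() throws Exception;
}

interface SimpleItemStream {
    void open(Map<String, Object> executionContext);
    void update(Map<String, Object> executionContext);
    void close();
}

// Deliberately not thread-safe: the unsynchronized index is shared mutable state.
class UnsafeListReader<T> implements SimpleItemReader<T> {
    private final List<T> items;
    private int index;

    UnsafeListReader(List<T> items) {
        this.items = items;
    }

    @Override
    public T read() {
        return index < items.size() ? items.get(index++) : null;
    }
}

class SynchronizingItemReader<T> implements SimpleItemReader<T>, SimpleItemStream {
    private final SimpleItemReader<T> delegate;

    SynchronizingItemReader(SimpleItemReader<T> delegate) {
        this.delegate = delegate;
    }

    // Only one thread at a time can call the delegate's read method.
    @Override
    public synchronized T read() throws Exception {
        return delegate.read();
    }

    // Stream callbacks are forwarded only if the delegate manages state.
    @Override
    public void open(Map<String, Object> executionContext) {
        if (delegate instanceof SimpleItemStream) {
            ((SimpleItemStream) delegate).open(executionContext);
        }
    }

    @Override
    public void update(Map<String, Object> executionContext) {
        if (delegate instanceof SimpleItemStream) {
            ((SimpleItemStream) delegate).update(executionContext);
        }
    }

    @Override
    public void close() {
        if (delegate instanceof SimpleItemStream) {
            ((SimpleItemStream) delegate).close();
        }
    }
}

class ConcurrentReadCheck {
    /** Drains the reader from several threads and returns every item that was read. */
    static List<Object> readAll(SimpleItemReader<?> reader, int threads) {
        List<Object> read = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                try {
                    Object item;
                    while ((item = reader.read()) != null) {
                        read.add(item);
                    }
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return read;
    }
}
```

Wrapped this way, the unsafe reader hands out every item exactly once even when four threads drain it; used directly, the same reader can return duplicates or fail with an IndexOutOfBoundsException.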

Another solution is to add finer synchronization to processing and handle state yourself. After adding the synchronized keyword to the reader, you deactivate Spring Batch state management for the step: for built-in Spring Batch item readers, you set the saveState property of the ItemReader bean to false; for custom readers that implement the ItemStream interface, you make the update method do nothing. Because you then manage state yourself, the job remains restartable.

Implementing the Process Indicator Pattern

Let's implement a thread-safe reader that applies the process indicator pattern. To apply this pattern, you add a dedicated column to the input data table to track processed products. For our use case, you use a column called processed from the Product table as the process indicator. The first step is to implement a thread-safe item reader. To do that, you reuse the SynchronizingItemReader class described in the previous section. State is tracked in the processed column rather than by Spring Batch. In this simple scenario, the item writer sets the processed indicator flag to true after writing the item, as shown in figure 13.6.

Figure 13.6. Implementation of the process indicator pattern in a step

The following listing describes how to make a JdbcCursorItemReader thread-safe and configure it to manage state.

Listing 13.6. Configuring a thread-safe JdbcCursorItemReader with an indicator
<bean id="productItemReader"
      class="com.manning.sbia.ch13.SynchronizingItemReader">
  <property name="delegate" ref="targetProductItemReader"/>
</bean>

<bean id="targetProductItemReader"
      class="org.springframework.batch.item.database.JdbcCursorItemReader">
  <property name="dataSource" ref="dataSource"/>
  <property name="sql"
            value="select id, name, description, price
                        from product where processed=false"/>
  <property name="saveState" value="false"/>
  <property name="rowMapper" ref="productRowMapper"/>
</bean>

(...)

You start by configuring a SynchronizingItemReader bean to make the delegate item reader thread-safe. The synchronized item reader uses the delegate property to reference the delegate item reader. You then use the processed indicator column in the SQL statement to read data. A processed value of false causes the database to return only unprocessed rows. Finally, you disable Spring Batch state management. This is the other requirement to make the item reader thread-safe (with the synchronization of the read method). But by doing that, you lose the reader’s restartability feature, because the item reader won’t know where it left off after a failure. Luckily, the process indicator is there to enable restartability: the reader reads only unprocessed items.

The item writer then needs to write the item and flag the product as handled using the processed column, as described in the following listing.

Listing 13.7. Implementing a JDBC ItemWriter with a SQL indicator

In the write method, each item in the loop is tagged as processed by setting the processed column to true. The item reader won’t process products with the processed column value set to true. This technique allows managing state and makes the job restartable.
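The restart behavior of the process indicator pattern can be simulated in memory. In this sketch (not JDBC code; the ProcessIndicatorJob class and its map-based product table are hypothetical), the reader selects only rows whose processed flag is false and keeps no position of its own, and the writer flags each row after writing it. A simulated failure after the first chunk, followed by a rerun, shows that every product is written exactly once. In a real job, writing the item and setting the flag must commit in the same chunk transaction for this guarantee to hold.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ProcessIndicatorJob {
    // Hypothetical in-memory stand-in for the PRODUCT table: id -> processed flag.
    final Map<Integer, Boolean> productTable = new TreeMap<>();
    // Every item the writer has written, across all runs of the job.
    final List<Integer> written = new ArrayList<>();

    ProcessIndicatorJob(int products) {
        for (int id = 1; id <= products; id++) {
            productTable.put(id, false);
        }
    }

    // Reader: like "select ... from product where processed=false".
    // It saves no position of its own (the equivalent of saveState=false).
    private Iterator<Integer> openReader() {
        List<Integer> unprocessed = new ArrayList<>();
        productTable.forEach((id, processed) -> {
            if (!processed) {
                unprocessed.add(id);
            }
        });
        return unprocessed.iterator();
    }

    // Writer: writes the chunk, then flags each product as processed.
    private void write(List<Integer> chunk) {
        written.addAll(chunk);
        for (Integer id : chunk) {
            productTable.put(id, true);
        }
    }

    /** Runs the job; fails after writing crashAfterChunks chunks (use -1 for no failure). */
    void run(int commitInterval, int crashAfterChunks) {
        Iterator<Integer> reader = openReader();
        int chunks = 0;
        while (reader.hasNext()) {
            List<Integer> chunk = new ArrayList<>();
            while (chunk.size() < commitInterval && reader.hasNext()) {
                chunk.add(reader.next());
            }
            write(chunk);
            chunks++;
            if (chunks == crashAfterChunks) {
                throw new IllegalStateException("simulated failure after chunk " + chunks);
            }
        }
    }
}
```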

Notice that the writer needs to extend a Spring Batch built-in writer such as JdbcBatchItemWriter to add the update that sets the processed column to true when an item is processed. To go further and be nonintrusive, you can add an item processor that manages the indicator column, as illustrated in figure 13.7.

Figure 13.7. The process indicator pattern for a step using an ItemProcessor

You can imagine that implementing your own state management is more difficult with files as input. A common technique is to import data from a file into a dedicated staging database table. The import is fast, even when not multithreaded. The job then bases its data processing on this staging database table, using a parallelized step and the process indicator pattern.

We've begun to parallelize processing by using multithreading and focusing on chunk processing. This is an interesting way to improve performance, but it has limitations due to multithreading and thread-safety issues. Let's turn our attention to a new technique that also uses multithreading to parallelize processing, but at the step level, and that eliminates these types of problems.

13.3. Parallelizing processing (single machine)

As the previous section shows, multithreaded steps have real limitations. We can now see that the key to scaling is to find a suitable technique to parallelize batch processing.

Spring Batch provides a convenient way to organize steps for parallel execution. Spring Batch XML supports this feature directly at the configuration level. The feature also relates to the job flow feature. We focus here on the capability of a job flow to split step processing. This aspect is useful for scaling batch jobs because it allows executing several steps in parallel, as illustrated in figure 13.8 where Spring Batch executes dedicated steps in parallel to process products, books, and mobile phones.

Figure 13.8. Executing steps in parallel using dedicated threads

A Spring Batch job can define a set of steps that execute in a specific order. In chapter 10, we configure Spring Batch with advanced flows to control which steps to execute and in what order. Spring Batch flow support provides the split element as a child of the job element. The split element specifies parallel execution of its containing steps.

13.3.1. Configuring parallel steps

Configuring steps for parallel execution is simple and natural in Spring Batch XML. In a split XML element, you add flows to define what to execute in parallel. These flows can contain a single step or several steps with a specific order. Because you can consider a split to be a step, it can have an identifier and be the target of the next attributes in steps. A split can also define a next attribute to specify what to execute after all flows in the split end. A split ends when all contained flows end.

The following listing describes how to organize the steps in our case study to read books and mobile products in parallel.

Listing 13.8. Configuring parallel steps to import products

Listing 13.8 defines a job with parallel steps named importProductsJob. After receiving and decompressing product files, you process in parallel the files that contain books and the files that contain mobile phones. For this task, you define a split element with the identifier readWrite. This split defines two flows, each with a single step for one product type. Once these two steps end, you call the step moveProcessedFiles.

As mentioned previously, using parallel steps implies multithreading. By default, however, a split uses a SyncTaskExecutor, which runs the flows one after another in the calling thread. To execute the flows in parallel, you specify your own task executor using the task-executor attribute on the split element, as described in the following listing.
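The split semantics (run every flow through a task executor, wait until all of them end, then run the split's next step) can be mimicked in plain Java with CompletableFuture. This is an analogy, not Spring Batch's implementation: the SplitDemo class is hypothetical, and each "step" only records its name. The executor parameter plays the role of the split's task executor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

class SplitDemo {
    /**
     * Runs each flow through the given executor, waits until all of them end
     * (a split ends when all of its flows end), then runs the split's next step.
     * Returns the order in which the steps ran.
     */
    static List<String> runJob(List<String> flowSteps, String nextStep, Executor executor) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<?>[] flows = flowSteps.stream()
                .map(step -> CompletableFuture.runAsync(() -> log.add(step), executor))
                .toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(flows).join(); // wait for every flow in the split
        log.add(nextStep);                     // the split's next step
        return log;
    }
}
```

Passing Runnable::run as the executor behaves like a synchronous executor: the flows run one after another in the calling thread, in declaration order. Passing a thread pool lets them overlap, while the next step still runs last. The step names used with it, such as readWriteBooksStep and readWriteMobilesStep, are illustrative.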

Listing 13.9. Configuring a task executor

Our first two scaling techniques use multithreading to parallelize processing of chunks and steps where all processing executes on the same machine. For this reason, performance is limited by a single machine's capabilities. In the next section, we use techniques to process jobs remotely, providing a higher level of scalability. Let's start with the remote chunking pattern, which executes chunks on several slave computers.

13.4. Remote chunking (multiple machines)

The previously described techniques aim to integrate concurrent and parallel processing in batch processing. This improves performance, but it may not be sufficient. A single machine will eventually hit a performance limit. Therefore, if performance still isn’t suitable, you can consider using multiple machines to handle processing.

In this section, we describe remote chunking, our first Spring Batch scaling technique for batch processing on multiple machines.

13.4.1. What is remote chunking?

Remote chunking splits work between a master machine and multiple slave machines. The master reads data in a step and dispatches it, through a remote communication mechanism like JMS, to slave machines that process the chunks. Figure 13.9 provides an overview of remote chunking, the actors involved, and where processing takes place.

Figure 13.9. Remote chunking with a master machine reading and dispatching data to slave machines for processing

Because the master is responsible for reading data, remote chunking is relevant only if reading isn’t a bottleneck.

As you can see in figure 13.9, Spring Batch implements remote chunking through two core interfaces respectively implemented on the master and slave machines:

  • ChunkProvider: Returns chunks from an item reader; it's used by the ChunkOrientedTasklet.
  • ChunkProcessor: Handles item processing and writing.

The ChunkProvider interface is responsible for returning chunks from an ItemReader; a ChunkProcessor then handles these chunks. By default, Spring Batch uses the SimpleChunkProvider implementation, which delegates to the read method of the item reader. The following snippet lists the ChunkProvider interface:

public interface ChunkProvider<T> {
  void postProcess(StepContribution contribution, Chunk<T> chunk);
  Chunk<T> provide(StepContribution contribution) throws Exception;
}

The ChunkProcessor interface receives the chunks and is responsible for processing them in its process method. By default, Spring Batch uses the SimpleChunkProcessor implementation, which handles basic item writing and processing. The following snippet lists the ChunkProcessor interface:

public interface ChunkProcessor<I> {
  void process(StepContribution contribution, Chunk<I> chunk)
   throws Exception;
}
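The division of labor between these two interfaces can be sketched without Spring Batch. The following code uses hypothetical, simplified stand-ins (no StepContribution, and a plain List instead of Chunk) to show the loop a chunk-oriented step runs: ask the provider for a chunk, hand it to the processor, and repeat until the provider has nothing left. Because the loop depends only on the two interfaces, a processor that ships chunks to remote slaves could replace the local one without touching the reading side, which is the hook remote chunking relies on.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, simplified stand-ins for ChunkProvider and ChunkProcessor.
interface SimpleChunkProvider<T> {
    List<T> provide(); // an empty list means the input is exhausted
}

interface SimpleChunkProcessor<T> {
    void process(List<T> chunk);
}

// Master side: groups items from a reader into chunks of commitInterval items.
class IteratorChunkProvider<T> implements SimpleChunkProvider<T> {
    private final Iterator<T> reader;
    private final int commitInterval;

    IteratorChunkProvider(Iterator<T> reader, int commitInterval) {
        this.reader = reader;
        this.commitInterval = commitInterval;
    }

    @Override
    public List<T> provide() {
        List<T> chunk = new ArrayList<>();
        while (chunk.size() < commitInterval && reader.hasNext()) {
            chunk.add(reader.next());
        }
        return chunk;
    }
}

// Processing side: transforms each item, then writes the whole chunk at once.
class TransformingChunkProcessor<I, O> implements SimpleChunkProcessor<I> {
    private final Function<I, O> itemProcessor;
    private final Consumer<List<O>> itemWriter;

    TransformingChunkProcessor(Function<I, O> itemProcessor, Consumer<List<O>> itemWriter) {
        this.itemProcessor = itemProcessor;
        this.itemWriter = itemWriter;
    }

    @Override
    public void process(List<I> chunk) {
        List<O> processed = new ArrayList<>();
        for (I item : chunk) {
            processed.add(itemProcessor.apply(item));
        }
        itemWriter.accept(processed);
    }
}

class ChunkLoop {
    /** Feeds chunks from the provider to the processor; returns the number of chunks. */
    static <T> int execute(SimpleChunkProvider<T> provider, SimpleChunkProcessor<T> processor) {
        int chunks = 0;
        for (List<T> chunk = provider.provide(); !chunk.isEmpty(); chunk = provider.provide()) {
            processor.process(chunk);
            chunks++;
        }
        return chunks;
    }
}
```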

Now that we know about the relevant mechanisms and actors used in remote chunking, it’s time for a concrete example. If you look for additional remote chunking support in the Spring Batch distribution, you find nothing more. Spring Batch only provides the extensible framework to make it possible to use such a mechanism in batch processing, but it doesn’t provide implementations. In addition to the remote chunking framework in the Spring Batch core, the Spring Batch Admin project provides a module called Spring Batch Integration that includes a Spring Integration–based extension for remote chunking. This module provides facilities to implement remoting in Spring Batch and remote chunking using Spring Integration channels.

13.4.2. Remote chunking with Spring Integration

The major challenge in implementing remote chunking is to make the master and its slaves communicate reliably to exchange chunks for processing. Spring Batch chose Spring Integration for its communication infrastructure because Spring Integration provides a message-driven, transport-independent framework. JMS is the obvious choice for communication because it's asynchronous and provides guaranteed delivery. Spring Integration, however, hides JMS behind its own abstractions, which leaves the door open for supporting other messaging technologies, such as Advanced Message Queuing Protocol (AMQP).

 

Why does remote chunking need guaranteed delivery?

With remote chunking, a master node sends chunks to slave nodes for processing. You don’t want to lose these chunks in case of failure! That’s why reliable messaging technologies—like JMS or AMQP—are good candidates for remote chunking with Spring Batch.

 

The remote chunking implementation based on Spring Integration isn’t in the Spring Batch distribution itself, but you can find it in the Spring Batch Admin distribution.

Chapter 11 covers the basics of Spring Integration in a real-world enterprise integration scenario. If you’re in a hurry and are only interested in implementing remote chunking, you can move on directly to the next section, which describes remote chunking using channels.

Remote Chunking Using Channels

A messaging channel is a communication medium between two applications using a message-oriented middleware (MOM) system, as described in Enterprise Integration Patterns by Gregor Hohpe and Bobby Woolf (Addison-Wesley, 2004). On one end, the application writes data on the channel, and on the other end, the application reads data from the channel. The messaging middleware is responsible for delivering the data.

A channel is a great communication medium for remote chunking. It provides an abstraction that makes communication between master and slaves independent of the technology used to transport chunks. Moreover, when backed by reliable middleware such as JMS, channels ensure that no message is lost. Figure 13.10 shows which mechanisms and entities are involved when implementing remote chunking with Spring Integration and Spring Batch.

Figure 13.10. Spring Integration–based implementation of remote chunking using a messaging gateway and a listener to communicate between master and slaves through channels
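The following sketch mimics this request/reply exchange inside a single JVM. It assumes LinkedBlockingQueue instances as stand-ins for the request and reply channels, and all type and method names are hypothetical. Unlike JMS-backed channels, these queues aren’t durable, so the sketch illustrates the message flow only, not reliable delivery.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory illustration of a master and one slave exchanging chunks over
// request and reply "channels". LinkedBlockingQueue is NOT durable: unlike
// reliable messaging, it loses chunks if the process crashes.
class ChannelChunkingSketch {

    record ChunkRequest(int sequence, List<String> items) {}
    record ChunkReply(int sequence, int itemsProcessed) {}

    // Marker instance telling the slave to stop listening (compared by identity).
    static final ChunkRequest END = new ChunkRequest(-1, List.of());

    static List<ChunkReply> run(List<List<String>> chunks) {
        BlockingQueue<ChunkRequest> requests = new LinkedBlockingQueue<>();
        BlockingQueue<ChunkReply> replies = new LinkedBlockingQueue<>();

        // Slave: listen on the request channel, "process" each chunk, answer on the reply channel.
        Thread slave = new Thread(() -> {
            try {
                for (ChunkRequest req = requests.take(); req != END; req = requests.take()) {
                    replies.put(new ChunkReply(req.sequence(), req.items().size()));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        slave.start();

        try {
            // Master: send every chunk, then wait for one reply per chunk.
            for (int i = 0; i < chunks.size(); i++) {
                requests.put(new ChunkRequest(i, chunks.get(i)));
            }
            List<ChunkReply> received = new ArrayList<>();
            for (int i = 0; i < chunks.size(); i++) {
                received.add(replies.take());
            }
            requests.put(END);
            slave.join();
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while exchanging chunks", e);
        }
    }
}
```

With several slaves taking from the same request queue, replies could arrive out of order, which is why each request in this sketch carries a sequence number.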

Because two types of actors—master and slave—are involved when implementing remote chunking, we successively describe the master and slave machine implementations. We focus here on how to configure these machines using Spring Integration and how to make them communicate. To keep things simple, we implement only one slave, but you can generalize this to several slaves. First, let’s look at the master.

Implementing the Master

In remote chunking, the master is responsible for reading input data and sending the corresponding chunks to slaves for processing. As shown in figure 13.10, you use the ChunkMessageChannelItemWriter class to exchange data using Spring Integration channels.

Because you use Spring Integration channels, you configure channels for requests, replies, and the messaging gateway. The gateway produces and consumes messages using channels. The following listing describes how to configure channels and the messaging gateway for a remote chunking master machine.

Listing 13.10. Configuring Spring Integration for a remote chunking master

You configure the messaging gateway to send and receive messages from the messaging middleware. The gateway uses channels for requests and replies that you configure using Spring Integration XML. Notice here the use of the outbound-channel-adapter channel adapter for the requests channel with a JMS outbound destination. To receive and handle messages from the reply destination, you define a thread-local-channel.

Now that you’ve configured your Spring Integration XML elements, let’s see how to define Spring Batch entities from the Spring Batch Integration module to implement remote chunking for the master. This configuration may seem a bit like magic. No entity mentioned in the introduction section appears in the configuration, and it’s difficult to see how the ChunkMessageChannelItemWriter bean is involved in the processing.

In fact, the RemoteChunkHandlerFactoryBean class is responsible for configuring the step for remote chunking. It automatically and transparently converts an existing chunk-oriented step into a remote chunk-oriented step for the master. To achieve this, the class replaces the current chunk processor with one that writes chunks to a message channel. The following listing describes how to configure a master for remote chunking.

Listing 13.11. Configuring a master for remote chunking
<bean id="chunkWriter"
      class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
      scope="step">
  <property name="messagingGateway" ref="messagingGateway"/>
</bean>

<bean id="chunkHandler"
      class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean">
  <property name="chunkWriter" ref="chunkWriter"/>
  <property name="step" ref="stepChunk"/>
</bean>

You start by configuring a ChunkMessageChannelItemWriter bean using the messaging gateway. Next, you configure the factory bean for the chunk handler using the RemoteChunkHandlerFactoryBean class. You set the chunkWriter property to the chunk channel writer, and then reference the step defined with the stepChunk ID using the step property. This step corresponds to the step implementing remote chunking for the batch job.

The RemoteChunkHandlerFactoryBean class also creates a chunk handler, which makes it possible to configure the master to act as a slave and process chunks itself. In this case, you add a service activator bean using Spring Integration. We describe this in the next section.
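Conceptually, the factory bean performs a swap. The sketch below assumes a hypothetical MiniStep whose chunk processor is a plain Consumer: the conversion installs a processor that only sends chunks to a request channel and hands back the original processor, which can then serve as the handler that actually processes chunks. It illustrates the idea only; it isn’t the RemoteChunkHandlerFactoryBean code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical mini step whose chunk processor can be replaced at configuration time.
class MiniStep {
    private Consumer<List<String>> chunkProcessor;

    MiniStep(Consumer<List<String>> chunkProcessor) {
        this.chunkProcessor = chunkProcessor;
    }

    Consumer<List<String>> getChunkProcessor() {
        return chunkProcessor;
    }

    void setChunkProcessor(Consumer<List<String>> chunkProcessor) {
        this.chunkProcessor = chunkProcessor;
    }

    // Called by the step loop for every chunk read on the master.
    void processChunk(List<String> chunk) {
        chunkProcessor.accept(chunk);
    }
}

class RemoteChunkingConverterSketch {

    // Installs a processor that only sends chunks to the request channel and returns
    // the original processor, which can then act as the handler that processes them.
    static Consumer<List<String>> convertToRemote(MiniStep step, Queue<List<String>> requestChannel) {
        Consumer<List<String>> original = step.getChunkProcessor();
        step.setChunkProcessor(chunk -> requestChannel.add(new ArrayList<>(chunk)));
        return original;
    }
}
```

After the conversion, the master no longer writes anything locally; whoever holds the returned handler (a slave, or the master itself) does the real processing.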

You’ve configured the master to send chunks through a channel for remote processing. Next, let’s configure a slave.

Implementing a Slave

In remote chunking, slaves process chunks remotely and can send data back to the master. Slaves are dedicated Spring applications that listen on a channel: they receive messages, process their content, and use the reply channel to notify the master.

At the slave level, you use lower-level objects because you communicate through JMS destinations, the underlying mechanism for channels. The service activator is a JMS message listener that triggers processing for the chunk handler. The following listing describes JMS listener definitions and the service activator configuration. The service activator references both input and output channels.

Listing 13.12. Configuring Spring Integration for a remote chunking slave
<!-- JMS listener container -->
<jms:listener-container
          connection-factory="connectionFactory"
          transaction-manager="transactionManager"
          acknowledge="transacted">
  <jms:listener destination="requests"
                response-destination="replies"
                ref="chunkHandler"
                method="handleChunk"/>
</jms:listener-container>

You use a message listener container to receive messages from a JMS message queue and dispatch them to a POJO defined as a listener. You set attributes on the listener-container element for a JMS connection factory, a transaction manager, and the acknowledgment type. The listener element specifies how to route messages to the chunk handler: it calls the handleChunk method of the chunkHandler bean for each message from the requests destination and sends the result to the replies destination.

As shown in figure 13.10, the entry point for the listener on the slave side is a ChunkProcessorChunkHandler. The handler passes each received chunk to a chunk processor, so you must configure a chunk processor in the handler. The handler distinguishes between a processor that is fault tolerant and one that isn’t. If the processor is fault tolerant, exceptions can be propagated on the assumption that there will be a rollback and the request will be redelivered. Otherwise, the handler reports the failure to the master in its reply.
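The following plain-Java sketch shows that decision. SlaveChunkHandlerSketch and its Reply type are hypothetical. The sketch assumes that a non-fault-tolerant failure is reported back in the reply rather than thrown, and it leaves out transactions and the actual redelivery, which the messaging middleware performs.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the slave-side decision (not the Spring Batch source):
// run the chunk processor, then either propagate a failure (fault tolerant)
// or report it in the reply (not fault tolerant).
class SlaveChunkHandlerSketch {

    record Reply(boolean success, String message) {}

    private final Consumer<List<String>> chunkProcessor;
    private final boolean faultTolerant;

    SlaveChunkHandlerSketch(Consumer<List<String>> chunkProcessor, boolean faultTolerant) {
        this.chunkProcessor = chunkProcessor;
        this.faultTolerant = faultTolerant;
    }

    Reply handleChunk(List<String> chunk) {
        try {
            chunkProcessor.accept(chunk);
            return new Reply(true, "processed " + chunk.size() + " items");
        } catch (RuntimeException e) {
            if (faultTolerant) {
                // Let the exception escape: the listener's transaction rolls back
                // and the broker redelivers the request message.
                throw e;
            }
            // Not fault tolerant: send the failure back to the master instead.
            return new Reply(false, e.getMessage());
        }
    }
}
```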

The following listing describes how to configure a SimpleChunkProcessor in this context and set the item writer it executes.

Listing 13.13. Configuring a slave for remote chunking
<bean id="chunkHandler"
      class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler">
  <property name="chunkProcessor">
    <bean
     class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
      <property name="itemWriter"
                ref="itemWriter"/>
      <property name="itemProcessor">
        <bean class="org.springframework.batch.item.support.PassThroughItemProcessor"/>
      </property>
    </bean>
  </property>
</bean>

Configuring slaves requires defining the chunk handler that the listener calls when receiving messages from the requests destination. This handler is a ChunkProcessorChunkHandler bean from the Spring Batch Integration module that specifies a chunk processor used to handle the received chunk. Here, you use the SimpleChunkProcessor class with the target item writer to execute (the itemWriter property) and an item processor that does nothing (the itemProcessor property).

Third-party tools like GridGain[1] provide additional implementations for remote chunking. GridGain is an innovative Java and Scala–based cloud application platform, which you can use with the Spring Batch Integration module[2] for GridGain.

[1] www.gridgain.com/

[2] http://aloiscochard.blogspot.com/search/label/gridgain

In summary, remote chunking uses a master to send chunks to remote slaves for processing. In the next section, we explore a flexible Spring Batch scaling technique called partitioning.

13.5. Fine-grained scaling with partitioning

The last technique provided by Spring Batch for scaling is partitioning. You use partitioning to implement scaling in a finer-grained manner. At this level, you can also use multithreading and remoting techniques.

 

Note

Partitioning is arguably the most popular scaling strategy in Spring Batch: it’s simple to configure if you stick to the defaults and the local implementation, and restart still works out of the box.

 

Figure 13.11 shows an overview of how partitioning works in Spring Batch and how it provides scaling. The figure shows that partitioning takes place at the step level and divides processing into two main parts:

  • Data partitioning: Creates step executions to process data. This splitting allows processing the data in parallel. Data partitioning depends on the nature of the data: ranges of primary keys, the first letter of a product name, and so on. A batch developer would likely implement their own partitioning logic in a Partitioner. The Spring Batch Partitioner interface defines the contract for this feature.
  • Step execution handling: Specifies how to handle different step executions. It can be local with multithreading (or not) and even remote using technologies like Spring Integration. A framework typically provides the way to handle execution, and Spring Batch provides a multithreaded implementation. The Spring Batch PartitionHandler interface defines the contract for this feature.

Figure 13.11. Partitioning splits input data processing into several step executions processed on the master or remote slave machines.
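As an illustration of data partitioning, here is a hypothetical partitioner that splits products by the first letter of their name. It’s plain Java: a Map<String, String> stands in for Spring Batch’s ExecutionContext, and the startLetter and endLetter keys and the partition names are assumptions made for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical data-partitioning sketch: split products by the first letter of their name.
// A plain Map<String, String> stands in for Spring Batch's ExecutionContext.
class FirstLetterPartitionerSketch {

    static Map<String, Map<String, String>> partition(int gridSize) {
        int letters = 26;
        int perPartition = (letters + gridSize - 1) / gridSize; // ceiling division
        Map<String, Map<String, String>> partitions = new LinkedHashMap<>();
        int number = 0;
        for (int start = 0; start < letters; start += perPartition) {
            int end = Math.min(start + perPartition, letters) - 1;
            Map<String, String> context = new LinkedHashMap<>();
            context.put("startLetter", String.valueOf((char) ('A' + start)));
            context.put("endLetter", String.valueOf((char) ('A' + end)));
            partitions.put("partition" + number++, context);
        }
        return partitions;
    }
}
```

Each context carries only the boundaries of its letter range; a step-scoped reader in each partition would use them to select its own products. The sketch treats the grid size as a hint and can return fewer partitions than requested, for example 7 partitions for a grid size of 8.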

We recommend this approach when you want to parallelize processing of data partitions and when you want to control how to create and handle these partitions.

Now that we’ve described the general concepts behind Spring Batch partitioning, it’s time to see how to implement and configure an example. This technique is configuration-centric and provides an open framework to integrate custom strategies.

 

Why doesn’t partitioning need guaranteed delivery?

Contrary to remote chunking, partitioning doesn’t need guaranteed delivery. With partitioning, Spring Batch handles each partition in its own step execution, whose state is stored in the job repository. On a restart after failure, Spring Batch re-creates the partitions and processes again those that didn’t complete, so it doesn’t leave data unprocessed.

 

13.5.1. Configuring partitioning

As described previously, to implement partitioning, you must define the splitting and processing of data. Partitioning corresponds to creating several step executions for a step. You configure this example using Spring Batch XML.

To configure partitioning, you use the partition element instead of the tasklet element in the step configuration. The partition element partitions the target step configured using the step attribute, which eliminates any impact on implementations of step entities like readers, processors, and writers. It’s only a matter of configuration. Additional settings are also available to configure the partitioner and handler.

Listing 13.14 describes a basic partitioning configuration for the readWriteProducts step of our case study. The listing shows how to use the partition and handler elements and hides the configuration of additional entities like the partitioner. We focus on these later when we describe the Spring Batch partitioning Service Provider Interface (SPI) in section 13.5.2.

Listing 13.14. Configuring step partitioning
<batch:job id="importProducts">
  <batch:step id="readWriteProducts">
    <batch:partition step="partitionReadWriteProducts"
                     partitioner="partitioner">
      <batch:handler grid-size="2"
                     task-executor="taskExecutor"/>
    </batch:partition>
  </batch:step>
</batch:job>

<batch:step id="partitionReadWriteProducts">
  <batch:tasklet>
    <batch:chunk reader="reader" writer="writer" commit-interval="3"/>
  </batch:tasklet>
</batch:step>

<bean id="partitioner" (...)> (...) </bean>
<bean id="taskExecutor" (...)> (...) </bean>

The partitioning configuration lives in the readWriteProducts step, while the step that does the actual work is declared separately: the partition element references it with the step attribute. You set additional properties on the partition with the partitioner attribute and the handler inner element. The default partition handler defined in the handler element is TaskExecutorPartitionHandler. The partitionReadWriteProducts step is defined independently and contains standard elements like tasklet and chunk. Note that using the step attribute makes sense only for local partitioning, because it refers to a local step bean in the current Spring application context. In the case of remote partitioning, when the processing happens in a different process, referring to a local step bean doesn’t make sense. For remote partitioning, you usually set the step name on the partition handler. The partition handler then sends the step name, a simple String, to a remote worker, where it refers to a step bean in another Spring application context.

The configuration schema provides the ability to use any handler implementation with the handler attribute in the partition element, as described in the following listing.

Listing 13.15. Configuring step partitioning with a partition handler

Before dealing with the partitioning SPI, we emphasize an important and interesting aspect of partitioning: late binding is available with this feature. With partitioning, late binding gives access to the values stored in the execution context of the current partition’s step execution.

To understand this better, let’s look at an example. If you split a step to handle each file of a multiple-file input separately, you can access the name of the current file using late binding, as described in the following snippet. Each partition sets the fileName attribute in its step execution context. Notice that the reader is step scoped and used in a partitioned step:

<bean id="itemReader" scope="step"
      class="org.springframework.batch.item.file.FlatFileItemReader">
  <property name="resource" value="#{stepExecutionContext[fileName]}"/>
  (...)
</bean>
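A toy model shows what late binding buys you here: the same reader definition yields a different resource in each partition, because the expression is resolved against each partition’s own step execution context when its step starts. The sketch uses a regular expression as a stand-in; it isn’t how Spring evaluates #{...} expressions (that’s the job of the Spring Expression Language), and the file URLs in the usage are hypothetical.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Toy illustration of late binding: resolve "#{stepExecutionContext[key]}"
// placeholders against one partition's context when its step starts.
// This is NOT Spring's SpEL evaluation, just a regex stand-in.
class LateBindingSketch {

    private static final Pattern PLACEHOLDER =
        Pattern.compile("#\\{stepExecutionContext\\[(\\w+)\\]\\}");

    static String resolve(String expression, Map<String, String> stepExecutionContext) {
        Matcher m = PLACEHOLDER.matcher(expression);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String value = stepExecutionContext.get(m.group(1));
            if (value == null) {
                throw new IllegalArgumentException("No value for key " + m.group(1));
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

Calling resolve with the same expression and two different partition contexts, one holding file:/in/products-1.txt and the other file:/in/products-2.txt, gives each partition its own file.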

In this section, we described how to use partitioning in step configurations. We saw how easy implementation is and that there is no impact on the steps involved. Spring Batch provides an open framework for partitioning that allows defining and implementing advanced and custom solutions.

13.5.2. The partitioning SPI

It’s time to look at how things work under the hood and how to extend this support. Spring Batch provides a complete SPI for this purpose, using the interfaces listed in table 13.3.

Table 13.3. Partitioning SPI

Interface | Description
PartitionHandler | Controls the execution of a partitioned StepExecution. It doesn’t know how partitioning is implemented and doesn’t manage the aggregation of results. The default implementation is the TaskExecutorPartitionHandler class.
StepExecutionSplitter | A strategy interface for generating input execution contexts for a partitioned step execution. The strategy is independent from the partition handler that executes each step. The default implementation, the SimpleStepExecutionSplitter class, delegates to a Partitioner.
Partitioner | Creates the input execution contexts for the step executions of the partitioned step. The default implementation is the SimplePartitioner class, which creates empty execution contexts.

These interfaces and classes are involved when partitioning steps, as described earlier in listing 13.14. When Spring Batch executes a partitioned step, it invokes the partition handler to process the partitions. The step then aggregates the results and updates its status.

The partition handler does the heavy lifting. It’s responsible for triggering partitioning based on the StepExecutionSplitter that creates the step executions. A splitter implementation like the SimpleStepExecutionSplitter class uses a Partitioner to get execution contexts for each step execution: the partitioner performs the splitting. Once the splitting is complete, the partition handler executes each step with a defined strategy. It can be local with multithreading or remote. As a batch job developer, you would typically write (or configure) only a Partitioner implementation to partition your data.
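The splitter side of this chain can be sketched in a few lines. In this hypothetical plain-Java version, an IntFunction plays the role of the Partitioner, a small record stands in for StepExecution, and the stepName:partitionKey naming of the generated executions is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.IntFunction;

// Hypothetical sketch of the splitter/partitioner collaboration (not Spring Batch code).
class SplitterSketch {

    // Stand-in for a StepExecution: a name plus the input context it will run with.
    record StepExecutionStub(String name, Map<String, String> context) {}

    // Ask the partitioner for one context per partition, then create one execution per context.
    static List<StepExecutionStub> split(String stepName, int gridSize,
            IntFunction<Map<String, Map<String, String>>> partitioner) {
        List<StepExecutionStub> executions = new ArrayList<>();
        for (Map.Entry<String, Map<String, String>> entry : partitioner.apply(gridSize).entrySet()) {
            executions.add(new StepExecutionStub(stepName + ":" + entry.getKey(), entry.getValue()));
        }
        return executions;
    }
}
```

A partition handler would then execute each of these stubs, locally on threads or remotely, and the partitioned step would aggregate their outcomes.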

Figure 13.12 summarizes the objects involved during partitioning.

Figure 13.12. Partitioning SPI objects involved in partitioning and processing data for a partitioned step

It’s now time to tackle different ways of using partition handlers and partitioners. We begin with the default implementations provided by Spring Batch, describe how to use third-party implementations, and finish with custom implementations.

Using the Default Partition Handler

The partitioning classes give you the ability to scale steps on several processing nodes and enable strategies to increase performance. Partitioning is particularly useful to distribute step processing on several computers. This isn’t always necessary. In fact, the default Spring Batch PartitionHandler implementation uses multithreading to process steps.

Let’s take our case study as an example and use multithreading to import multiple product files concurrently. Section 13.2 describes how to add multithreading for the whole step, but this approach can’t control which thread processes which data. Partitioning provides this support by using dedicated threads to process all of the data for each file. Using the default PartitionHandler implementation, the TaskExecutorPartitionHandler class, makes this possible. Figure 13.13 illustrates the multithreaded aspect of this architecture.

Figure 13.13. Using dedicated threads to process data when importing product files with partitioning
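To picture what the multithreaded handler does, here is a hypothetical plain-Java sketch that uses a fixed thread pool from java.util.concurrent in place of a Spring TaskExecutor. It runs the same step logic once per partition context, each on a pool thread, and collects the results in partition order for aggregation. The names are made up for the example; this isn’t the TaskExecutorPartitionHandler code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical sketch of a multithreaded partition handler: one task per partition.
class ThreadedPartitionHandlerSketch {

    static List<Integer> handle(Map<String, Map<String, String>> partitions,
                                Function<Map<String, String>, Integer> step,
                                int threads) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            // Submit the step logic once per partition context.
            List<Future<Integer>> futures = new ArrayList<>();
            for (Map<String, String> context : partitions.values()) {
                futures.add(executor.submit(() -> step.apply(context)));
            }
            // Wait for every partition and collect results in partition order.
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a partition failed", e.getCause());
        } finally {
            executor.shutdown();
        }
    }
}
```

In a quick usage, the step logic might simply return the length of its fileName value; in a real job, it would be the partitioned step reading that file.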

Configuring this strategy is simple because it’s similar to the generic strategy described earlier; the difference lies in the partitioner you configure in XML. Spring Batch provides the MultiResourcePartitioner class to create a new step execution for each file to import. The following listing describes how to configure a MultiResourcePartitioner bean and how to use it on the partitioned step.

Listing 13.16. Configuring partitioning with a MultiResourcePartitioner

Configuring this strategy follows the same rules as for using the partition element. The step attribute specifies the step to partition, and the handler child element sets the partition handler. The partitioner attribute references a MultiResourcePartitioner bean that specifies a pattern for a file list in the resources property. It also sets the keyName property, which specifies the name of the step execution context attribute under which each partition stores its file.
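The behavior described here amounts to one partition per file. The following hypothetical plain-Java sketch takes an already-resolved list of file URLs instead of a resource pattern and assumes partitions named partition0, partition1, and so on; a Map again stands in for ExecutionContext.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java equivalent of a file-based partitioner: one partition per
// input file, with the file stored in the partition's context under keyName.
class FilePartitionerSketch {

    static Map<String, Map<String, String>> partition(List<String> files, String keyName) {
        Map<String, Map<String, String>> partitions = new LinkedHashMap<>();
        for (int i = 0; i < files.size(); i++) {
            partitions.put("partition" + i, Map.of(keyName, files.get(i)));
        }
        return partitions;
    }
}
```

In this sketch, the number of partitions follows the number of files; there is no grid size parameter at all.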

The last thing you must do is specify the resource file to process in the item reader using late binding. Partitioning is most powerful when each created step execution has its own parameter values. For the MultiResourcePartitioner class, the fileName context attribute holds the resource name associated with the step execution. You specify the filename at the item reader level exactly as in the step-scoped FlatFileItemReader snippet in section 13.5.1, with the resource property set to #{stepExecutionContext[fileName]}. Remember to specify the step scope when using late binding!

The previous sections describe how to use the Spring Batch implementations of the partitioning SPI. It’s also possible to use implementations from third-party tools like the Spring Batch Integration module or to implement custom classes.

Using Channel-Based Partitions

Implementing a PartitionHandler remains a tricky task and, unfortunately, the Spring Batch core doesn’t provide implementations other than the TaskExecutorPartitionHandler. As for remote chunking, the Spring Batch Integration module provides classes compatible with Spring Integration channels called MessageChannelPartitionHandler and StepExecutionRequestHandler.

Figure 13.14 shows which mechanisms and types are involved when implementing remote partitioning with Spring Integration support from Spring Batch.

Figure 13.14. Partitioning using Spring Integration: the master and slaves communicate using channels, a messaging gateway, and a listener.

Master and slave machines communicate using Spring Integration channels. As with remote chunking, we apply the master-slave pattern. On the master side, we use the MessageChannelPartitionHandler class to send the partitioned data to slaves for processing for a particular step. We specify the step name in the configuration. The following listing describes how to configure this partition handler and set it on the partition.

Listing 13.17. Configuring a master for remote partitioning

You configure the remote partition handler using the MessageChannelPartitionHandler class. This partition handler uses the messagingOperations property so that the Spring Integration messaging client can execute requests on channels. The replyChannel property is set to the channel to listen to replies. The stepName property is set to the step to execute on the slave. Finally, the gridSize property tells the underlying StepExecutionSplitter implementation how many StepExecution instances to create. In the bean that defines the job, the step importProductsStep-master refers to the partition handler.

As for remote chunking, a listener triggers processing on the slave. The slave listener then delegates to a StepExecutionRequestHandler bean to process the received partition data. To determine which step to execute, you configure a step locator bean of type BeanFactoryStepLocator. The following listing describes how to configure a slave for remote partitioning.

Listing 13.18. Configuring a slave for remote partitioning

The entry point for the slave is a Spring Integration service activator that uses input and output channels to communicate with the remote partitioning step. This service activator references the request handler for processing. You configure this handler as a StepExecutionRequestHandler to find and execute the target step. A step locator is in charge of finding this step. You use the BeanFactoryStepLocator class, which looks for the step in the current Spring context.

Partitioning is flexible because of the partition handler, which implements a local or remote strategy to process partitioned data. Partitioning a step also makes it possible to customize splitting data using custom implementations of the StepExecutionSplitter and Partitioner interfaces.

Customizing Data Partitioning

The two interfaces involved in custom partitioning are StepExecutionSplitter and Partitioner. A StepExecutionSplitter is responsible for creating input execution contexts for a partitioned step execution in its split method. The following snippet lists the StepExecutionSplitter interface:

public interface StepExecutionSplitter {
  String getStepName();
  Set<StepExecution> split(StepExecution stepExecution, int gridSize)
   throws JobExecutionException;
}

The default implementation of the StepExecutionSplitter interface, the SimpleStepExecutionSplitter class, delegates to a partitioner to generate ExecutionContext instances. For this reason, developers rarely implement custom splitters; instead, customizations take place at the Partitioner level.

Spring Batch uses a Partitioner at the end of the partitioning process chain. A Partitioner implementation provides a strategy to partition data. The partition method uses the given grid size to create a set of unique input values, such as a set of non-overlapping primary key ranges or a set of unique filenames. The following snippet lists the Partitioner interface:

public interface Partitioner {
  Map<String, ExecutionContext> partition(int gridSize);
}

Let’s now implement a custom strategy to partition data from a database table. You first determine data ranges and then assign them to step executions. Assume here that data is distributed uniformly in the table. Figure 13.15 summarizes the use case.

Figure 13.15. Partitioning based on database column values

You implement a custom partitioner called ColumnRangePartitioner that determines the minimum and maximum values for the column of a table. The partitioner uses these values to define ranges based on the grid size specified. The following listing describes the implementation of the ColumnRangePartitioner class.

Listing 13.19. Custom ColumnRangePartitioner class

The partition method assumes that the column is of integer type and queries the minimum and maximum values from the database. From the grid size, the method computes the number of values per range (targetSize), then creates one execution context per range and adds it to the partition Map. The method also sets the minValue and maxValue properties in each context to identify its range.
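Because the listing relies on database queries, here is a hypothetical plain-Java sketch of just the range computation: min and max are passed in rather than queried with SELECT MIN and SELECT MAX, and a Map<String, Integer> stands in for ExecutionContext. The minValue and maxValue keys follow the text; the partition names are assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the range computation behind a column-range partitioner. min and max are
// parameters here; the real class queries them from the database.
// A Map<String, Integer> stands in for ExecutionContext.
class ColumnRangeSketch {

    static Map<String, Map<String, Integer>> partition(int min, int max, int gridSize) {
        int targetSize = (max - min) / gridSize + 1; // number of values per range
        Map<String, Map<String, Integer>> result = new LinkedHashMap<>();
        int number = 0;
        for (int start = min; start <= max; start += targetSize) {
            int end = Math.min(start + targetSize - 1, max); // last range may be shorter
            Map<String, Integer> context = new LinkedHashMap<>();
            context.put("minValue", start);
            context.put("maxValue", end);
            result.put("partition" + number++, context);
        }
        return result;
    }
}
```

For values from 1 to 100 and a grid size of 4, the ranges are 1 to 25, 26 to 50, 51 to 75, and 76 to 100. A step-scoped reader in each partition can then restrict its query to its own range through late binding on minValue and maxValue.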

We’ve seen throughout this chapter that Spring Batch provides several scaling patterns to improve performance. The challenge is in choosing the right pattern for a given use case. In the next section, we compare the pattern features and provide guidelines for choosing a pattern (or combination of patterns) for different use cases.

13.6. Comparing patterns

In choosing the best pattern for a use case, you need to consider your batch jobs, overall application, and the whole information system. In this section, we provide guidelines for choosing scaling patterns.

Table 13.4 summarizes the Spring Batch approaches used to implement scaling. These patterns leverage multithreading, remoting, and partitioning to improve performance.

Table 13.4. Spring Batch scaling approaches

Pattern | Type | Description
Multithreaded step | Local | A set of threads handles a step. All resources involved must be thread-safe. Carefully consider concurrency issues.
Parallel step | Local | Execute steps in parallel using multithreading. Parallelize steps in a step flow using multithreading. Because parallel steps must be strictly independent, this approach has no concurrency issues.
Remote chunking | Remote | Execute chunks remotely. A master sends chunks to remote slaves for processing. Useful if reading on the master isn’t a bottleneck.
Partitioning step | Local and remote | Define data sets for parallel processing. Control parallel data set processing. The master mustn’t have a bottleneck when using remoting.

The first piece of advice we can give you about choosing a scaling technique is: don’t do it! Implement your jobs traditionally and use the techniques in this chapter only if you face performance issues. Keep it simple! Then, if your jobs take too long to execute, you can first consider using local scaling with multithreading if your hardware supports it. This is relevant if you have multicore or multiprocessor hardware. For multithreaded steps, you must be extremely cautious and think about thread safety and batch job state. Most of the classes involved in steps, like the built-in readers and writers, aren’t thread-safe, so you shouldn’t use them in a multithreaded environment. You can also parallelize processing with multithreading using parallel steps and partitioning steps. Parallel steps require organized processing in steps, which is generally a good approach. With partitioning steps, you can leverage built-in classes to select sets of data to process in parallel. For example, using one thread per file to import data is particularly convenient and efficient.
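If you must share a non-thread-safe reader in a multithreaded step, one common workaround is to serialize access to it. The sketch below is hypothetical plain Java: SimpleReader mimics the read-returns-null-at-end contract of an item reader, and SynchronizedReader wraps a delegate with a synchronized read method. Serializing reads keeps the reader consistent but turns reading into a bottleneck, and it doesn’t address restartability, which depends on how the reader saves its state.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical reader contract: read() returns null once the input is exhausted.
interface SimpleReader<T> {
    T read();
}

// Serializes calls to a delegate that isn't thread-safe, so two threads never
// run inside the delegate's read() at the same time.
class SynchronizedReader<T> implements SimpleReader<T> {
    private final SimpleReader<T> delegate;

    SynchronizedReader(SimpleReader<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized T read() {
        return delegate.read();
    }
}

class ReaderDemo {

    // A non-thread-safe reader: Iterator.hasNext()/next() must not be interleaved.
    static <E> SimpleReader<E> fromIterator(Iterator<E> iterator) {
        return () -> iterator.hasNext() ? iterator.next() : null;
    }

    // Several threads drain the same reader; returns every item they read.
    static <E> List<E> drainConcurrently(SimpleReader<E> reader, int threads) {
        List<E> seen = Collections.synchronizedList(new ArrayList<>());
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (E item = reader.read(); item != null; item = reader.read()) {
                    seen.add(item);
                }
            });
            workers.add(worker);
            worker.start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }
}
```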

In a second round, if performance still doesn’t suit you, you can consider remoting and splitting batch processing on several machines. On one hand, be aware that remote scaling introduces complexity in your batch jobs; use it only if you must. On the other hand, these techniques provide high levels of scalability.

You can use two different Spring Batch techniques in this context: remote chunking and partitioning. Remote chunking systematically sends chunks to slaves for remote processing, whereas partitioning creates data sets to send to slaves for remote processing. Table 13.5 lists the pros and cons of both patterns.

Table 13.5. Comparing Spring Batch remote scaling patterns

Approach | Pros | Cons
Remote chunking | No need to know about the input data structure; not sensitive to timeout values | Transactional middleware required to handle failures; potential bottleneck in reader for data serialization
Partitioning step | Transactional middleware not required to handle failures; no bottleneck in reader for data serialization; low bandwidth and transport costs | Need to know the input data structure; can be sensitive to timeout values

For the remoting patterns, Spring Batch doesn’t provide implementations in its core distribution. You can consider using the Spring Batch Integration module in Spring Batch Admin to use Spring Integration–based remoting.

As you can see, Spring Batch provides a large solution set to implement batch process scaling. The biggest challenge is in choosing the right patterns to improve performance.

13.7. Summary

Scaling in Spring Batch provides various solutions to enhance batch job performance with minimum impact on existing job implementations. You implement scaling mostly in Spring Batch configuration files, and it can involve multithreading, parallel executions, partitioning, and remoting.

Spring Batch implements nonsequential processing with multithreading. One approach is to use multithreaded steps, but you need to be cautious because this requires thread-safe code, and you must add specific state management code to keep batch jobs restartable. Spring Batch also provides the ability to execute steps in parallel. This requires proper organization of a batch process in steps. The configuration defines the execution sequence.

Spring Batch provides advanced and highly scalable patterns and frameworks. Remote chunking splits chunk processing on several computers to balance the processing load. Partitioning provides a way to split up data for remote or multithreaded processing.

Scaling can be challenging to implement and is strongly linked to batch processing and the execution environment. Spring Batch implements a set of patterns that you can use and combine to improve performance.

Chapter 14 covers an essential aspect of application development: testing. This is particularly true for batch jobs because they mainly process data without user interaction and apply complex business rules. Unit and functional testing give us the confidence to maintain and grow applications.
