Processing split messages in parallel

When building integrations, it is often necessary to increase the throughput of a route. Short of speeding up the individual steps, which may not always be possible, one of the most convenient ways to do this is to process portions of the route in parallel. This recipe will show how you can use the Splitter EIP's parallel processing option to hand off message fragments to a pool of threads for concurrent processing.

Getting ready

Java code for this recipe is located in the org.camelcookbook.splitjoin.split package. Spring XML files are located under src/main/resources/META-INF/spring and prefixed with splitParallel and splitExecutorService.

How to do it...

To process split messages through the route in parallel, set the parallelProcessing attribute to true on the split statement:

In the XML DSL, this is written as:

<from uri="direct:in"/>
<split parallelProcessing="true">
  <simple>${body}</simple>
  <log message="Processing message[${property.CamelSplitIndex}]"/>
  <to uri="mock:split"/>
</split>

In the Java DSL, the same thing is expressed as:

from("direct:in")
  .split(body()).parallelProcessing()
    .log("Processing message[${property.CamelSplitIndex}]")
    .to("mock:split")
  .end()
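To exercise the route, you send it a message whose body can be split; with the simple expression ${body} used above, a java.util.List works well. The following is a minimal usage sketch (the ProducerTemplate usage and list contents are illustrative, not part of the recipe's source code):

ProducerTemplate template = camelContext.createProducerTemplate();
// the List body is split into one fragment per element
template.sendBody("direct:in", Arrays.asList("one", "two", "three"));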

How it works...

When parallelProcessing is set to true, Camel internally creates a thread pool, and each split message fragment is processed through the route on a thread from that pool. When parallelProcessing is set to false (the default), all of the split messages are processed one after another on the original thread from the route.

Note

Because of the non-determinism of the threading, it is quite possible that the split message marked as the last fragment (that is, the message with the property CamelSplitComplete set to true) will not be the last one actually processed.
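One practical consequence is that tests against such a route should not assert on the order in which fragments arrive. Here is a sketch using Camel's MockEndpoint, assuming a CamelTestSupport-style test wrapped around the route above (the bodies are illustrative):

// expect all fragments, but in any order, since the thread pool may reorder them
MockEndpoint mockSplit = getMockEndpoint("mock:split");
mockSplit.expectedBodiesReceivedInAnyOrder("one", "two", "three");

template.sendBody("direct:in", Arrays.asList("one", "two", "three"));

mockSplit.assertIsSatisfied();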

There's more...

Because the split messages are handed over to a thread pool, error handling is a little different from the normal case described in the Splitting a message into fragments recipe. As with the regular splitting process, when an exception is thrown, any split message fragments that are in-flight will continue to be processed through the route.

Things change somewhat if you specify that you would like to stop processing by setting the stopOnException attribute to true: the Splitter will be informed not to send any more message fragments to the thread pool, but fragments already handed to the pool will run to completion. The end result may be that many more fragments are processed after an exception has been thrown than when splitting sequentially. This option should therefore be used with care.
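For reference, here is a sketch of how stopOnException combines with parallel processing in the Java DSL; it mirrors the route shown earlier:

from("direct:in")
  .split(body()).parallelProcessing().stopOnException()
    .to("mock:split")
  .end()

In the XML DSL, the equivalent is a stopOnException="true" attribute on the split element.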

It is possible to include a timeout option that specifies the maximum period, in milliseconds, that the processing of the full set of fragments is allowed to take.

In the XML DSL, this is written as:

<split parallelProcessing="true" timeout="5000">

In the Java DSL, the same thing is expressed as:

.split(body()).parallelProcessing().timeout(5000)

When the timeout is reached, any fragments that have yet to be picked up by the thread pool for processing will be discarded, and fragments currently in flight through the route will be allowed to complete.

It is possible to customize the details of the thread pool used for parallelProcessing by referring to a threadPool instance defined within the Camel context by its id:

<camelContext xmlns="http://camel.apache.org/schema/spring">
  <threadPool id="customPool" poolSize="20" threadName="pool"/>

  <route>
    <from uri="direct:in"/>
    <split executorServiceRef="customPool">
      <!-- ... -->
    </split>
  </route>
</camelContext>

Tip

Use of the executorServiceRef attribute implies parallelProcessing, so the two do not need to be set together. You may, however, still set the parallelProcessing option to true to make the intent of your routing logic explicit.

The Java DSL contains an equivalent option by which you can refer to a thread pool in Camel's registry:

.split(body())
    .executorServiceRef("customPool")
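If you would rather construct the thread pool in code, the Java DSL can also accept an ExecutorService instance directly through the executorService(...) method. A minimal sketch, assuming a plain pool from java.util.concurrent (the pool size of 20 simply mirrors the XML example):

// a hand-built pool; Camel will use its threads to process the split fragments
ExecutorService customPool = Executors.newFixedThreadPool(20);

from("direct:in")
  .split(body()).executorService(customPool)
    .to("mock:split")
  .end()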