Figure 8-11. SI graph for chain example

Splitter

A splitter is a middle component (can participate in unidirectional or bidirectional flows) that can operate in passive or active mode. It has a unique function in the SI framework, splitting the received message into various messages based on these mechanisms:

  • Custom Java logic: Generic component, which can be configured via the <int:splitter> XML element or @Splitter annotation
  • XPath: XML payload specific, which can be configured via <int-xml:xpath-splitter>

Each message produced by the splitter will be filled with these headers:

  • correlationId: By default, messages created from a single message will have the same correlationId.
  • sequenceSize: By default, the number of messages created from the original.
  • sequenceNumber: By default, the order of the message created from the original.

These headers are explained in detail in the upcoming “Aggregator” section. An aggregator is commonly used with a splitter in message flows.
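To make the role of the three headers concrete, here is a minimal plain-Java sketch of what a delimiter-based splitter does. It is not SI code: the Msg class, the split() helper, and the sample payload are hypothetical, the correlationId is a random UUID chosen for illustration, and 1-based sequence numbering is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Pattern;

final class SplitterSketch {

    // Hypothetical stand-in for a message: a payload plus a header map.
    static final class Msg {
        final String payload;
        final Map<String, Object> headers;

        Msg(String payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Splits the payload on the delimiter and stamps every part with the three headers.
    static List<Msg> split(String payload, String delimiter) {
        String[] parts = payload.split(Pattern.quote(delimiter));
        String correlationId = UUID.randomUUID().toString(); // shared by all parts (assumption)
        List<Msg> result = new ArrayList<>();
        for (int i = 0; i < parts.length; i++) {
            Map<String, Object> headers = new LinkedHashMap<>();
            headers.put("correlationId", correlationId);
            headers.put("sequenceSize", parts.length);
            headers.put("sequenceNumber", i + 1); // 1-based position (assumption)
            result.add(new Msg(parts[i], headers));
        }
        return result;
    }

    public static void main(String[] args) {
        for (Msg msg : split("message1;messageFail;message2", ";")) {
            System.out.println(msg.payload + " " + msg.headers);
        }
    }
}
```

All parts of one original message share the correlationId and sequenceSize, and differ only in payload and sequenceNumber. That is the information a downstream aggregator needs to reassemble the group.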

Splitter XML Example

Listing 8-91 shows the output service from the SI flow.

The writeAndIndicateSuccess() method is used to consume messages in the SI flow. Into the first parameter we inject the String payload, and into the second parameter we inject the Map headers of the message. The headers are injected so that we can log the default headers created by the splitter. The payload is passed to WriteRepository to persist it. At the end of the method, we return false if the message payload equals messageFail. This is useful to show which of the split messages is used to construct the reply message. Listing 8-92 shows the SI flow configuration.

The input part of the flow is covered by the inbound gateway with our common SiWrapperService interface and connects it to the splitter via inChannel. The splitter uses the delimiters attribute as splitting logic, which means the messages will be split based on a semicolon in this case. The splitter’s output-channel continues into the service activator mapped to WriteService.writeAndIndicateSuccess() from Listing 8-91. Listing 8-93 shows the main class of this example.

After we run the Spring context and retrieve the service interface from the context, we send two messages to the SI infrastructure. The first one has two semicolons, and messageFail is in the middle; the second one has only one semicolon, and messageFail is at the end of the string message. So in the first case, we expect the message to be split into three messages, and the second one into two. Let’s show the output of this example in Listing 8-94.

We can observe that the first three messages have the same correlationId, sequenceSize=3, and are split as we expect with sequenceNumber in the order that we expect. The same is true for the second group of messages. An interesting fact is that the return value from the service activator is sent back via a temporary reply channel only for the last message from each group. This is because there is only one temporary reply channel for all split messages, and obviously there can be only one return value from the service interface SiWrapperService. Other reply messages are discarded. Figure 8-12 shows the SI graph for this flow.

Figure 8-12. SI graph for splitter example

Splitter Example with Java Configuration

This example will use similar constructs to those in the previous example, 0825-splitter-delimiters-xml. To avoid presenting the same long code listing, we’ll list the similar classes:

  • WriteServiceAnnotated: A Java implementation, like WriteService in Listing 8-91, with only one difference: WriteServiceAnnotated.writeAndIndicateSuccess() is annotated with @ServiceActivator and connected to splitChannel.
  • SiWrapperServiceAnnotated: The same interface signature as the common SiWrapperService, but annotated with @Gateway and @MessageEndpoint.
  • SiApplication: The same as the SiApplication class in Listing 8-93. The only difference is the missing import of the si-config.xml file, as this is a Java-configured example.

Listing 8-95 shows the SI configuration.

This configuration class with an SI component scan registers two channels: inChannel and splitChannel. Listing 8-96 finally shows an annotated example of the splitter.

The class needs to be annotated by @MessageEndpoint to mark it as a bean for SI endpoints. @Splitter annotates the splitting method with two mandatory parameters: inputChannel and outputChannel. The splitting method returns a List<String>, whose elements become the payloads of the split messages. The output is the same as for the 0825-splitter-delimiters-xml example in Listing 8-94.

Aggregator

An aggregator is a passive middle component, so it can be integrated into bidirectional and unidirectional SI flows. As we already mentioned, an aggregator’s function is the opposite of a splitter’s. An aggregator aggregates various messages into one message and can be configured via the XML element <int:aggregator> or via the annotation @Aggregator. An aggregation is performed based on these two mechanisms:

  • CorrelationStrategy: A mechanism used to group incoming messages, so that they can be aggregated together. A custom CorrelationStrategy can be configured via the correlation-strategy-method or correlation-strategy-expression XML attributes or the @CorrelationStrategy annotation with a Java configuration.
  • ReleaseStrategy: A mechanism to decide when the group of input messages is complete, so that they can be aggregated. The custom ReleaseStrategy can be configured via the release-strategy-method or release-strategy-expression attributes or the @ReleaseStrategy annotation with a Java configuration.

By default, aggregation is performed based on these message headers:

  • correlationId: Tells the aggregator that messages with the same correlationId should be merged (CorrelationStrategy based on correlationId).
  • sequenceSize: Tells the aggregator how big the group of messages is to aggregate.
  • sequenceNumber: Tells the aggregator the order of the current message within the group to aggregate.

So, by default, the aggregator’s ReleaseStrategy works based on gathering messages, and then inspecting their sequenceSize, sequenceNumber, and correlationId. Messages are released when we have a full group of correlated messages.
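The default behavior can be pictured with a small plain-Java sketch. This is not the SI implementation: the Part class and the accept() method are hypothetical, and sorting by sequenceNumber and joining with a comma are choices of this sketch. Partial groups are kept in a map keyed by correlationId, and a group is released once it holds sequenceSize messages.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

final class HeaderAggregatorSketch {

    // Hypothetical stand-in for a message carrying the three default headers.
    static final class Part {
        final String payload;
        final String correlationId;
        final int sequenceSize;
        final int sequenceNumber;

        Part(String payload, String correlationId, int sequenceSize, int sequenceNumber) {
            this.payload = payload;
            this.correlationId = correlationId;
            this.sequenceSize = sequenceSize;
            this.sequenceNumber = sequenceNumber;
        }
    }

    // Temporary storage for partial groups, keyed by correlationId.
    private final Map<String, List<Part>> groups = new HashMap<>();

    // Returns the aggregated payload once the group is complete, otherwise an empty Optional.
    Optional<String> accept(Part part) {
        List<Part> group = groups.computeIfAbsent(part.correlationId, id -> new ArrayList<>());
        group.add(part);
        if (group.size() < part.sequenceSize) {
            return Optional.empty(); // group is not complete yet
        }
        groups.remove(part.correlationId);
        return Optional.of(group.stream()
                .sorted(Comparator.comparingInt((Part p) -> p.sequenceNumber))
                .map(p -> p.payload)
                .collect(Collectors.joining(",")));
    }

    public static void main(String[] args) {
        HeaderAggregatorSketch aggregator = new HeaderAggregatorSketch();
        System.out.println(aggregator.accept(new Part("message2", "group-1", 2, 2)));
        System.out.println(aggregator.accept(new Part("message1", "group-1", 2, 1)));
    }
}
```

The first call returns an empty Optional because only one of two expected messages has arrived. The second call completes the group and releases it.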

We can also configure a time-out for the aggregator component, whereby a group of messages will be released/aggregated after a defined time. In this case, we have to specify whether we want to aggregate the partial group of messages or discard it.

The aggregator obviously must first temporarily store messages before it can aggregate them. Therefore, it is a much more complex component than a splitter, as it implements internal storage for partial message groups.

Custom Aggregator Example with XML Configuration

Listing 8-97 shows the service interface used by the inbound gateway on entrance to the SI flow.

Notice that the return value is wrapped into Future<T>. It represents the result of an asynchronous operation that will be filled in the future when the operation finishes. We will use it to highlight how an aggregator in a bidirectional flow can end up blocking the caller. Listing 8-98 shows the SI flow’s XML configuration.

The inbound gateway uses the SiWrapperServiceFuture service interface and is connected to the aggregator via inChannel. The aggregator needs to specify the input-channel and output-channel attributes. In this case, we also specify the custom correlation and release strategy via inline SpEL expressions. Correlation of messages is based on the eighth character in the message. So if this character matches, messages should be aggregated together. The release strategy is based on the message group size. As soon as we have two correlated messages, we release them. By default, string messages are aggregated by concatenation with a comma.
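The two custom strategies can be expressed as plain Java functions to show how they interact. The following is only a sketch under stated assumptions: StrategyAggregatorSketch and its methods are hypothetical names, not SI classes, and the order of the sample messages is made up. The correlation function picks the eighth character (index 7), and the release predicate fires when a group holds two messages.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

final class StrategyAggregatorSketch {

    private final Function<String, Object> correlation; // which group a payload belongs to
    private final Predicate<List<String>> release;      // when a group is complete
    private final Map<Object, List<String>> groups = new HashMap<>();

    StrategyAggregatorSketch(Function<String, Object> correlation, Predicate<List<String>> release) {
        this.correlation = correlation;
        this.release = release;
    }

    // Correlate on the eighth character and release groups of two.
    static StrategyAggregatorSketch eighthCharacterPairs() {
        return new StrategyAggregatorSketch(payload -> payload.charAt(7), group -> group.size() == 2);
    }

    // Returns the comma-joined group when the release predicate is satisfied.
    Optional<String> accept(String payload) {
        Object key = correlation.apply(payload);
        List<String> group = groups.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(payload);
        if (!release.test(group)) {
            return Optional.empty();
        }
        groups.remove(key);
        return Optional.of(String.join(",", group));
    }

    public static void main(String[] args) {
        StrategyAggregatorSketch aggregator = eighthCharacterPairs();
        for (String payload : new String[] {"message1", "message2", "message1", "message2"}) {
            System.out.println(payload + " -> " + aggregator.accept(payload));
        }
    }
}
```

Only every second message of a group produces an aggregate. In a bidirectional flow, this is the reason why some callers never receive a reply.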

This aggregator is connected to the service activator, which invokes our common WriteService.writeAndIndicateSuccess() method. Listing 8-99 shows the asynchronous sender of the messages.

This Spring bean injects SiWrapperServiceFuture, which is the service interface used in the gateway, and encapsulates SI flow. The sendMessage() method is annotated with @Async, so it will be executed in a separate thread when called. Notice that we use customExecutor as an annotation attribute, which specifies the name of the executor bean used to execute this asynchronous logic.

In this method, we call the service interface to pass the message to the SI flow. The returned resultFuture is then used to wait for the resulting Boolean value that will eventually be returned. To make sure the thread won’t stay hanging in the resultFuture.get() call forever, we configure a 1-second time-out for this operation. Finally, we log the returned result. Listing 8-100 shows the main class of this example.

This Spring Boot main class enables asynchronous Spring features alongside XML SI flow configuration. This is also a Spring configuration class, so we take advantage of that fact and register the customExecutor thread pool with pool size 10, to have enough threads available for our example.

After we start the Spring Boot application, we retrieve AsyncMessageSender from the Spring context and send four messages to the SI flow via this asynchronous sender. Notice the order of messages. Bear in mind that our correlation logic should correlate message1 with message1, and message2 with message2. When we run this example, we get the output in Listing 8-101.

There are some problems, so let’s explain. When we send four messages into the asynchronous sender, the main thread is not blocked. The four new threads are kicked off (with the names customExecutor-1 .. customExecutor-4), executing logic in AsyncMessageSender.sendMessage() asynchronously.

So we send four messages in four separate concurrent threads into the aggregator component. These messages should be aggregated into two messages, which happens as expected in the first and third log entries. Notice the thread names for these log entries. Log entry Text persisted: message1,message1 is logged from AsyncTaskExecutor-3, and Text persisted: message2,message2 is logged from AsyncTaskExecutor-4. So as you can see, the aggregator uses a separate thread pool to aggregate the messages together internally.

Now let’s focus on responses from the aggregator. The gateway component creates four temporary reply channels and expects four replies. Two of the temporary reply channels get a successful response (in threads customExecutor-2 and customExecutor-4)—one for each aggregated message. But the remaining two temporary reply channels (in threads customExecutor-1 and customExecutor-3) timed out in the AsyncMessageSender.sendMessage() logic. So if we didn’t set up a time-out for resultFuture.get(), the thread would be hanging forever, waiting for a response from the aggregator, which obviously wouldn’t come back.
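The effect of the time-out can be reproduced with the JDK alone. In the sketch below, a CompletableFuture that is never completed plays the role of a temporary reply channel that never gets an answer. The class and method names are hypothetical, and the time-out is shortened to 100 ms so that the sketch finishes quickly.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ReplyTimeoutSketch {

    // Waits for a reply for at most timeoutMillis; an empty Optional means "no reply in time".
    static Optional<Boolean> awaitReply(Future<Boolean> reply, long timeoutMillis) {
        try {
            return Optional.ofNullable(reply.get(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Future<Boolean> answered = CompletableFuture.completedFuture(true);
        Future<Boolean> neverAnswered = new CompletableFuture<>(); // nobody completes it
        System.out.println("answered: " + awaitReply(answered, 100));
        System.out.println("never answered: " + awaitReply(neverAnswered, 100));
    }
}
```

Calling get() without the time-out arguments on the second future would block the calling thread indefinitely, which mirrors what would happen to the two senders whose messages did not trigger a release.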

This highlights how tricky it can be to configure bidirectional flow with an aggregator. Therefore, it’s much better suited for unidirectional flows, or in conjunction with a splitter.

Figure 8-13 shows the SI graph for this example.

Figure 8-13. Visualization of SI configuration for XML aggregator example

Custom Aggregator Example with Java Configuration

This example borrows some classes from the previous example, 0827-aggregator-xml:

Listing 8-102 shows the Java configuration of the aggregator component.

The class is annotated with @MessageEndpoint to mark it as an SI component for the integration component scan. The first method, aggregate(), joins messages. We used the Java 8 Stream API for this purpose. It converts the message collection into a Java 8 stream and uses the reduce() feature to concatenate two elements of the stream into one. Because reduce() wraps its result in an Optional, the value is finally extracted via the get() method. The @Aggregator annotation requires the inputChannel and outputChannel attributes to be defined.
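The reduce()/get() combination can be tried in isolation. This is a sketch of the technique only, with a hypothetical class name and a comma as an assumed separator; the actual aggregate() method is in the listing.

```java
import java.util.Arrays;
import java.util.List;

final class ReduceJoinSketch {

    // Concatenates the payloads pairwise; reduce() returns an Optional<String>.
    static String join(List<String> payloads) {
        return payloads.stream()
                .reduce((left, right) -> left + "," + right)
                .get(); // throws NoSuchElementException when the list is empty
    }

    public static void main(String[] args) {
        System.out.println(join(Arrays.asList("message1", "message1")));
    }
}
```

Because an aggregator is only invoked for a released group, the list is not expected to be empty here. In other contexts, orElse() would be a safer way to unwrap the Optional.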

The second method, releaseChecker(), defines a custom release strategy for the aggregator. When the group reaches two messages, we return true to inform SI about the fulfilled release condition. The last method, correlateBy(), defines the correlation strategy for this aggregator. If the returned value is the same, the messages belong together and should be aggregated into one group.

The rest of the SI flow is defined via the XML configuration in Listing 8-103—partially to highlight how to combine XML and Java configurations for an SI application, and partially to decrease the number of listings needed for this example.

The element <int:annotation-config> is the XML equivalent of the @IntegrationComponentScan annotation, so it performs a scan for the @MessageEndpoint components. Better placement of this configuration would be on the main Spring configuration class, but we want to highlight this XML configuration feature of the SI framework. The inbound gateway, service activators, and two channels are constructs we’ve already seen in previous examples.

When we run this example via the main SiApplication class, we get output similar to that of the 0827-aggregator-xml example in Listing 8-101.

Aggregator with Default Headers Example

This example is configured purely with Java. Listing 8-104 shows the inbound gateway.

This bidirectional gateway returns Future<Boolean> and accepts Message<String> connected to inChannel. Listing 8-105 shows the aggregator connected to inChannel on the other side.

In this case, we don’t specify a custom correlation or release strategy. So the defaults are used, based on the correlationId, sequenceSize, and sequenceNumber headers. Listing 8-106 shows the configuration of the service activator.

The class uses the common WriteRepository to persist the received messages. The @ServiceActivator annotation connects this component to the output of the aggregator. Listing 8-107 shows the asynchronous logic used for sending messages.

We inject SiWrapperServiceFutureAnnotated, which can consume Message<String> message types, and return a Future<Boolean> result. The asynchronous method sendMessage() constructs messages based on the given stringMessage payload and correlationId parameters using MessageBuilder. The sequenceSize header is hard-coded to the value 2, so the aggregator will aggregate two messages into one. One more header is needed for aggregation: sequenceNumber, which defines the order of messages in the sequence so that the aggregator can release all messages from the sequence when they arrive. This header is injected into messages by default.

The rest of the logic should be familiar. We send messages to the SI flow and read the Future<Boolean> result. Subsequently, we wait for the SI flow to respond or time out. Listing 8-108 shows the main class of this example.

This performs the SI scan, enables asynchronous processing, configures customExecutor, and starts the Spring Boot application. Notice that we didn’t register the channels inChannel and aggregatedChannel anywhere. SI can sometimes figure out wiring without explicit definition of channels. It creates DirectChannels under the hood in such a case (as discussed in the following “Message Channel” section).

After everything is configured, we retrieve the AsyncMessageSender bean and send four messages with correlation IDs. The output after running this class is similar to that of the 0827-aggregator-xml example in Listing 8-101.

Message Channel

The message channel is the last category of SI components (alongside the message and message endpoint). It is used to connect message endpoints. A message channel is a simple Spring bean, so no additional infrastructure is needed to fulfill its task. But if needed, messages temporarily stored in a channel can be persisted with JMS or JDBC.

To understand various channel types and their characteristics, it is good to review their inheritance structure, shown in Figure 8-14.

Figure 8-14. Message channel inheritance structure

Notice that the interfaces (MessageChannel, PollableChannel, SubscribableChannel) are part of the org.springframework.messaging package, which is located in the spring-messaging module of the Spring Core framework. All the implementations are located in the SI framework itself (the package org.springframework.integration.channel).

Two types of channels are possible in terms of the number of consumers:

  • Point-to-point channel: Each message is consumed by exactly one endpoint. Can be configured via the XML element <int:channel> or registered as a Java bean. All channel implementations except PublishSubscribeChannel belong here.
  • Publish/subscribe channel: Each message is sent to all subscribed consumers. Can be configured via the XML element <int:publish-subscribe-channel> or by registering PublishSubscribeChannel as a Spring bean.

From a message hand-off point of view, a message channel can participate in the following:

Synchronous message passing

  • The consumer is triggered immediately by using a caller thread.
  • If the SI flow contains only synchronous passive components, the caller is blocked until the downstream component replies to the reply channel.
  • The channel can propagate transactions or exceptions between the caller and consumer.
  • The consumer can use the security context of the caller.
  • Implementations: DirectChannel, synchronous PublishSubscribeChannel.

Asynchronous message passing

  • Consumption is done in a different thread from that of the sending of the message.
  • The consumer can’t use the transaction or security context from the sender.
  • The consumer can’t propagate exceptions to the sender.
  • Implementations: QueueChannel, PriorityChannel, ExecutorChannel, RendezvousChannel, asynchronous PublishSubscribeChannel.

DirectChannel

DirectChannel is the simplest implementation of a channel. It is a synchronous, point-to-point channel. The consumer needs to subscribe to it, so it will be triggered immediately after the sender sends the message. Because it is synchronous, the sender and consumer share the same thread, as if their interaction were done via a simple Java call.
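The "simple Java call" analogy can be taken literally. The sketch below is a plain-Java illustration with hypothetical names, not the DirectChannel implementation. It shows that a subscriber invoked directly from send() runs on the sender’s thread.

```java
import java.util.function.Consumer;

final class DirectHandoffSketch {

    private Consumer<String> subscriber;

    void subscribe(Consumer<String> subscriber) {
        this.subscriber = subscriber;
    }

    // The subscriber is invoked in the thread that calls send().
    void send(String payload) {
        subscriber.accept(payload);
    }

    // Records the consumer's thread and compares it with the sender's thread.
    static boolean consumerRunsInSenderThread() {
        DirectHandoffSketch channel = new DirectHandoffSketch();
        Thread[] consumerThread = new Thread[1];
        channel.subscribe(payload -> consumerThread[0] = Thread.currentThread());
        channel.send("message1");
        return consumerThread[0] == Thread.currentThread();
    }

    public static void main(String[] args) {
        System.out.println("same thread: " + consumerRunsInSenderThread());
    }
}
```

Since send() returns only after the subscriber has finished, an exception thrown by the subscriber would surface in the sender, and thread-bound context is naturally shared.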

The showcase of this channel will refer to previous examples as all of them were using this channel implementation. We can configure it via the following:

  • The XML element <int:channel> which doesn’t use <int:queue>, <int:priority-queue>, or <int:dispatcher> subelements. An example of this configuration is inChannel or aggregatedChannel in the 0828-aggregator-javaconfig example project, seen in Listing 8-103.
  • Use one of the constructors to create an instance of DirectChannel and register it as a Spring bean via the @Bean annotation. The name of this bean then can be used as the channel name in the message endpoint configurations. A Java configuration example can be seen in Listing 8-95 as inChannel or splitChannel in example project 0826-splitter-javaconfig.

QueueChannel

QueueChannel is the second most used channel implementation. It is an asynchronous, point-to-point channel, whereby the consumer needs to actively poll against it to verify whether the new message arrived. This message passing is done in a separate thread and can cause delays in communication. The consumer component needs to be active (needs to configure the poller).

As it’s asynchronous, the transaction and security contexts are not propagated from the sender to the consumer. Additionally, the consumer exceptions are not propagated back to the sender.

Because the consumer is polling for messages, the channel needs to be able to temporarily store messages. Internally, it uses the standard Java BlockingQueue to store them, with first-in/first-out (FIFO) ordering of messages. When we configure QueueChannel, we have the option to configure the queue capacity. If the queue capacity is reached, the sender is blocked until room is available or the time-out is reached. By default, this queue has the capacity Integer.MAX_VALUE.
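The capacity and time-out behavior follows from the underlying BlockingQueue and can be observed with the JDK alone. The sketch below uses LinkedBlockingQueue as one BlockingQueue implementation; the BoundedQueueSketch class and its methods are hypothetical and only mimic the send and receive sides of a queue-backed channel.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class BoundedQueueSketch {

    private final BlockingQueue<String> queue;

    BoundedQueueSketch(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    // Blocks while the queue is full; returns false if no room appeared within the time-out.
    boolean send(String payload, long timeoutMillis) {
        try {
            return queue.offer(payload, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Non-blocking poll; returns null when no message is waiting.
    String receive() {
        return queue.poll();
    }

    public static void main(String[] args) {
        BoundedQueueSketch channel = new BoundedQueueSketch(2);
        System.out.println(channel.send("message1", 50));
        System.out.println(channel.send("message2", 50));
        System.out.println(channel.send("message3", 50)); // queue is full
        System.out.println(channel.receive());            // oldest message first
    }
}
```

The third send gives up after the time-out because nothing consumed a message in the meantime, and the first receive returns the oldest message, which is the FIFO ordering described above.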

To configure QueueChannel, you can do the following:

  • Use the XML element <int:channel> with the <int:queue> subelement. The capacity can be defined via the capacity attribute of <int:queue>. An example of this configuration is inChannel in the 0821-bridge-xml example project, seen in Listing 8-81.
  • Use one of the constructors to create an instance of QueueChannel and register it as a Spring bean via the @Bean annotation. The queue capacity can be passed to the constructor as a parameter. The name of this bean can then be used as the channel name in the message endpoint configurations. A Java configuration example can be seen in Listing 8-89 as inChannel in example project 0823-bridge-to-javaconfig.

PriorityChannel

PriorityChannel is the child class of QueueChannel. Therefore, they are similar and share most characteristics. The only difference is that PriorityChannel does not use FIFO for message ordering in the underlying queue storage. With PriorityChannel, we are able to specify the order in which messages will be consumed from the channel. This order can be configured in two ways:

  • Based on the message header priority. Messages with a higher-priority value will be consumed first. This mechanism is used by default when no comparator is configured for PriorityChannel.
  • Based on the Comparator<Message<T>> implementation, using custom logic for message comparison.

PriorityChannel can be configured via the following:

  • The XML element <int:channel> with the <int:priority-queue> subelement.
  • Use one of the constructors to create an instance of PriorityChannel and register it as a Spring bean via the @Bean annotation. The comparator or queue capacity can be configured as constructor parameters. The name of this bean can then be used as the channel name in the message endpoint configurations.

PriorityChannel Example

This example implements our own comparator to order messages in PriorityChannel. It is an XML-based configuration. Listing 8-109 shows the comparator implementation.

It’s worth remembering the Comparator<T>.compare() contract here. The comparator decides whether o1 is less than, equal to, or greater than o2 by returning a negative integer, zero, or a positive integer, respectively. Lesser objects will be consumed first from the channel.
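The contract can be checked against a JDK priority queue. The following sketch is an illustration only: it compares plain String payloads rather than messages, the class name is hypothetical, and PriorityBlockingQueue merely stands in for the channel’s internal storage. The comparator reverses String’s natural ordering by swapping the operands.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

final class ReverseOrderSketch {

    // A negative result means o1 is "lesser" and therefore consumed before o2.
    static final Comparator<String> REVERSE_ALPHABETICAL = (o1, o2) -> o2.compareTo(o1);

    // Stores all payloads first, then consumes them in comparator order.
    static List<String> drainInPriorityOrder(List<String> payloads) {
        PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>(10, REVERSE_ALPHABETICAL);
        queue.addAll(payloads);
        List<String> consumed = new ArrayList<>();
        String next;
        while ((next = queue.poll()) != null) {
            consumed.add(next);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(drainInPriorityOrder(Arrays.asList("message1", "message2")));
    }
}
```

Note that the reordering is visible only when at least two elements are waiting in the queue at the same time. A consumer that removes each element as soon as it arrives sees plain arrival order.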

In this case, we use the reverse natural ordering of the String message payload. The natural ordering of String is lexicographic, which for our payloads means alphabetical. So if our messages should be consumed from the channel in reverse alphabetical order, we use this comparator. Listing 8-110 shows the XML SI configuration.

On the input side of the SI flow, we use the inbound gateway mapped to our common service interface SiWrapperServiceVoid and connected to inChannel. This channel is configured as the priority channel with capacity 10 and the SimpleMessageComparator bean as the comparator.

The outbound channel adapter (connected to inChannel) is configured as an active component with a polling interval of 100 ms. It will also read only one message per poll cycle (attribute max-messages-per-poll) and will time out after 1 ms when reading from inChannel. The latter configuration is needed to highlight the changed ordering of the priority channel. If we didn’t configure such a short time-out, the outbound channel adapter would eagerly wait to consume the first message that appears in the channel queue, and we wouldn’t have a chance to gather at least two messages.

Listing 8-111 shows the main class of this example.

After running the application with the XML SI configuration we described, we read the service interface bean SiWrapperServiceVoid from the Spring context to send messages. We also log the information that the message was sent to the SI flow already. Listing 8-112 shows the output when we run this main class.

Information about sending both messages is printed first. This means that the caller (main thread) isn’t blocked by the wrapperService.processText() call. When both messages are gathered in the priority channel, the outbound channel adapter reads them in reverse alphabetical order, as we configured.

RendezvousChannel

RendezvousChannel is also a child class of QueueChannel, but it uses SynchronousQueue as the queue implementation (a BlockingQueue with zero capacity). So it is able to perform a synchronous, direct handoff, while the consumer still acts as an active component (it polls for messages). The sender of the message is blocked until the active consumer reads the message from RendezvousChannel.
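The blocking handoff is a property of SynchronousQueue itself, so it can be demonstrated without SI. In this sketch (hypothetical class and method names), a consumer thread waits before taking the message, and we measure how long the sender stays blocked in put().

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

final class RendezvousSketch {

    // With no consumer waiting, a non-blocking offer cannot hand the element over.
    static boolean offerWithoutConsumer() {
        return new SynchronousQueue<String>().offer("message1");
    }

    // Returns how many milliseconds the sender was blocked while the consumer delayed its read.
    static long blockedMillis(long consumerDelayMillis) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(consumerDelayMillis); // simulates the pause until the next poll
                queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        long start = System.nanoTime();
        try {
            queue.put("message1"); // blocks until the consumer takes the message
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return elapsed;
    }

    public static void main(String[] args) {
        System.out.println("offer without consumer: " + offerWithoutConsumer());
        System.out.println("sender blocked for about " + blockedMillis(200) + " ms");
    }
}
```

The sender’s waiting time tracks the consumer’s delay, which is the behavior to expect when a slow poller reads from a rendezvous-style channel.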

RendezvousChannel can be configured via the following:

  • The XML element <int:channel> with the <int:rendezvous-queue/> subelement.
  • Use one of the constructors to create an instance of RendezvousChannel and register it as a Spring bean via the @Bean annotation. The name of this bean can then be used as the channel name in the message endpoint configurations.

RendezvousChannel Example

Listing 8-113 shows the XML SI configuration.

The inChannel channel is configured as RendezvousChannel and is connected to the inbound gateway and outbound channel adapter. In this case, the outbound channel adapter polls at 1-second intervals and reads one message per poll cycle. This polling was chosen to highlight RendezvousChannel features in the output from Listing 8-114. The main class is the same as in the 0830-channel-priority-xml example in Listing 8-111.

Notice the timing of these log entries. The handoff of message1 happens nearly immediately after it’s sent to the SI flow. This is because the outbound channel adapter is already waiting for the message to arrive in inChannel. But the second handoff of message2 happens approximately 1 second after the handoff of message1. This is because the main thread is blocked in the wrapperService.processText(MESSAGE2) call until the channel adapter poller triggers reading of message2.

ExecutorChannel

ExecutorChannel is a subscribable channel, so the consumer is a passive component waiting to be triggered by the received message. But as opposed to DirectChannel, the message is passed from the sender to the consumer in a separate thread. The dispatch of the message is delegated to the TaskExecutor instance configured for ExecutorChannel.
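The difference from a direct handoff can be sketched in plain Java by delegating the subscriber call to an executor. The names below are hypothetical, and a JDK ExecutorService stands in for the TaskExecutor. This illustrates the dispatch idea, not the ExecutorChannel implementation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class ExecutorDispatchSketch {

    private final ExecutorService executor;
    private final Consumer<String> subscriber;

    ExecutorDispatchSketch(ExecutorService executor, Consumer<String> subscriber) {
        this.executor = executor;
        this.subscriber = subscriber;
    }

    // Returns immediately; the subscriber is invoked on one of the executor's threads.
    void send(String payload) {
        executor.execute(() -> subscriber.accept(payload));
    }

    // Records the consumer's thread and checks that it differs from the sender's thread.
    static boolean consumerRunsInOtherThread() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicReference<Thread> consumerThread = new AtomicReference<>();
        ExecutorDispatchSketch channel = new ExecutorDispatchSketch(
                executor, payload -> consumerThread.set(Thread.currentThread()));
        channel.send("message1");
        executor.shutdown(); // already submitted tasks still run
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Thread consumer = consumerThread.get();
        return consumer != null && consumer != Thread.currentThread();
    }

    public static void main(String[] args) {
        System.out.println("different thread: " + consumerRunsInOtherThread());
    }
}
```

The consumer stays passive (it is called, it does not poll), yet the sender is not blocked by the consumer’s processing.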

It can be configured via the following:

  • The XML element <int:channel> with the <int:dispatcher task-executor="..."/> subelement.
  • Use one of the constructors to create an instance of ExecutorChannel and register it as a Spring bean via the @Bean annotation. The name of this bean can then be used as the channel name in the message endpoint configurations.

ExecutorChannel Example

Listing 8-115 shows the SI configuration.

The inChannel channel is configured as ExecutorChannel, using the TaskExecutor bean with the name executor. This TaskExecutor bean is also created in this configuration file by the XML element <task:executor>. inChannel is connected to the inbound gateway with our common service interface SiWrapperServiceVoid. The consumer of inChannel is in this case the passive outbound channel adapter mapped to the common WriteService.write().

We again use the same main class as for 0830-channel-priority-xml from Listing 8-111. When we run this main class, we can observe the output in Listing 8-116.

Notice that the handoff occurs immediately and in separate threads (using the executor thread pool) without blocking the caller (main) thread.

PublishSubscribeChannel

PublishSubscribeChannel uses the publish/subscribe model of messaging, whereby all the subscribers receive each message sent to it. So our message can have multiple consumers.

PublishSubscribeChannel can be configured via the following:

  • The XML element <int:publish-subscribe-channel>.
  • Use one of the constructors to create an instance of PublishSubscribeChannel and register it as a Spring bean via the @Bean annotation. The name of this bean can then be used as the channel name in the message endpoint configurations.

We can configure PublishSubscribeChannel to handle message passing as follows:

  • Synchronously
    • Default configuration
    • Processing of all consumers is done sequentially in the sender’s thread, so each consumer needs to wait for other consumers.
    • Can propagate transaction, security context, and exceptions between consumers and sender.
  • Asynchronously
    • Can be configured with the task-executor attribute of <int:publish-subscribe-channel>, whereby we specify the reference to the TaskExecutor instance.
    • Each consumer processing can be done in a separate thread and thus they occur concurrently.
    • Transaction propagation, passing the security context, and bubbling exceptions between consumers and sender don’t work in this mode.
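The synchronous mode can be pictured as a loop over subscribers in the sender’s thread. The sketch below uses hypothetical names (PubSubSketch, adapter1, adapter2) and plain JDK types, and it records the order in which two subscribers see two messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class PubSubSketch {

    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Every subscriber receives the payload, one after another, in the sender's thread.
    void send(String payload) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(payload);
        }
    }

    // Two subscribers, two messages: returns the order in which deliveries happened.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        PubSubSketch channel = new PubSubSketch();
        channel.subscribe(payload -> log.add("adapter1:" + payload));
        channel.subscribe(payload -> log.add("adapter2:" + payload));
        channel.send("message1");
        channel.send("message2");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Each message is delivered to both subscribers before the next send starts. Handing each subscriber.accept() call to an executor instead would give the asynchronous variant, at the cost of the shared thread context.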

PublishSubscribeChannel Example

Listing 8-117 shows the SI configuration.

In this case, we use the synchronous PublishSubscribeChannel with the name inChannel, consuming messages from the inbound gateway with the common service interface SiWrapperServiceVoid. Two outbound channel adapters are configured to listen to inChannel, delegating messages to our common WriteService.write().

This example reuses the same main class as for 0830-channel-priority-xml in Listing 8-111. When we run it, we get the output in Listing 8-118.

This whole communication occurs sequentially in a single main thread. Each message is persisted twice, because two consumers are configured for each message (two outbound channel adapters connected to inChannel).

NullChannel

NullChannel is a special type of channel used for discarding messages. SI creates this channel by default, so we can use it by specifying nullChannel as the channel name.

Channel Interceptor

As its name suggests, a channel interceptor is used for intercepting channels. We can implement the interface org.springframework.messaging.support.ChannelInterceptor, register the implementation as a Spring bean, and have its custom logic executed at these interception points: preSend, postSend, afterSendCompletion, preReceive, postReceive, afterReceiveCompletion.

If the intercepted channel is AbstractSubscribableChannel, only the preSend, postSend, and afterSendCompletion interception points are relevant, because the consumer is triggered by the channel and is not executing the receive action. If the intercepted channel is AbstractPollableChannel, all interception points are relevant, because it has the send action as well as receive available.

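The interception points afterSendCompletion and afterReceiveCompletion are executed after the related action (send/receive) completes, regardless of whether it raised an exception, which makes them suitable for resource cleanup. They are invoked only if the corresponding preSend or preReceive call completed successfully.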
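To illustrate where two of these interception points sit relative to a pollable channel, here is a deliberately reduced plain-Java sketch. It does not use the ChannelInterceptor interface: the class name and the functional types are hypothetical stand-ins. A preSend-like hook may replace the payload before it is stored, and a preReceive-like hook may veto a receive.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.BooleanSupplier;
import java.util.function.UnaryOperator;

final class InterceptedQueueSketch {

    private final Deque<String> queue = new ArrayDeque<>();
    private final UnaryOperator<String> preSend;  // may return a replacement payload
    private final BooleanSupplier preReceive;     // false stops the receive action

    InterceptedQueueSketch(UnaryOperator<String> preSend, BooleanSupplier preReceive) {
        this.preSend = preSend;
        this.preReceive = preReceive;
    }

    // The hook runs before the payload is stored in the queue.
    void send(String payload) {
        queue.addLast(preSend.apply(payload));
    }

    // Returns null when the hook vetoes the receive or when the queue is empty.
    String receive() {
        if (!preReceive.getAsBoolean()) {
            return null;
        }
        return queue.pollFirst();
    }

    public static void main(String[] args) {
        InterceptedQueueSketch channel =
                new InterceptedQueueSketch(payload -> "[intercepted] " + payload, () -> true);
        channel.send("message1");
        System.out.println(channel.receive());
    }
}
```

For a subscribable channel there would be no receive() method to guard, which is why only the send-side interception points apply there.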

A channel interceptor can be configured as follows:

  • Global interceptor: The top-level XML element <int:channel-interceptor>, where we can specify the pattern attribute for channel names we want to intercept. The bean implementing the org.springframework.messaging.support.ChannelInterceptor interface can be configured via the ref attribute or via the <bean> subelement.
  • Local interceptor: The channel XML subelement <int:interceptors>, where we can specify the list of interceptors for a particular channel. The bean implementing the ChannelInterceptor interface can be configured via the <ref> subelement of the <int:interceptors> tag.

Channel Interceptor Example

Listing 8-119 shows a simple implementation of ChannelInterceptor.

At each interception point that we need to implement as a contract for the ChannelInterceptor interface, we log information about hitting it. Notice that the preSend and postReceive interception points have an option to return the message instance as a return value, and thus replace the message if needed. Also, the preReceive method returns a boolean value, which may stop the receive action if false is returned. Listing 8-120 shows the SI configuration.

The configuration of the input and output component is common in our examples. We use the QueueChannel implementation, whereby the outbound channel adapter polls against inChannel. The channel interceptor configuration is interesting; the SimpleInterceptor bean is configured as the interceptor for all channels whose name starts with the in prefix.

The main class is again reused from 0830-channel-priority-xml (Listing 8-111). When we run it, we get the output in Listing 8-121.

We include log entries for only one message. The second message would have similar log entries. All the interception points were executed because the consumer needs to poll (explicitly execute AbstractPollableChannel.receive()) against inChannel.

Wire Tap

A wire tap is one of the original enterprise integration patterns and, in fact, is a special type of interceptor. It sends a message to another channel without affecting the original flow. This behavior is especially useful for monitoring and debugging.

It can be configured as follows:

  • Global wire tap: Can be configured as a top-level XML element <int:wire-tap>. It can specify a pattern for channel names to intercept.
  • Local wire tap: Can be configured as part of the <int:interceptors> list for the channel and uses the <int:wire-tap> XML subelement.
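Both variants can be sketched as follows. The channel names ordersChannel and logChannel and the in* pattern are illustrative assumptions; the tapped copies are handed to a logging channel adapter so that something visibly consumes them.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/integration
         http://www.springframework.org/schema/integration/spring-integration.xsd">

  <!-- Global wire tap: taps every channel whose name starts with "in" -->
  <int:wire-tap pattern="in*" channel="logChannel"/>

  <!-- Local wire tap: taps only this channel (hypothetical name) -->
  <int:channel id="ordersChannel">
    <int:interceptors>
      <int:wire-tap channel="logChannel"/>
    </int:interceptors>
  </int:channel>

  <!-- Receives the tapped copies; the id doubles as the channel name -->
  <int:logging-channel-adapter id="logChannel" level="INFO"/>

</beans>
```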

Wire Tap Example

Listing 8-122 shows the SI configuration.

The inChannel channel is configured with the interceptors list, and only the wire tap is configured. This wire tap sends messages to logChannel, which is connected to the logging channel adapter. It will log the message payload if the correct logging level is configured (in this case, at least INFO).

The main message flow is untouched, so the message is received by the inbound gateway with the service interface SiWrapperServiceVoid and handed to the outbound channel adapter mapped to WriteService.write().

We again borrow the main class from Listing 8-111 of the 0830-channel-priority-xml example project. When we run it, we get the output in Listing 8-123.

As we expect, the main flow happens correctly. In addition, we have the message payload logged by the SI internal LoggingHandler.

Error Handling

Reliable error handling is an important part of each production-ready application. An SI application is no exception. Therefore, SI provides various mechanisms to simplify error-handling implementations. We can divide these mechanisms as follows:

  • Synchronous error handling: When an error happens during synchronous message passing, it is wrapped into org.springframework.messaging.MessageHandlingException and propagated back to the sender of the message.
  • Asynchronous error handling
    • Unidirectional flow: In the case of an asynchronous handoff, the exception can’t be propagated to the sender. So SI in this case creates org.springframework.messaging.support.ErrorMessage from the exception and sends this message to a suitable errorChannel.
    • Bidirectional flow: The error is propagated to the sender of the messages even if we are dealing with an asynchronous message handoff.

We mentioned that errorChannel is used to send ErrorMessage in case an exception is thrown in a unidirectional asynchronous message flow. But SI enables us to configure errorChannel in various ways:

  • Message-specific error channel: If a message has a specified header with the name errorChannel, errors caused by this message will be sent to the channel specified by this header.
  • Component-specific error channel: If the component that initiated the asynchronous message dispatch specifies the error-channel attribute, errors that occur in its thread are sent to the channel named by this attribute. This applies only to an asynchronous dispatch; with a synchronous dispatch, the error is propagated back to the sender instead.
  • Global error channel: If none of the preceding mechanisms is matched, the global error channel will be used to handle error messages. SI creates by default the publish/subscribe errorChannel, which sends error messages to the logging component. So, by default, all error messages are at least logged by the logging library used in the project. If needed, we can override this default global error channel by explicit definition of the channel with the name errorChannel.
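The three options can be sketched in one configuration. This is an illustration under assumptions: the bean name writeService, the channel names enrichedChannel and customErrorChannel, and the logging levels are hypothetical. Options 1 and 2 are shown on the same flow only for brevity; either one would be enough on its own.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/integration
         http://www.springframework.org/schema/integration/spring-integration.xsd">

  <int:channel id="inChannel"/>

  <!-- 1. Message-specific: set the errorChannel header on each message -->
  <int:header-enricher input-channel="inChannel" output-channel="enrichedChannel">
    <int:error-channel ref="customErrorChannel"/>
  </int:header-enricher>

  <!-- 2. Component-specific: errors raised in the poller thread
          are sent to customErrorChannel -->
  <int:channel id="enrichedChannel">
    <int:queue capacity="10"/>
  </int:channel>
  <!-- "writeService" is an assumed bean registered elsewhere -->
  <int:service-activator input-channel="enrichedChannel"
                         ref="writeService" method="write">
    <int:poller fixed-rate="100" error-channel="customErrorChannel"/>
  </int:service-activator>

  <!-- 3. Global: an explicit errorChannel replaces the default one -->
  <int:publish-subscribe-channel id="errorChannel"/>

  <!-- Consumers for both error channels -->
  <int:channel id="customErrorChannel"/>
  <int:logging-channel-adapter channel="customErrorChannel" level="ERROR"/>
  <int:logging-channel-adapter channel="errorChannel" level="WARN"/>

</beans>
```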

It’s also worth mentioning that SI provides a special type of router only for routing ErrorMessage messages based on the type of Exception raised. It can be configured via the <int:exception-type-router> XML element.
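A minimal sketch of such a router, assuming it consumes from the global errorChannel. The exception types and the target channel names are chosen only for illustration.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/integration
         http://www.springframework.org/schema/integration/spring-integration.xsd">

  <!-- Route each ErrorMessage by the type of exception it carries -->
  <int:exception-type-router input-channel="errorChannel"
                             default-output-channel="otherErrorsChannel">
    <int:mapping exception-type="java.lang.IllegalStateException"
                 channel="illegalStateChannel"/>
    <int:mapping exception-type="java.io.IOException"
                 channel="ioErrorsChannel"/>
  </int:exception-type-router>

  <!-- Hypothetical targets; each adapter's id doubles as its channel name -->
  <int:logging-channel-adapter id="illegalStateChannel" level="ERROR"/>
  <int:logging-channel-adapter id="ioErrorsChannel" level="ERROR"/>
  <int:logging-channel-adapter id="otherErrorsChannel" level="WARN"/>

</beans>
```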

Custom Class in Error-Handling Examples

Before we jump to error-handling examples, let’s introduce the class in Listing 8-124, which will be used for error simulation.

This Spring service bean throws an exception when the write() or writeAndIndicateSuccess() method is executed.

Synchronous Error Propagation Example

Listing 8-125 shows the SI configuration for this example.

The inbound gateway is mapped to our common service interface SiWrapperServiceVoid, so we are dealing with a unidirectional flow. The message is sent from the inbound gateway into the outbound channel adapter mapped to the WriteServiceWithError.write() method. Listing 8-126 shows the main class of this example.

This Spring Boot application includes the si-config.xml SI flow and starts the Spring context. After that, we retrieve the service interface from the context and send the message. The sending logic is in the try-catch block, because we are expecting IllegalStateException here. Listing 8-127 shows the output of this example after running the main class.

An error is thrown in WriteServiceWithError.write() and bubbles up to SiApplication.main() via a lot of SI and Spring calls (represented by three dots in the listing). So this covers the synchronous message-passing scenario, whereby an error is propagated back to the sender of the message.

Asynchronous Bidirectional Flow

Listing 8-128 shows the SI configuration.

The inbound gateway uses the SiWrapperService service interface, which indicates a bidirectional flow. inChannel is QueueChannel with capacity 10, which is connected to the service activator component mapped to WriteServiceWithError.writeAndIndicateSuccess(). It is an active component polling for messages every 100 ms. Listing 8-129 shows the main class of this example.
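The flow described above (gateway, queue channel with capacity 10, polled service activator) might be wired roughly as follows. This is only a sketch consistent with the description, not a reproduction of the listing: the com.example package, the bean name writeServiceWithError, and the choice of fixed-rate over fixed-delay are assumptions.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/integration
         http://www.springframework.org/schema/integration/spring-integration.xsd">

  <!-- Bidirectional entry point; the package name is hypothetical -->
  <int:gateway service-interface="com.example.SiWrapperService"
               default-request-channel="inChannel"/>

  <!-- The queue makes the handoff asynchronous -->
  <int:channel id="inChannel">
    <int:queue capacity="10"/>
  </int:channel>

  <!-- Active endpoint: polls inChannel every 100 ms in a separate thread.
       "writeServiceWithError" is an assumed bean name. -->
  <int:service-activator input-channel="inChannel"
                         ref="writeServiceWithError"
                         method="writeAndIndicateSuccess">
    <int:poller fixed-rate="100"/>
  </int:service-activator>

</beans>
```

The service activator has no output channel, so its return value travels back to the waiting gateway as the reply.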

This Spring Boot main class uses the bidirectional service interface SiWrapperService bean to send a message into the SI flow. The message is sent in the try-catch block to prove that SI will propagate an exception to the main thread even when it’s thrown in a separate thread. When we run this example, we can observe the output in Listing 8-130.

As we can see, SI with Spring asynchronous support ensures that the error will be propagated into the main thread.

Asynchronous Unidirectional Flow

Let’s now switch to unidirectional flow. Listing 8-131 shows the SI configuration.

This flow is similar to that in the previous example, 0837-error-propagated-async, but in this case we are dealing with unidirectional flow. This is because SiWrapperServiceVoid.processText() and WriteServiceWithError.write() don’t return a value.

The main class is reused from the 0836-error-handling-sync example (Listing 8-126). Listing 8-132 shows the output after running this method.

The main thread isn’t affected by the error that happens in the downstream SI flow. Instead, the error is caught by SI, converted into an ErrorMessage, and sent to the global errorChannel, where LoggingHandler logs it.

Global Error Handler Overriding

This example shows how to create a custom handler for errorChannel. We use the same classes and SI flow from the previous example. Listing 8-133 shows our custom global error handler.

This ordinary service activator is configured with a Java annotation to listen on errorChannel. To keep the example simple, it only logs the error, but we could place any custom logic here. By default, errorChannel receives all the error messages from the SI flow that aren’t handled otherwise. When we run the main class of this example, we get the output in Listing 8-134.

The error is caught by SI and sent to errorChannel, where our ErrorHandler is listening. So the error is logged from this class.

Custom Error Channel Example

The last error-handling example shows how to configure a custom error channel for a particular poller. Listing 8-135 shows our custom handler.

This Java-configured service activator listens on customErrorChannel and logs the error. Listing 8-136 shows the SI configuration for this example.

The poller on the service activator is configured with error-channel set to customErrorChannel, where ErrorHandler is listening. So this error handler is exclusive to this poller, and the global errorChannel keeps the default SI error handler. The main class for this example is the same as for the 0836-error-handling-sync example in Listing 8-126. The output is the same as for the previous example, 0839-error-channel-global, in Listing 8-134.

Transaction Handling

Propagation of transactions across the SI flow has slightly different boundaries than propagation of exceptions. As we’ve already shown, exceptions can sometimes be propagated back to the caller even when we are dealing with asynchronous message passing. But this is not true for transaction propagation. Transactions can be propagated in the SI flow only when the entire flow is synchronous. To configure transaction support, we can use standard Spring mechanisms (for example, the @Transactional annotation).

But there is an option to have the transaction present for an asynchronous message subflow, whereby the transaction boundary starts in an active component. When we specify <int:poller> for any of the active components, we can define the XML poller subelement <int:transactional>. This element tells SI that we want to start a transaction for every new thread started by poller triggering. This mechanism is called transacted polling.
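Transacted polling can be sketched as follows. The sketch assumes that a transaction manager bean named transactionManager already exists (for example, one created by Spring Boot) and that writeService is a bean registered elsewhere; both names are assumptions.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/integration
         http://www.springframework.org/schema/integration/spring-integration.xsd">

  <int:channel id="inChannel">
    <int:queue capacity="10"/>
  </int:channel>

  <!-- Each poll runs inside its own transaction, so everything the poller
       thread does downstream commits or rolls back together -->
  <int:service-activator input-channel="inChannel"
                         ref="writeService" method="write">
    <int:poller fixed-rate="100">
      <int:transactional transaction-manager="transactionManager"/>
    </int:poller>
  </int:service-activator>

</beans>
```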

Transaction Propagation Example

To explore transaction propagation, we are going to persist the message into the in-memory database and simulate the error to roll back the transaction. Listing 8-137 shows the service simulating the mentioned error.

When this Spring service bean is called, it throws IllegalStateException to simulate the error. The method call handleJdbcResult() takes an argument of type Map<String, Object>, because it will be passed from the SI component defined later in this example. Listing 8-138 shows the service interface used in this example.

This service interface is annotated with @Transactional, which wraps this call and all processing behind it into the Spring transaction. Listing 8-139 shows the configuration class that initializes the table for the in-memory database.

This component initializes an in-memory database based on the injected jdbcTemplate bean it autowires. The JdbcTemplate instance is created automatically by Spring Boot. Listing 8-140 shows the SI XML configuration.

This flow starts with the inbound gateway mapped to the transacted service interface SiWrapperServiceTransacted and is connected to the outbound JDBC gateway via inChannel. This gateway uses the dataSource bean to run a query against the in-memory database, which inserts a message within the transaction into TEXT_TABLE.

The reply channel from this component is connected to the service activator, which invokes ServiceWithError.handleJdbcResult(). As we’ve seen, this logic simulates an error. Listing 8-141 shows the main class of this example.
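Taken together, the flow described in the last two paragraphs could be wired roughly like this. It is a sketch under assumptions, not the original listing: the com.example package, the channel name jdbcResultChannel, the bean name serviceWithError, and a TEXT_TABLE with a single column are all hypothetical, and the dataSource bean is expected to exist already.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/integration
         http://www.springframework.org/schema/integration/spring-integration.xsd
         http://www.springframework.org/schema/integration/jdbc
         http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd">

  <!-- Transaction boundary: the @Transactional service interface -->
  <int:gateway service-interface="com.example.SiWrapperServiceTransacted"
               default-request-channel="inChannel"/>

  <!-- Direct channels keep the whole flow in the caller's thread -->
  <int:channel id="inChannel"/>
  <int:channel id="jdbcResultChannel"/>

  <!-- Inserts the payload; assumes TEXT_TABLE has exactly one column -->
  <int-jdbc:outbound-gateway request-channel="inChannel"
                             reply-channel="jdbcResultChannel"
                             data-source="dataSource"
                             update="insert into TEXT_TABLE values (:payload)"/>

  <!-- Throws IllegalStateException, which triggers the rollback -->
  <int:service-activator input-channel="jdbcResultChannel"
                         ref="serviceWithError" method="handleJdbcResult"/>

</beans>
```

Every channel here is a direct channel, so the insert and the simulated error run in the caller’s thread and therefore inside the caller’s transaction.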

After the Spring Boot application is started with the XML flow we defined, we retrieve the SiWrapperServiceTransacted bean. This is used for sending messages into the SI flow. As we expect an error to be thrown from it, this call is in the try-catch block. In the catch block, we read and log the number of persisted records in the DB. This will help highlight the transaction propagation feature. Listing 8-142 shows the possible output after running this main class.

No record is persisted into the database, because after the error is thrown from SiWrapperServiceTransacted (which is our transaction boundary), Spring rolls back the whole transaction. If we were to remove the @Transactional annotation from SiWrapperServiceTransacted, we would observe the record count equal to 1, despite the error that occurs afterward. This is because without the transaction, the insert from the outbound JDBC gateway would be committed immediately.

Transacted Polling Example

Now let’s explore an example of transactional behavior within asynchronous SI threads. As we mentioned, a transaction can’t pass an asynchronous boundary of SI flow, but we can start the transaction in the thread that handled the asynchronous SI subflow. Listing 8-143 shows the SI configuration.

This SI configuration uses an inbound gateway with our common service interface SiWrapperServiceVoid and is connected to the outbound JDBC gateway via inChannel. But inChannel is of type QueueChannel, which means it needs to be consumed by an active SI endpoint. Therefore, the outbound JDBC gateway uses a poller to consume messages from inChannel every 100 ms in a separate thread. More important, this poller component is defined with the <int:transactional> XML subelement, which defines the transactional boundary for each poller thread.

The reply channel of the outbound JDBC gateway is connected to the service activator via DirectChannel. This endpoint is mapped to ServiceWithError.handleJdbcResult(), which is the same as in the example project 0841-transaction-propagation in Listing 8-137. It simulates the error by throwing an IllegalStateException. Listing 8-144 shows the main class of this example.

After we start the Spring Boot application with the SI configuration we described, we send the message to the SI flow via SiWrapperServiceVoid. It is not in the try-catch block in this case, because the error is not propagated to this code anyway. A sleep call is used to wait enough time to make sure that the asynchronous poller inserts the message and simulates the error afterward. After waiting, we look into the database table to see how many records were stored. Listing 8-145 shows the output when we run this main class.

The error is simulated in a separate thread, which is the thread covered by the transactional boundary. Therefore, the insert is rolled back, and we observe a record count of zero in the main thread. If we were to remove <int:transactional> from the SI configuration, the record count would be 1, because the insert wouldn’t be covered by a transaction and so wouldn’t be rolled back after the simulated error.

MessagingTemplate

Use of the messaging gateway or inbound channel adapters is not necessary to send messages into an SI flow. SI provides the special abstraction org.springframework.integration.core.MessagingTemplate for sending and receiving messages to/from any channel in the SI flow. This mechanism can be more convenient, for example, in testing code or when we want to send a custom message to the SI flow.

We can think of this class as the SI equivalent of JmsTemplate or RestTemplate.
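Because MessagingTemplate is an ordinary class, it can be registered like any other bean. A minimal XML sketch, assuming the bean name messagingTemplate and an illustrative receive timeout of one second:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd">

  <!-- The timeout is in milliseconds; it limits how long a receive
       or a send-and-receive call waits for a message -->
  <bean id="messagingTemplate"
        class="org.springframework.integration.core.MessagingTemplate">
    <property name="receiveTimeout" value="1000"/>
  </bean>

</beans>
```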

MessagingTemplate Example

This example uses exactly the same SI flow configuration as in Listing 8-27 (example 0808-gateway-generic-xml). To remind you, a messaging gateway is used to send a message via inChannel to the outbound channel adapter mapped to our common bean WriteService.writeAndIndicateSuccess(). Listing 8-146 shows the configuration of MessagingTemplate.

In this Spring configuration class, we register the messagingTemplate bean with a receive time-out of 1 second. Listing 8-147 shows how we would use it.

After we create and start the Spring context via the Spring Boot constructs, we retrieve the messagingTemplate bean from it and send a simple message. Next we log the result received from inChannel and close the context instance. After running this main class, we can observe the output in Listing 8-148.

The messaging gateway defined in the SI flow isn’t used at all, and we send the message directly to inChannel via messagingTemplate. It is persisted, and the response is sent back to messagingTemplate.

Summary

This chapter introduced the benefits of higher-level integration frameworks and the need to abstract more-complicated message flows into a concise framework such as Spring Integration. We focused on SI’s main concepts and killer features. SI is lightweight, because it doesn’t need any special runtime environment.

We covered the three main SI building blocks (message, message endpoint, and message channel) and the various types of each. We showed how to programmatically create message components if needed and emphasized the immutability of the default message type.

Most notably, we covered a lot of message endpoint types for message routing, transformation, and connecting SI flow to third-party technologies or Java classes and interfaces. These components were explained in passive (synchronous) and active (asynchronous with poller) mode. We also covered which components can participate in unidirectional (fire-and-forget scenarios) and bidirectional (request-response scenarios) flows.

We also looked at message channel types and characteristics. We covered how they behave and why they are useful for different scenarios in relation to active vs. passive message endpoints.

Last, we covered error handling, and error and transaction propagation in relation to synchronous vs. asynchronous message flow.
