CHAPTER 8

image

Spring Integration

This book covers two of the most fundamental paradigms of an enterprise system: the request-response model (for example, REST, SOAP) and messaging (JMS). The first one can be synchronous (the caller of the service is blocked until the response arrives) or asynchronous (the caller registers a callback to execute or Future to fill in when the response arrives). The second one is, by definition, asynchronous; the sender of the massage doesn’t wait until the consumer processes the message.

These mechanisms are essential for providing communication between distributed applications: integrating them together. But in a modern enterprise, hundreds or thousands of applications need to be integrated. Of course, enterprise architects need to keep them decoupled as much as possible, but in order to provide functionality and value to the customer, such an enterprise system will have some level of coupling. It is easy to imagine how complex a flow diagram of such a complicated distributed system can become over time.

For requirements like these, request-response and messaging integration are too granular as building blocks and abstractions. As bigger and bigger enterprise systems were produced, various architectural patterns started popping up. Gregor Hohpe and Bobby Woolf gathered them into their famous book, Enterprise Integration Patterns (Addison-Wesley Professional, 2003). These patterns provided higher-level abstractions than those in the request-response or messaging model, but often used them as partial building blocks.

Around the same time, David Chappell published Enterprise Service Bus (O’Reilly Media, 2004). Its goal was to simplify integration and communication of services within a service-oriented architecture (SOA). The central concept is a bus (similar to a hardware bus), which should control routing and flow of requests or messages exchanged between services.

After these patterns and principles were codified, a lot of companies and open source developers recognized that these patterns and principles could be implemented into generic frameworks, commonly referred to as Enterprise Service Bus (ESB) frameworks. Nowadays they are sometimes split into categories of lightweight integration frameworks, medium ESB frameworks, and ESB distributed systems.

Major players in this ESB software product space are as follows:

  • Open source
    • Spring Integration framework
    • Apache Camel
    • Mule ESB (Community Edition)
    • Talend ESB
    • Petals ESB
    • Apache ServiceMix
    • WSO2 ESB
    • JBoss ESB
  • Commercial
    • WebSphere Message Broker (IBM)
    • WebSphere ESB (IBM)
    • Oracle ESB
    • Mule ESB (Enterprise Edition)

Spring Integration Introduction

As a book about Enterprise Integration with Spring, it will obviously focus on the Spring Integration (SI) framework. It all started when Mark Fisher read the book Enterprise Integration Patterns and started implementing these patterns. Over the years, it emerged into a project, which is now considered a leader in the ESB framework market (along with Apache Camel, to be fair).

SI is a separate project in the Spring portfolio, but uses Spring Framework concepts such as dependency injection and inversion of control. Therefore, it requires Spring Framework as a dependency.

Spring Integration provides two killer features:

  • Implementation of enterprise integration patterns in the form of in-memory messaging
  • Powerful integration with various technologies and external applications in the form of pluggable adapters

APIs from the same portfolio of projects don’t typically compete with each other, but SI is an exception. Its adapters provide alternative APIs to Spring modules such as Spring JDBC, Spring MVC, Spring JMS, Spring ORM, Spring WS, and various Spring Data and Spring Social projects. This is because SI aims for pluggable implementation of enterprise integration patterns with easy mapping to these external APIs. In fact SI, often uses these modules and projects under the hood in order to avoid reinventing the wheel.

Messages or requests from external systems are converted into in-memory messages (Java POJOs) with use of powerful Spring conversion mechanisms. This ensures extreme interoperability with pretty much all modern (for example, WebSockets) or traditional (for example, FTP) protocols or popular third-party services (for example, Twitter).

This messaging nature ensures that the flow of data is separated into small, composable units that exchange messages between each other synchronously or asynchronously. Such an approach ensures a high level of decoupling, modularity, and testability. Separation of concerns between business logic and integration (often referred to as plumbing) logic is a key goal of SI.

Another major feature is the easy switch between synchronous and asynchronous processing: SI abstractions allow for plugging of asynchronous executors, schedulers, and pollers into the message flow without need for changing processing logic. When using synchronous message flows, we can easily propagate exceptions and transactions across SI components.

Because the Spring framework’s major configuration approach was originally targeted toward XML configuration, SI has followed this habit since its beginning. In the meantime, the Spring framework has shifted its configuration focus to approaches based on Java annotations. This shift also affected the SI framework, and most of the components can now be configured with Java annotations as well as integration flow. But modeling integration flow XML remains the preferred approach because of the following:

  • A lot of adapters for external protocols or technologies can be exclusively provided in XML.
  • ESB flow tends to be hard to follow when spread to various Java classes with annotations.
  • SI flow can be visualized with the help of the Spring Tools Suite (STS) IDE. We’ll explore these diagrams later in the chapter.

Image Note  If you are or want to be a Spring developer and are not familiar with STS, it is time to download it from http://spring.io/tools/sts and try it. It is based on Eclipse, so all the features from this popular IDE are available for STS. In addition to the Eclipse offerings, STS contains a lot of features tailored for Spring development. The quality guarantee of STS is that it is maintained by the Spring team.

Spring Integration’s Main Concepts

A programming model is all about sending, routing, and consuming messages between message endpoints. Spring Integration contains a lot of components, and most of them are configurable. Each falls into one of these three categories:

  • Message
  • Message channel
  • Message endpoint

Messages are exchanged between message endpoints via message channels. A single message channel connects two message endpoints. But a message endpoint can have any number of (even zero) input and output message channels.

When a message endpoint consumes messages from a message channel, it can passively receive them (listen) or actively read them (poll). Message channels can pass through messages (direct channel) or temporarily store them in memory (queue channel).

When we are modeling SI flow, it’s important to understand whether the flow is bidirectional (of a request-response nature) or unidirectional (of a messaging or fire-and-forget nature). Based on these requirements, we may want to use specialized SI components designed for our case or use specialized mechanisms for replying to the originator of the message. Sometimes this flow can be combined (for example, read the value from the DB, expose it via the REST API, and send a message about this interaction into the tracking subsystem).

Image Note  Spring Integration provides a lot of basic in-memory components and many adapters for external systems, protocols, or technologies in order to enable extreme configurability and flexibility. Neither this book nor Enterprise Integration with Spring certification can detail each adapter that SI provides or attributes provided by SI components. Such a deep dive requires a book of its own. We cover major SI components and features that allow for a quick start with the framework so you can grasp the idea of the programming model. These principles and mechanisms can then be reused for additional components with the help of the SI reference documentation.

Common Classes Used in SI Examples

Before we start diving into concrete SI features, let’s introduce some simple classes and interfaces used across a lot of our examples. Because the SI topic is broad, we’ll go with Hello World–like examples. Trying to map these examples to a simple use case would make the examples bloated. The first common interface is shown in Listing 8-1.

This simple interface is designed to process a given text parameter and return the result of that action. It will be used for wrapping SI bidirectional flow. So we won’t implement this interface, but instead will be using SI message flow as an implementation of this interface. The second interface is shown in Listing 8-2.

This interface is similar to SiWrapperService, but in this case no result is returned to the caller. This one will be also be used to wrap SI flow, in this case unidirectional. Next we introduce the repository class in Listing 8-3.

For this example’s simplicity, this repository class will be fake. It will log the given message parameter and return a count of written records hard-coded to 1. It uses the @Repository annotation so that we can plug it into the Spring container via a component scan. The @Slf4j annotation was already mentioned and serves as an easy logger configuration. The last class from our common deck is shown in Listing 8-4.

The class is called WriteService, because its Hello World–type concern is to persist the message with the help of WriteRepository. It uses the @Service annotation, so we can use it as a Spring bean. The WriteRepository bean is injected via constructor injection.

It provides two methods. The write() method persists the message without indicating the result of that operation to the caller. This method will often be used as target/output logic for unidirectional SI flows. The second method, writeAndIndicateSuccess(), also persists the message and returns a Boolean value about the result of that operation. You probably already guessed that it will be used as target logic of bidirectional SI flows in the examples.

The SI flow mostly starts where messages/requests are received. Think about this endpoint as input to the SI flow. We will place Spring beans or interfaces belonging to the input part of the SI flow into the package net.lkrnac.book.eiws.chapter08.in. The common interfaces SiWrapperService and SiWrapperServiceVoid also belong to this bucket.

On the other hand, the SI flow also has an output part. Classes or interfaces belonging to the output part of the SI flow will be placed in the net.lkrnac.book.eiws.chapter08.out package. The common classes WriteService and WriteRepository belong here.

Message

The message interface used by the SI framework is defined in the Spring Core framework. It is represented by the Java interface, and its definition is shown in Listing 8-5.

The message is composed from headers and message payloads. The signature of this interface uses generics to enable users to handle any type as the payload of the message. The org.springframework.messaging.MessageHeaders type is a special implementation of Map<String, Object>. So we can store a header of any type into a message based on its header name.

By default, when any SI component needs to produce a message, it uses the implementation org.springframework.messaging.support.GenericMessage<T>. Notice that both the interface and implementation are not part of the SI framework, but part of the Spring framework and its messaging module. They were moved from SI into Spring Core since the 4.x versions of both frameworks, because the Spring team is trying to enhance interoperability of various messaging Spring abstractions.

The GenericMessage<T> message is immutable. After creation, we can’t change the message payload or message headers. So if we need to amend the payload or headers, we would have to create a new message and send this new one instead of the original.

This is important because it ensures that the message cannot be manipulated by threads running in the background of our logic. When developers can rely on messages not being changed randomly under the hood, they can focus on the higher-level abstractions of message flow and easily switch between synchronous (single-threaded) and asynchronous (multithreaded) flow.

But the immutability of GenericMessage<T> is shallow. Even when we can’t change the reference stored in the message payload nor the header value references, we can still change the fields of objects stored as the payload or as the header value if its class definition allows it. This fact can cause problems if we don’t take care, but there is nothing SI can do about it, as the payload and header values can be of any type.

Message headers are meant to store message metadata. We can store our custom business logic headers in the message, but there are also headers that SI creates or understands. All messages by default have the id or timestamp of the message.

Headers can be added to a message to amend SI behavior or change message flow. For example, we can use the header priority (so messages with higher priority can surpass messages with lower priority) or replyChannel (so the message can specify which channel will be used when the reply is sent to the caller component). We discuss these headers in detail later in examples.

SI messages can be created in two major ways:

  • Automatically by the SI framework
    • By SI adapter conversion from a transport format (for example, XML, JSON, file)
    • When SI generic components need to create a new message from an old message (for example, enriching headers, transforming message)
  • Programmatically

Programmatic Creation of Messages

Readers familiar with Java concepts understand that in order to achieve immutability of GenericMessage<T>, its payload and headers fields must be marked as final. So creation of such a message cannot be achieved via setters. We could use constructors provided by the GenericMessage<T> class, but the SI framework provides a much more convenient way, with the help of org.springframework.integration.support.MessageBuilder<T>.

With this class, we can use fluent APIs to do the following:

  • Create a message from scratch
  • Create a message from an existing message
  • Set default headers via specialized methods
  • Copy headers from other messages
  • Add/remove custom headers

If you are not familiar with the term fluent API, it will be described in a subsequent example. Notice that the Spring Core framework introduced in version 4 a messaging module with org.springframework.messaging.support.MessageBuilder<T>. It is a slightly different implementation from org.springframework.integration.support.MessageBuilder<T>, but produces messages of the same type. With SI, we should use org.springframework.integration.support.MessageBuilder<T>, because it can create various message headers tailored to SI communication.

Bear in mind that the focus of this example is the creation of messages, not their processing. The processing part of this example will be hidden in this example and explained later in the chapter. This example will use a wrapper for SI flow, shown in Listing 8-6.

This SI flow wrapper interface is slightly different from the common wrapper interfaces in Listings 8-1 and 8-2, because it accepts a full message object instead of just the payload of the message. This is because we want to pass the message with headers to the SI flow to highlight the MessageBuilder<T> features. Listing 8-7 shows the message creation.

This is a Java class with the main method using Spring Boot to start the application based on @SpringBootApplication and the SpringApplication.run() method. It imports the XML SI flow configuration located in the file si-config.xml. The SI flow hidden behind this SI configuration is not important for this message creation example and therefore we won’t focus on it now. The SI building blocks located in the si-config.xml XML configuration are explained in later examples. The only important fact here is that it configures the simple SI flow, which only logs the message behind the scenes.

After the SpringApplication.run(), the SI application is initiated and configured, and we can start using it. First we read the SiWrapperServiceMessage bean, so that we have an entry point for sending messages. You might be confused about how we can inject this bean without registering its implementation into the Spring context. We just mention here that implementation of this interface is configured in si-config.xml and consists of SI components.

The logic following this call will be explained after we execute this class as a Java application. We can observe similar output as in Listing 8-8.

With MessageBuilder<T> help, we create a message with the payload message1, header simpleHeader, and value simple header. Calls against MessageBuilder<T>, where we chain methods via dot calls, is the fluent API we mentioned. This chain (fluent API) can continue until we are finished with message creation. Creation is finished by calling MessageBuilder<T>.build(). This call at the end is needed because the created message is immutable and the build() method calls all the necessary constructors for Message<T> and MessageHeaders to create our message based on the recorded configuration.

After we create the message, we call the wrapperService bean to initiate the SI flow, which in this simple case logs the message.

The second message is created from message1 and adds some SI default headers via specialized methods. In this case, we configure priority, which can be used to amend message consumption ordering in the SI flow, and correlationId, which can be used to aggregate various messages into one message. We don’t need to worry about these SI features now, as they are explained later in this chapter. The feature highlighted here is the fact that we can use specialized methods to create message headers that SI understands and can use them to affect the flow of messages.

The third message copies all headers from message2. This is handy when we want to slightly amend the SI message. In this case, the mentioned amendment is removal of the simpleHeader header.

The last message is created with the payload message4 and configured with the priority value 5. Next we copy all the missing headers from message2. We are specifically interested in header simpleHeader, but because message4 should have higher priority, we use the copyHeadersIfAbsent() method, because message2 had lower priority with value 10 and we want to it to have priority with value 5.

Parsing Message Headers

We managed to explain programmatic message creation with headers for SI flow. The next feature, which is not specific to the SI framework but is a useful mechanism for SI message processing, is parsing of message headers. It’s good to highlight that this feature of the Spring framework was already shown in Chapter 5.

So without too much buzz, we’ll jump straight to the point and show an example of parsing headers of a message in Listing 8-9. We will use the same constructs as in the previous example, but still hide the SI flow. We will get there; don’t worry.

Here the @Service class injects via constructor injection WriteRepository, belonging to the common deck of classes for this chapter. We are interested in the write method, where we accept the message payload as the string parameter message. The second parameter is annotated by @Header, which defines interest in the timestamp header (based on the annotation attribute) of the message and injects it into the creationTime variable. The annotation attribute is not mandatory if the parameter name matches the header name. The last parameter injects all the headers of the message into a map, which is defined by the @Headers annotation. These injected parameters are logged, and writeRepository is called to persist a message payload afterward.

We still keep the SI flow hidden for now; later examples will reveal SI features. The main class of this example looks exactly the same as the main class from the programmatic message creation example in Listing 8-7. The WriteServiceWithHeaders class is executed from the SI flow defined in si-config.xml. When we run the main class for this example, we get output similar to Listing 8-10.

As we can see, logged messages have the same structure as messages in Listing 8-8, but they’re logged in a different format to highlight the header parsing features.

Message Endpoint

The message endpoint is probably the most diverse SI component. The SI reference documentation refers to it as a filter from the pipes-and-filters architecture. We already mentioned that in order to send or receive messages, the endpoint has to be connected to one or more message channels. Its primary role is to encapsulate application logic and decouple it from the SI messaging infrastructure. Our application code often doesn’t even know that it’s part of the SI flow. A good example is WriteService, shown previously in Listing 8-4. From a first look at this class, we can’t say whether it’s a standard Spring bean or part of the SI flow.

As we mentioned, it makes sense to model SI flow in an XML configuration. The message endpoint, as a generic component, doesn’t have any XML tag representation, because each type of specific component has its own XML tag. But some parts of the SI flow can be configured via Java annotations. In such a case generic annotations are required for message endpoint configuration.

The first, most important, annotation from the SI framework is @EnableIntegration. It enables the declaration of standard SI beans when we use Spring’s @Configuration annotation. The same effect can be achieved with Spring Boot’s @EnableAutoConfiguration annotation or @SpringBootApplication. So when we use Spring Boot with autoconfiguration, we can skip @EnableIntegration, because it will be configured automatically for us.

Another generic SI annotation is @IntegrationComponentScan. As the name suggests, it scans the classpath for SI message endpoints. It works similarly to the @ComponentScan we are used to from the Spring framework for SI-specific components. So it has to be used with the @MessageEndpoint or @MessagingGateway annotations (we explain these annotations later in the chapter), which register SI-specific beans into the Spring context. This annotation is needed for registering SI-specific components even if we are using Spring Boot with autoconfiguration enabled (@EnableAutoConfiguration or @SpringBootApplication).

The last generic annotation is @MessageEndpoint, which marks the class participating in SI flow, where some of its methods will be annotated with more-specific SI annotations. These specific annotations are explained later, when we discuss concrete types of message endpoints.

Use of these annotations is shown as part of specific SI component examples.

Message Endpoint Types

We can categorize message endpoints in various ways, detailed in the following subsections.

Message Flow Direction

  • Unidirectional : The component can participate in fire-and-forget scenarios.
  • Bidirectional: The component can participate in request-response scenarios. This flow involves sending a response to the upstream consumer via replyChannel.

Placement in the Message Flow

  • Input component : Connects the SI flow to the upstream external application or internal Java logic. Messages enter the SI flow via this component.
  • Output component : Connects the SI flow to the downstream external application or internal Java logic. Messages are sent out of the SI flow via this component.
  • Middle component : Capable of operating in the middle of the flow. This component can participate in unidirectional as well as bidirectional message flow.

Triggering Activity of the Component

  • Passive: The component waits until the message is pushed to it via the input channel. This mode is usually part of synchronous execution and is able to propagate transactions to the output channel or bubble up exceptions to the originator of the message.
  • Active: The component actively reads messages from the input channel based on internal triggering. This triggering can be achieved with a special subcomponent called a poller (configured via the <poller> XML tag or the @Poller annotation). An important requirement for this mode is that the input channel needs to be capable of temporarily storing messages flowing into it. We discuss this channel type later in this chapter.

Component Purpose

Generic component

  • Maps the message channel to the Java interface or class. If it’s mapped to the Java interface, the implementation is covered by SI abstractions.
  • Handles the message endpoint concern (for example, routing, message transformation) on top of standard Java classes or interfaces.

Technology-specific component

List of Message Endpoints

It’s important to understand that each particular message endpoint can operate in various modes, based on its configuration. But once the component is configured, we can’t switch these modes at runtime.

The following is a list of the major message endpoints that SI provides and the modes they can operate in. They are also divided into three major functional categories.

  • Messaging components: Used to connect the SI flow with external technology or Java logic
    • Channel adapter : Unidirectional, input/output, passive/active, generic/specific
    • Service activator : Unidirectional/bidirectional, output/middle, passive/active, generic
    • Messaging gateway : Unidirectional/bi-directional, input/output, passive, generic/specific
  • Transformation components: Their function is to convert the message payload or headers to a different format. They are middle components that can participate in unidirectional as well as bidirectional message flow.
    • Transformer : Passive/active, generic/specific
    • Content enricher : Passive/active, generic/specific
  • Routing components: Their function is to control the flow of messages. They are middle components that can participate in unidirectional as well as bidirectional message flow.
    • Filter : Passive/filter, generic/specific
    • Router : Passive, generic/specific
    • Bridge : Passive/active, generic
    • Chain : Passive, generic
    • Splitter : Passive/active, generic/specific
    • Aggregator : Passive, generic

Image Note  This is not a complete list of SI message endpoints. This list covers the major endpoints you are most likely to use in practice for standard integration use cases. Special endpoints for various corner use cases are beyond the scope of Enterprise Integration with Spring certification and thus also beyond the scope of this book. For a full list of message endpoints, refer to the SI reference documentation at http://docs.spring.io/spring-integration/docs/latest-ga/reference/htmlsingle/.

Channel Adapter

The channel adapter is an SI endpoint component for unidirectional message flow. So it is suitable for fire-and-forget scenarios. From a triggering point of view, it can act as a passive as well as active component (if the poller subcomponent is configured). As each unidirectional message flow needs to have input and output, we have two types of channel adapters:

  • Input channel adapter: Sits on the input side of the SI flow, where it passively listens or actively reads messages
  • Output channel adapter: Sits on the output side of SI flow, where it sends messages to the Java class or external system or protocol

The term adapter suggests that it maps the message channel to a non-SI system or subsystem. From this point of view, we can divide channel adapters into these two categories:

  • Generic Java adapters: Map the message channel to the Java interface or class. If the adapter is mapped to the Java interface, implementation is covered by SI abstractions. Can be configured via the XML tag <int:inbound-channel-adapter> or <int:outbound-channel-adapter>, where it uses the core SI XML namespace. The inbound channel adapter can be also configured via the @InboundChannelAdapter. This annotation is not available for the outbound channel adapter as of SI version 4.1.6.RELEASE.
  • Technology-specific adapters: Connect the message channel to an external system or transport protocol (for example, FTP, HTTP, JMS). SI provides numerous implementations of these adapters for different technologies, which can be configured via only XML tags. For each technology, we need to use specific XML namespaces (for example, <int-http:inbound-channel-adapter>, <int-amqp:inbound-channel-adapter>, <int-mail:outbound-channel-adapter>, <int-jms:outbound-channel-adapter>). External adapters don’t have any SI annotations available, so we have to use XML configuration for them. A concise table presenting channel adapter implementations for external technologies can be found at http://docs.spring.io/spring-integration/docs/latest-ga/reference/htmlsingle/#endpoint-summary.

Generic Inbound and Outbound Channel Adapters Example Configured with XML

The following generic channel adapter examples are the only examples in this chapter not using Spring Boot, in order to highlight how to configure SI applications with pure Spring and SI frameworks.

This example covers generic inbound and outbound channel adapters based on XML configuration. The first class, shown in Listing 8-11, is a service we will be actively polling.

This Spring service bean has just one simple method that returns a simple string message. Listing 8-12 shows the XML configuration of the SI flow.

SI provides various XML namespaces for generic SI components and separate namespaces for particular technology we want the SI flow to hook on. In this case, we use only a generic SI namespace aliased to int. Alongside this SI namespace, we also use the standard Spring namespaces context and a default one for handling Spring beans.

The first XML tag <context:component-scan> performs classpath scanning for Spring components in the base package net.lkrnac.book.eiws.chapter08 and its subpackages. This scan includes ReadService and our common beans WriteService and WriteRepository.

Next we can use the SI tag <int:inbound-channel-adapter> , which will be connected to the readService bean. We also have to specify the attribute method, which maps this inbound channel adapter to readService.read(). As ReadService is a passive bean not sending any messages, we need to configure the <int:poller> subelement for this adapter. With attributes for this poller, we define a 1-second fixed-rate poll interval and we will read only one message per poll. Under the hood, SI uses a standard Spring scheduler with the fixed rate of 1 second in this case. So every second, this channel adapter will call readService.read(),and create an SI message out of its return value.

This message is then handed to another inChannel, which is defined by the mandatory attribute channel. inChannel also has to be configured for SI flow, and this is done via the separate XML element <int:channel>. On the other side of inChannel, we connect the outbound channel adapter via the XML tag <int:outbound-channel-adapter> . This adapter is mapped to our common bean, writeService, and the write method.

The main class of this example is shown in Listing 8-13.

This class starts the Spring context defined in si-config.xml. When we run this main class as a Java application, we get the output shown in Listing 8-14.

The flow is executed in the task scheduler threads instead of the main thread. This is because we specified a poller for the inbound channel adapter, which reads a new message each second and sends it to writerService. This is also why this Spring context remains open and would run indefinitely until we forcibly stop the application.

We need to bear in mind that the types of Java POJO on input and output of this flow must match. In this case, it is of type String, which is used as a payload of GenericMessage<String> flowing. It is also important to understand that SI components register Spring beans into the Spring context. For example, the <int:channel> tag created a Spring bean with the name inChannel and the type DirectChannel. This channel type is explained later in the “Message Channel” section.

Figure 8-1 shows one more interesting thing about this example.

9781484207949_Fig08-01.jpg

Figure 8-1. Visualization of generic channel’s adapter SI flow

Figure 8-1 depicts our SI flow. It is nice, because it effectively shows a live diagram of our SI application—something every architect dreams of. This visualization can be found in the STS writer window of the si-config.xml file, between the bottom tabs. From my experience, a lot of developers don’t even know where these tabs are located in Eclipse/STS, so let’s show it visually in Figure 8-2.

9781484207949_Fig08-02.jpg

Figure 8-2. Location of integration-graph tab

But this is not only a visualization. We can also create SI flows based on this integration-graph tab. As you can see, on left side of the writer window are available SI components, and we can drag them into our flow. When the component is located on our graph, we can double-click it, and STS will open the Properties window, where we can edit properties of this component. This is shown in Figure 8-3.

9781484207949_Fig08-03.jpg

Figure 8-3. Editing SI component attributes via the Properties tab

As we can see, all the possible attributes for a particular SI component are listed. Mandatory attributes are underlined.

Generic Inbound Channel Adapter Example Configured with Java Annotation

Next we will show how to configure the same flow with a Java configuration. Listing 8-15 shows a partial XML configuration of SI flow, because so far there isn’t an annotation for the outbound channel adapter.

This configuration of an outbound channel adapter was already shown in a previous example. The channel could be configured with a Java configuration, but we show that configuration later in the examples. Listing 8-16 shows a Java configuration of an inbound channel adapter.

In this case, the read service method read() is annotated with @InboundChannelAdapter, which makes it an SI message endpoint. We need to specify the mandatory value attribute of this annotation to connect it to the message channel inChannel.

Because we also want to have this component active (which means it will be actively polling an annotated method), we configure @Poller with a fixed rate of 1 second for this adapter. This annotation will create a Spring scheduler under the hood to perform active polling. With the maxMessagesPerPoll attribute, we define that only one message should be read per poll.

Listing 8-17 shows the main class of this example.

This main class also acts as a Spring configuration class (@Configuration annotation) and scans the current package with subpackages for Spring beans. The @ImportResource annotation also plugs into this Spring context the XML part of the SI flow. The last annotation, @EnableIntegration, turns on the SI features. After running this class, we would see similar output as in Listing 8-14 for the 0803-channel-adapter-generic-xml example.

The downside of the Java configuration is that the SI flow can’t be visualized or edited via the integration-graph editor of STS.

Nongeneric Channel Adapters Example

Now it’s time to explore adapters for external technologies. In this case, we expose our SI flow as a REST service on the input side and connect it to a JDBC data source on the output side. As a database, we use the H2 database engine. When Spring Boot autoconfiguration detects it on the classpath, it configures the jdbcTeamplate bean automatically. Listing 8-18 shows the SI configuration.

We use the beans and context Spring namespaces and the generic SI int namespace again. New namespaces are int-http and int-jdbc to allow the use of adapters for these technologies. The first XML element is <int-http:inbound-channel-adapter> . It exposes inChannel as a REST endpoint, which listens to POST messages on the root URL. The payload will be a simple string.

On the other side of inChannel, we configured the connected SI to the JDBC data source via <int-jdbc:outbound-channel-adapter>. This component needs to know which data-source bean to use for JDBC access. In this case, we refer it to the bean with the name dataSource, which is created in the Spring context (creation of it is discussed later in this section). Of course, when we are accessing the database, we need to have a query to execute. In this case, we use the inline query attribute to define it. It is a simple insert into TEXT_TABLE, where the stored value will be used from the SI message payload. This payload is received from the REST endpoint.

Listing 8-19 shows initialization of the TEXT_TABLE table.

This component will initialize the in-memory database based on the injected jdbcTemplate bean it autowires. The JdbcTemplate instance is created automatically by Spring Boot. Listing 8-20 shows the Spring Boot configuration of the server port for HTTP access.

Finally, Listing 8-21 shows the main class of the application in this example.

We use Spring Boot here to initialize everything with autoconfiguration by the using @SpringBootApplication annotation. As we have the JAR dependencies needed for running the embedded servlet container as well as in-memory database support on the classpath, Spring automatically configures for us the servlet container and jdbcTemplate and dataSource bean to access this in-memory database. This annotation also scans the current package and subpackages for Spring beans, and of course initializes SI. You can think of it as having @EnableIntegration under the hood, because @SpringBootApplication turns on this feature via autoconfiguration.

The next annotation imports our XML SI configuration. In the main method, we run the Spring Boot application via the SpringApplication.run() call. When we run it, the application listens on address http://localhost:18080. Any text POSTed to this address will be persisted into TEXT_TABLE. Chapter 4 already covered REST and how to send payloads via HTTP. We used the Advanced REST Client Chrome plug-in for this purpose, so please refer to Chapter 4 if you are unsure about sending HTTP messages.

Visualization of the SI flow looks similar to the XML generic channel adapters example in Figure 8-1.

Service Activator

A service activator is a generic SI message endpoint, which maps a message channel onto a Spring bean and invokes a specified method. The message received by this endpoint will try to be converted (if needed) into a parameter or various parameters of the mapped method. Similarly, the eventual return value from the method will try to be converted into a new message (if needed). If any of these conversions fail, an exception will be thrown.

The service activator can be plugged into the middle of the SI flow or as an output component; therefore, the input channel attribute is mandatory. This fact also means that the method mapped to the service activator must have at least one parameter, so that the message can be passed to the service. It can participate in a unidirectional as well as a bidirectional message flow. The difference in these flows depends on the signature of the method mapped into the service activator.

If the mapped method is void (it doesn’t return any value), it can act as a unidirectional output message endpoint. This mode is pretty much the same as that of the generic outbound channel adapter.

If the mapped method returns a value, the service activator can act as follows:

An output component in bidirectional SI flow.

  • A middle component in bidirectional or unidirectional SI flow. In this configuration, it is similar to a message transformer endpoint (will have its own section) in the sense that it receives a message and returns a different message. The fundamental difference here is that the message transformer is designed to convert messages, whereas the service activator is designed to invoke a method on the Spring bean and optionally gather a return value into the message.

If the mapped method returns null and an upstream SI component requires a response (for example, an upstream message gateway has request-reply configured to true), an exception is thrown in the SI flow.

For configuring the service activator, we use the <int:service-activator> XML tag or @ServiceActivator annotation.

Service Activator Example with XML Configuration

Listing 8-22 shows the XML SI configuration.

The input part of the SI message flow is identical to 0803-channel-adapter-generic-xml from Listing 8-12. In fact, we are using the same readService bean for message creation. The difference is in the output part of the SI flow, which uses <int:service-activator> to map the writeService.write() method to our message flow. As you may remember, this method is void, so no response will be generated and it is compatible with the input channel adapter’s unidirectional nature. The main class of this example is shown in Listing 8-23.

This is the Spring Boot configuration class and the main class, which executes itself as the Spring Boot application. This is a common pattern with Spring Boot. The @ImportResource annotation includes the XML SI flow into this Spring context. @SpringBootApplication also covers component scanning for Spring beans. That is why references to readService and writeService are available for us in the si-config.xml file.

Running this example results in output similar to Listing 8-14 for the 0803-channel-adapter-generic-xml example. Visualization of this SI flow is shown in Figure 8-4.

9781484207949_Fig08-04.jpg

Figure 8-4. Visualization of SI flow for XML channel adapter example

Service Activator Example with Java Configuration

Listing 8-24 shows the Java SI configuration .

This standard Spring configuration class defines one bean of type MessageChannel and name inChannel. The DirectChannel implementation is used in this case. This channel type is explained later, in the “Message Channel” section. Listing 8-25 shows the Java configuration of the service activator.

This service class injects the WriteRepository bean via constructor injection, so that we can call it in order to process the received message. The message is received via WriteServiceAnnotated.write(), which is annotated with @ServiceActivator, and this is participating in a unidirectional SI flow. The inputChannel attribute is mandatory and refers to inChannel from Listing 8-24.

Another bean included in this example is ReadServiceAnnotated from Listing 8-16, which is an annotated version of the inbound channel adapter. The main class of this example is shown in Listing 8-26.

This standard Spring Boot pattern runs an autoconfigured application that scans for beans in the current package and subpackages. Running it results in output similar to Listing 8-14 for the 0803-channel-adapter-generic-xml example, because the inbound channel adapter mapped to ReadServiceAnnotated reads a message each second and sends it to the service activator.

Messaging Gateway

The messaging gateway is a passive component that can participate in unidirectional or bidirectional message flows. So it inherently handles request-response concerns. We can divide gateways into two categories:

  • Generic messaging gateway: Wraps SI message flow into the Java interface as input to the message infrastructure. Therefore, the method wrapping the flow must have a return value. We can think of it as a proxy between Java and the SI flow. It can be configured via the XML element <int:gateway>. The Java configuration uses two annotations: @MessagingGateway marks the interface as a gateway wrapper, and @Gateway maps a particular method as an entrance to SI messaging. Notice that we can annotate various methods in the same class with the @Gateway annotation and in this way have various entry points to our SI flow. Therefore, we need to scan the annotated inbound gateway with @IntegrationComponentScan.
  • Technology-specific messaging gateway: Can handle communication with an external technology or protocol (for example, HTTP, JDBC, SOAP Web Services). This type of gateway can be inbound or outbound in relation to SI flow. To configure a technology-specific gateway with XML, we need to use special XML namespaces (for example, <int-file:outbound-gateway>, <int-http:inbound-gateway>, <int-ws:outbound-gateway>). A technology-specific gateway doesn’t have any SI annotations available, so only an XML configuration is possible. A concise table listing messaging gateway implementations for external technologies is available at http://docs.spring.io/spring-integration/docs/latest-ga/reference/htmlsingle/#endpoint-summary.

If we want to have an inbound messaging gateway participating in bidirectional flow of messages, there has to be a mechanism for sending reply messages back to the gateway in place. We can handle this in various ways:

  • Temporary reply channel: By default, the inbound messaging gateway creates a temporary channel, which will be used when the downstream output endpoint replies with a message. This channel is created for each message flowing through the SI flow.
  • Reply channel specified on the message: The message can have the special header replyChannel defined, which will be used when the downstream output endpoint replies with a message
  • Default reply channel defined on the inbound gateway: If the gateway has specified the attribute default-reply-channel (reply-channel with technology-specific gateways), the reply message from the downstream output component will be passed to this channel.
  • Output channel specified on downstream output endpoint: If the output endpoint of the SI flow specifies the output-channel attribute, this channel will be used for sending a reply message.

Generic Messaging Gateway Example Based on XML Configuration

Listing 8-27 shows the XML configuration of the SI flow.

The first XML element specifies the generic inbound gateway. We need to define the mandatory default-request-channel, which specifies the channel where inbound messages will be passed in (in this case, inChannel). The service-interface attribute is also mandatory and specifies the Java interface, which will be used for hiding SI flow from caller. In this case, we use our common interface SiWrapperService. Notice that when the request arrives to the gateway and is converted into a message, a temporary reply channel is created for this message. The gateway will be expecting a response message via this temporary reply channel.

Messages received by this gateway will be passed to the service activator endpoint, which is mapped to the writeService.writeAndIndicateSuccess() method. This method returns the result of the write operation; therefore, we are dealing with bidirectional message flow. To send a reply message back to the gateway, the temporary reply channel created for the request will be used automatically. When a value is returned from WriteService.writeAndIndicateSuccess(), the service activator converts this value into a reply message and sends it to the temporary reply channel. The gateway receives this response message via the temporary reply channel and converts it to a return value of the service interface SiWrapperService.

Listing 8-28 shows the main class of this application.

We are dealing with a Spring Boot application that scans for Spring beans. In this case, we need to have WriteService scanned. @ImportResource includes your XML SI flow into this application context. @Slf4j enables logging for this class. In the main method, we execute the Spring configuration defined by this class. Remember this Spring configuration, because it will be used alongside most subsequent examples, so we won’t need to explain it again and again.

After the Spring context with SI flow is up, we retrieve the SiWrapperService bean from the Spring context. You may wonder where the implementation of this interface is defined. So notice that the gateway defined in the XML configuration represents the implementation of this interface. So by executing wrapperService.processText(), we send a message to the gateway. After the reception of the response, we just log it. Running this example results in the output shown in Listing 8-29.

As expected, the input message is logged in WriteRepository, and the response of this operation is logged in the main class. Visualization of this SI flow is shown in Figure 8-5.

9781484207949_Fig08-05.jpg

Figure 8-5. Visualization of SI flow for generic inbound gateway example

Generic Messaging Gateway Example Based on Java Configuration

Listing 8-30 covers the SI configuration class.

Alongside the standard Spring @Configuration annotation, we also use the @IntegrationComponentScan annotation. It is needed to scan the current package and its subpackages for SI components. In addition, the inChannel bean is configured as an SI channel. Listing 8-31 shows the output of the service activator configuration.

This service activator is different from the previous examples, because it returns the result of the write operation from the mapped method WriteServiceAnnotated.writeAndIndicateSuccess() and thus is able to participate in bidirectional message flow. As we already know, bidirectional flow involves automatic creation of a temporary reply channel, which is used to send a reply back to the upstream consumer. Listing 8-32 covers the inbound gateway configuration.

The @MessagingGateway annotation is needed to mark this interface as an SI inbound gateway. The wrapper method is annotated by @Gateway, where requestChannel is a mandatory attribute. This interface doesn’t have any implementation defined, because SI creates an implementation based on the SI flow definition. Notice that this method returns a Boolean value, which will be converted into a reply message of bidirectional message flow, where a temporary reply channel is created under the hood automatically. Listing 8-33 shows the main class of this example.

@ImportResource is not needed in this case, as we have a pure Java configuration. After the Spring Boot application is started via the main method, we read our service interface from the Spring context, send the message to the SI flow, and log the result. The output from running this main class looks similar to the output of the previous example, 0808-gateway-generic-xml in Listing 8-29.

Technology-Specific Gateway Example

This example uses an HTTP inbound gateway and JDBC outbound gateway to handle bidirectional SI flow. Listing 8-34 shows the XML SI configuration.

We use two technology-specific XML namespaces here: int-http and int-jdbc. The input part uses an HTTP inbound gateway, which is mapped to the root URL assigned by the servlet container and consumes only POST HTTP requests. After conversion, the message is passed to inChannel.

In the output part of the SI flow, we have a JDBC outbound gateway, using the dataSource bean to access an in-memory DB. The query used to insert the message into TEXT_TABLE should already be familiar from Listing 8-18, the 0805-channel-adapter-non-generic example. SimpleDatabasePopulator, which initializes the DB table, is also taken from the same example (see Listing 8-19). The main class is also the same as in the 0805-channel-adapter-non-generic example (Listing 8-21). It executes the Spring Boot application and leaves it listening for POST requests. Chapter 4 covered how to send HTTP messages.

Transformer

A message transformer is a message endpoint that sits in the middle of the SI flow. Like all middle SI components, it can participate in unidirectional as well as bidirectional flow. It can operate in passive mode. It also can have a poller subcomponent and thus act as an active component. These types of transformers are possible:

  • Generic message transformer: Transforms the received Java object into another Java object. A new message is created because, by default, all the messages are immutable. So a generic transformer is a good candidate for using MessageBuilder<T> to create a converted message from the original received message. The XML configuration uses the <int:transformer> tag. When we want to configure it via Java configuration, we need to annotate the Transformer class with @MessageEndpoint and annotate the transformation method with @Transformer. To scan this bean into the Spring context, we need to use @IntegrationComponentScan somewhere in the Java configuration classes.
  • Technology-specific transformer: Converts the received message between various standard Java types (for example, Map, Object) and various common formats (for example, JSON, XML). These transformers can be configured only via XML tags (for example, <int:json-to-object-transformer>, <int:map-to-object-transformer>, <int:object-to-json-transformer>).
  • Header filter: A subset of the message transformer that removes specified headers. Can be configured via <int:header-filter>.

Generic Transformer Example with XML Configuration

Listing 8-35 shows the transformation class.

This class is not even a Spring bean and has one method that appends a string to the message payload. It’s easy to imagine that any transformation logic could be present here. As long as the method returns a value that can be represented or converted into an SI message and parameters that SI can inject values in (for example, a payload or headers marked with @Header/@Headers annotations), it can be used in the SI flow.

Listing 8-36 covers the SI flow configuration.

The inbound gateway is already a familiar component from the previous examples. Again, it maps our common service interface SiWrapperService and sends messages into inChannel. This channel is then connected to the transformer. The <int:transformer> tag specifies input-channel, output-channel. The transformation bean is created via the <bean> subelement, which is a normal Spring construct for creating beans.

The output channel of the transformer is then connected to the service activator mapped to our common WriteService.writeAndIndicateSuccess(). As this method returns a value, we are dealing with bidirectional flow, and a response is sent back to the gateway component via a temporary reply channel. Listing 8-37 shows the main class of this example.

Before sending a message to the SI gateway via its service interface SiWrapperService, we first log it as well as the result returned from it. Listing 8-38 shows the output of this example if we run the main class as a Java application.

The WriteRepository bean logs the transformed message. Figure 8-6 shows this SI flow.

9781484207949_Fig08-06.jpg

Figure 8-6. Visualization of generic transformer flow

Generic Transformer Example with Java Configuration

Let’s now dive into the same flow based on Java annotations. Listing 8-39 shows the transformer configuration.

The transformer logic is the same as in the previous example, 0811-transformer-generic-xml in Listing 8-35, but in this case we annotate the class with @MessagingEndpoint, which makes it eligible for integration component scanning. The conversion method is annotated with @Transformer, and we also have to specify the mandatory attributes inputChannel and outputChannel. Listing 8-40 shows the SI configuration class.

This configuration class enables the SI component scan based on the @IntegrationComponentScan annotation, which is needed to scan SimpleMessageTransformer. It registers two channels used in our SI flow: inChannel and transformedChannel. Listing 8-41 shows the annotated service activator configuration.

This service activator is similar to the service activator in example 0809-gateway-generic-javaconfig (Listing 8-31), with one difference. The service activator input channel is in this case transformedChannel. We also borrowed the class from SiWrapperServiceAnnotated of the same example and Listing 8-32.

So the integration component scan has the chain SiWrapperServiceAnnotated image SimpleMessageTransformer image WriteServiceAnnotated. Listing 8-42 covers the main class of this example.

After we read the service interface from the context, we create the message, log it, and send it to the SI flow. Finally, we log the result of this operation. As it returns a value, it is a bidirectional request-response type of flow. The output looks the same as for 0809-gateway-generic-javaconfig (Listing 8-38). Visualization of the flow is not possible, as annotated SI flows are not currently being visualized in STS.

JSON Transformer Example

We are also going to show a technology-specific example of the transformer. Because one of the most common concerns nowadays is conversion of JSON into Java objects, we are going to cover it. Obviously, we need an object to convert JSON into. So Listing 8-43 shows the User class.

It is a simple Java POJO, where getters and setters are generated by the Lombok annotation @Data. Listing 8-44 shows a fake repository for User objects.

Similar to the common WriteRepository class we use in nearly every example, this repository is also faking the persistence of the given parameter, which is the User object in this case. Listing 8-45 shows the service bean calling this fake repository.

This service bean autowires a UserRepository instance as a dependency. The processUser method takes the User object as a parameter, and passes it to UserRepository to persist it. This is a similar pattern as with WriteRepository.write(). Listing 8-46 finally shows the SI flow of this example.

If we are converting JSON to a Java object, a good example would be to use an HTTP inbound gateway for it. So we include the int-http namespace into our XML configuration. This inbound gateway is mapped to the POST HTTP method of the root URL and sends the received JSON payload as a text message to jsonChannel.

This channel is connected to the specific transformer <int:json-to-object-transformer>. We need to specify the type attribute into which the text message with the JSON payload should be converted. In this case, it is our User class. Spring Integration uses standard Jackson converters under the hood, as it would with Spring MVC conversion. The output-channel attribute is also mandatory. The new transformed message with the User payload is sent to it.

Finally, we map the service activator to UserService.processUser(). Listing 8-47 shows the Spring Boot configuration file.

This specifies the port on which the HTTP gateway will be listening to POST requests. Listing 8-48 shows the main class of this example.

This is the Spring Boot configuration and main class that executes our SI flow. When we start it, the application remains open and will be waiting for POST requests on the URL http://localhost:18080. So to initiate this request, let’s use the simple tool curl. This command is shown in Listing 8-49.

Parameter –i includes a header in the message; the -X parameter specifies the HTTP method used; -H defines the HTTP header to include; and the –d parameter specifies the payload. After executing this command, we get the response shown in Listing 8-50.

This indicates a successful processing of the message we sent. Of course, our server is not fully HTTP compliant, because it should reply with the location URL of the created resource, but let’s ignore this fact so as not to complicate this example. Listing 8-51 shows the output logged by the application.

The UserRepository bean has received the converted User object from the JSON payload. Notice that for this example, it is not possible to create a Java configuration mirror, because the <int-http:inbound-gateway> SI component doesn’t have a Java annotation counterpart.

Content Enricher

A content enricher is similar to a transformer in messaging endpoint characteristics. It can participate in unidirectional or bidirectional communication, can be passive or active, and fits into the middle of the SI flow.

From a functional point of view, these types of content enrichers are possible:

  • Generic header enricher: Most common use of this endpoint, which adds a new header to the message. Of course, the message is by default immutable, so a generic context enricher creates a new message instance out of the original one with an enriched header(s). It doesn’t have any SI annotation available, so it can be configured only via the XML element <int:header-enricher>.
  • Generic payload enricher: This is effectively a special type of message transformer, enriching the payload of the received message by adding information. This enricher can be configured only via the XML element <int:enricher>.
  • Technology-specific header enricher: Enriches headers for three technologies: Mail, XPath, and XMPP (Jabber protocol). These configurations are possible only via XML configuration, which uses special technology-specific XML namespaces: <int-xml:xpath-header-enricher>, <int-xmpp:header-enricher>, and <int-mail:header-enricher>.

Header Enricher Example

Listing 8-52 shows header enricher logic.

This Spring bean checks whether the received message contains 1 and returns a header value based on the result. The return value will be used in the SI flow to enrich the message. Listing 8-53 shows the SI flow configuration.

The inbound gateway is configured to use our common service interface SiWrapperServiceVoid, convert its parameter into a message, and pass it to inChannel.

From this channel, the message is picked up by the header enricher, which defines the input-channel and output-channel mandatory attributes on the top-level XML element <int:header-enricher>. Now we can define one or more headers to enrich. This is done via the XML sub-element <int:header>. For a single header, we need to specify name, which will become a key in the headers map, and the ref and method of the bean that contains header enricher logic. In this case, it is SimpleHeaderEnricher.addHeader(). In the <int:header> tag, we can also use Spring Expression Language (SpEL) to define enricher logic inline. Use of SpEL is shown later in the chapter.

The last element in our flow is the outbound channel adapter, which encapsulates the logic shown in Listing 8-54.

This service injects our common WriteRepository to persist the received message payload. It also logs this payload with simpleHeader that was enriched earlier in the flow. Listing 8-55 covers the main class of this example.

After running the Spring Boot application, we retrieve the wrapperService bean from the context and send two messages with the payload message1 and message2 to the SI flow. Again, SiWrapperServiceVoid is the entry point to the SI flow. The output after running it is similar to Listing 8-56.

The header enricher logic is applied: message1 is enriched with the header1 value, and message2 is enriched with the header2 value. Figure 8-7 shows this header enricher SI flow.

9781484207949_Fig08-07.jpg

Figure 8-7. Visualization of header enricher flow

Filter

A filter is another middle SI message endpoint that can participate in bidirectional and unidirectional flows. It can act as a passive as well as an active component. Its purpose is to decide whether the received message should or shouldn’t be passed to the downstream flow. This decision can be based on generic logic, which can be configured via the <int:filter> XML element or the @Filter annotation. The annotation configuration doesn’t provide the flexibility of XML, which can be based on the following:

  • Custom Java logic: Should implement the interface MessageSelector and can be defined via the <beans:bean> subelement of the <int:filter> tag or its ref attribute.
  • SpEL expression: Can be configured via the <int:expression> XML subelement or the expression attribute of the <int:filter> XML tag.
  • Custom script: Supports Ruby and Groovy via the <int-script:script> subelement of the <int:filter> XML tag.

Alongside these generic filter types, we can also configure special filters for these technologies:

  • XPath filter: <int-xml:xpath-filter>
  • XML validation filter: <int-xml:validating-filter>

The filter’s decision to pass the message to the next component in the SI flow is represented by the Boolean value returned by the filter logic (whether it is Java logic, SpEL, or scripting). If the value is false, the filter gives us the opportunity to discard the message or throw an exception in the SI flow. If we discard the message, we can specify discardChannel to route the message.

An important fact is that by default the filter silently discards messages.

A filter can also be embedded into unidirectional flow as the channel adapter’s attribute (for example, local-filter for <int-ftp:inbound-channel-adapter> or mail-filter-expression for <int-mail:imap-idle-channel-adapter>).

Filter Example Based on XML Configuration

Listing 8-57 shows custom filter logic .

This Spring bean, which implements the MessageSelector interface, can be used as filter logic in the SI flow. It needs to implement the MessageSelector.accept() method. In this case, we inspect the String payload of the message and refuse it if it contains the corrupt substring. Listing 8-58 dives into the SI flow configuration for this example.

The inbound gateway uses our common SiWrapperServiceVoid bean to hide the SI flow, which indicates unidirectional message handling. Messages are handled in inChannel, where the filter comes into play. It has to define mandatory input-channel and output-channel attributes and one of the expression or ref attributes. In this case, it is a reference to our filter bean simpleFilter.

The message is then sent to filteredChannel and passed to the service activator, which executes another common bean, WriteService.write(). To highlight that filtering works, take a look at Listing 8-59.

We sent three messages to the SI flow, one of which is corrupted. So in Listing 8-60 we expect to see only two messages.

As we can observe, the example works as expected. Figure 8-8 shows this SI flow.

9781484207949_Fig08-08.jpg

Figure 8-8. Visualization of filter example flow

Filter Example Based on Java Configuration

Let’s now dive into the Java configuration of the filter component. Listing 8-61 shows the Java configuration.

This Spring configuration class uses @IntegrationComponentScan to scan for SI Java annotated components and defines two SI channels: inChannel and filteredChannel. Listing 8-62 shows the Java configuration of the inbound gateway interface.

In this case, the unidirectional inbound gateway is hooked to inChannel. To scan it as a Spring bean, we use @IntegrationComponentScan. Listing 8-63 shows the Java filter configuration.

With @MessageEndpoint, we can register it as a Spring bean. The @Filter annotation marks the method accepted as filtering logic. Notice that we don’t need to implement the MessageSelector interface. But we need to define the input and output channel. The filtering logic again refuses messages with a payload containing a corrupted string. Listing 8-64 shows the service activator configuration, which sits on the output side of the SI flow.

We’ve seen such configurations before. Listing 8-65 shows the main class of this example.

The only difference from the previous example is the lack of si-config.xml import, which is understandable as we use Java configuration. When we run it, output is similar to the output of the 0815-filter-xml-ref example in Listing 8-60.

SpEL-Based Filter Example Based on XML Configuration

Now it’s time for a filter example, which includes filtering logic inline into the SI flow. This can be done via SpEL , which can be embedded into various SI components including the filter. In fact, this example uses all the same classes from the XML filter example 0815-filter-xml-ref except SI flow configuration, which is shown in Listing 8-66.

The only difference is in the <int:filter> XML tag, which uses the expression attribute instead of ref. As you can see, we can define a simple Java-like scripting condition, which effectively has the same functionality as the simpleFilter bean from the 0815-filter-xml-ref example. Even the output and SI graph are similar to the previous example.

Router

A router is an SI component that can fork your message flow. In other words, it represents a decision component within SI. It applies routing logic and decides into which channel or various channels the message should be passed further. Notice that it should be OK to pass the same instance of the message to various channels, as the message is immutable. Routing logic can be defined in numerous ways, based on the following:

  • Generic routing
    • Custom Java logic: A Spring bean is used to route a message into a target channel or various channels. Can be configured via the XML element <int:router> or via the SI annotation @Router. This Java logic needs to return one of the types MessageChannel, List<MessageChannel>, String, Listing<String>, which specifies the channel instance(s) or channel name(s) to send messages to.
    • Payload type: Can be configured via <int:payload-type-router>.
    • Header value: Can be configured via <int:header-value-router>.
    • Exception type: Can handle routing of messages, which have Throwable as a payload type. It is, in fact, a special type of payload type handler. Can be configured via <int:exception-type-router>.
    • Recipient list router: Special type of router that sends messages to a static list of channels. It can be configured via <int:recipient-list-router>.
    • SpEL expression: Inline SpEL expression can be configured via an expression attribute of the <int:router> XML element.
  • Technology-specific routing
    • XPath expression: Can be configured via <int-xml:xpath-router>.

A router is a passive middle component and thus can participate in unidirectional or bidirectional flows.

Generic Router Example Based on XML Configuration

Listing 8-67 shows a WriterService implementation for the router example.

This service is plugged into the output of the SI flow. It uses our common WriteRepository to persist messages. But the difference from previous examples is the use of the two methods writeRoute1() and writeRoute2() for consuming messages. They are used to split the message flow into two routes. Listing 8-68 shows the SI flow configuration.

After the SI flow consumes the message via the inbound gateway, it is passed to the router component via inChannel. The router endpoint uses expression to inspect the payload of the message. If the payload is message1, it’s routed to route1Channel, and if the payload is message2, it is routed to route2Channel. These two channels are connected to service activators, which map them to the WriteService.writeRoute1() and WriteService.writeRoute2() Java methods. This is a simple fork of SI flow based on the payload value. Listing 8-69 shows the main class of this example.

After creating the Spring context with the included SI flow, we read the SiWrapperService bean. Then we send two messages with payloads message1 and message2 to the SI flow and log the returned value. So we are now dealing with bidirectional message flow with a temporary reply channel. Listing 8-70 covers the output when we run this main class.

The routing works as expected. We also get back return values, as if the SI flow is just a single service call. Figure 8-9 shows the SI graph for this example.

9781484207949_Fig08-09.jpg

Figure 8-9. Visualization of SI flow for generic XML router

Generic Router Example Based on Java Configuration

Now let’s configure the same logic with Java. Listing 8-71 shows the output service.

This service is the same as the one from the previous example, but in this case we use the @ServiceActivator annotation to mark the output points of the SI flow. Listing 8-72 shows the SI configuration class.

Alongside SI component scanning, we register all the channels needed to fork the flow. Listing 8-73 shows the input service interface.

The interface is marked as an inbound messaging gateway connected to inChannel. It returns a value, so again we have a request-response scenario of the flow. Listing 8-74 shows the routing logic.

The router class is annotated by the SI @MessageEndpoint annotation to be component scanned by the SI framework. We need to inject both channel instances we will route to. The routing method needs to be annotated by @Router and has to specify the inputChannel attribute so that we can consume messages from it. The routing logic itself is based on the presence of the substring 1 in the message. The channel instance we return will be used for passing the message further in the flow. Listing 8-75 shows the main class of this example.

The logic is similar to that in the 0818-router-xml-generic example in Listing 8-69, but in this case we don’t include an XML configuration because the flow is configured purely in Java. The second difference is in the type of bean we use as the service interface for the SI gateway, because this time it is an annotated version. The output is the same as for the 0818-router-xml-generic example from Listing 8-70, and as we don’t use an XML configuration, an integration graph can’t be visualized via STS.

Message Type Router Example

This example uses routing based on payload type. Listing 8-76 shows the output service.

Again, this is a similar output service with two methods as targets for routing. The difference is in the parameter of writeRouter2(), which consumes the int message. Notice that the integer parameter is concatenated to the string value message, so that it can be passed to WriterRepository. Listing 8-77 shows the service interface, which will be used as input to the SI flow.

Notice that the processMessage() method takes the Object type as a parameter. Listing 8-78 shows the SI flow’s XML configuration.

In this case, we use the <int:payload-type-router> XML element instead of <int:router>. To pick the target channel, we use the <int:mapping> element with the type attribute specified, which binds the payload Java type to a particular channel. Listing 8-79 shows the main class of this example.

First we send message1 to the message flow. The second message consists of value 2. The output of this example is shown in Listing 8-80.

As expected, the second message received by WriterService has the value 2.

Bridge

A bridge is simple routing endpoint that connects two message channels or channel adapters. It is therefore used as a middle component, which can obviously participate in unidirectional and bidirectional flows. It can be configured via the XML element <int:bridge> or via the @BridgeFrom or @BridgeTo annotations, where it is used to mark channel beans. Use includes passive as well as active mode. In active mode, it needs to use the <int:poller> or @Poller subelement.

We may, for example, use it when we need to consume messages in a passive component from a channel designed for an active consumer (PollableChannel is described later in the chapter). In this case, the channel and component are incompatible, and the active bridge component will help us bridge this gap. Another use case is to connect two SI flows or systems where data conversion is not needed.

Bridge Example with XML Configuration

Listing 8-81 shows the SI flow of the XML bridge example.

Messages will be passed to the SI infrastructure via the inbound message gateway, which uses our common SiWrapperServiceVoid as a service interface and sends messages to inChannel. In this case, inChannel is configured as a temporary store of 10 messages. This type of channel is explained in detail later, in the “QueueChannel” section.

This channel is connected to the bridge component, which is in this case active, and polls inChannel every 100 ms. The output side of the bridge is connected directly to the outbound channel adapter. The output-channel attribute of the bridge and id of the channel adapter match, which means that these components are interconnected. The output channel adapter is mapped to our common method WriteService.write(). Listing 8-82 shows the main class of this example.

This starts the Spring context and sends two messages to the SI flow via the SiWrapperServiceVoid bean. Listing 8-83 covers the output when we run this main class.

As expected, both messages are being persisted. Notice that this happens in the task scheduler thread, which is created by the <int:poller> subcomponent of the bridge. Figure 8-10 shows the SI graph for this SI flow.

9781484207949_Fig08-10.jpg

Figure 8-10. SI graph of XML bridge example

@BridgeFrom Example

Listing 8-84 shows the service interface sitting on the input side of the flow.

This bidirectional service interface is configured as an inbound gateway. Listing 8-85 shows the output bean of the SI flow.

WriterServiceAnnotated is configured as a service activator listening on bridgedChannel. Listing 8-86 shows the Java SI configuration in which the bridge is configured.

After a component scan for SI components, this defines two channels: inChannel and bridgedChannel. These channels are bridged via the value attribute of the @BridgeFrom annotation. The @Poller annotation turns this bridge to an active component. Listing 8-87 shows the main class of this example.

After running the Spring context, we send the message via the SiWrapperServiceAnnotated service interface and log the result returned by the SI flow. Listing 8-88 shows the output after running this example.

The message is persisted in the task scheduler thread, and the result is handled in the main thread.

@BridgeTo Example

This example is similar to the @BridgeFrom example. The only difference between these two examples is shown in Listing 8-89.

The @BridgeTo annotation is on inChannel in this case. The output of this example is also the same as for the @BridgeFrom example. As we can see, @BridgeTo annotates the source channel or component—the opposite of @BridgeFrom, which annotates the target channel or component.

Chain

A chain is a passive middle component, which also means it can participate in unidirectional and bidirectional message flows. As its name suggests, it is used for chaining message endpoints together without the need to specify channels to connect them. Messages are most commonly passed sequentially through chain subelements.

This component can wrap a set of message endpoints into one message endpoint. In fact, we can use a chain as a component in a higher-level chain and thus create a complex structure of SI flow chunks connected on various chain levels. Such nested chains need to be defined with use of the messaging gateway.

A chain can be configured via the XML element <int:chain>. Notice that there is no Java annotation equivalent for a chain definition. It requires only one mandatory attribute: input-channel. Components used in chains have various restrictions as compared to their use outside the chain. For example, components in a chain shouldn’t specify input-channel. In addition, the following are restrictions for the last element in the chain in relation to the output channel:

  • If the chain is connected to a downstream message endpoint, the last element in the chain must specify output-channel, or the message should have the replyChannel header defined.
  • If the last element is a channel adapter or service activator, it doesn’t need to specify the output channel.

Another restriction is that each endpoint in a chain must produce a message (can’t return null) and thus terminate the flow. Only the last element can discard the message. Elements in a chain can’t be active (can’t have poller).

Chain Example

This example borrows all the classes from the 0814-header-enricher-xml example:

Listing 8-90 shows the SI flow with the chain.

As compared to the 0814-header-enricher-xml example, we don’t need to define enrichedChannel, because the header enricher and outbound channel adapter are wrapped in a chain. The output of this example is also the same as the output of the header enricher example in Listing 8-56. The SI graph for this flow is shown in Figure 8-11.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.216.44.143