According to Enterprise Integration Patterns,1 a message channel is a virtual data pipe that connects a sender to one or more receivers. The message channel decouples the sender and the receivers so the sender does not necessarily know who will receive the messages.
There are two major types of message channels: point-to-point channels and publish-subscribe channels. By selecting a different type of message channel, the application can control how the receivers get messages. The Spring Integration framework simplifies the development of message channels in an integration implementation.
Spring Integration supports the design patterns that are defined in Enterprise Integration Patterns. Since this is not a book on patterns, it will help to already be familiar with basic integration design patterns. However, for general context, the basic message channel design patterns are reviewed below.
The message channel design pattern is very simple. The message contains a piece of information that needs to be passed between different components, which can either be in a process or across different applications in different servers. For example, a chain retail store inventory system can send a message with the string “Inventory is running low with Item #12345” to the corporate warehouse so it can send more inventory back to the store. Besides strings, a message can contain any data type that the applications can understand.
The message endpoints are the components that interact with the messages. The endpoint that sends messages to the message channel is called the sender or producer. The endpoint that receives messages from the message channel is called the receiver or consumer. The sender puts the data into a message and the receiver takes the data out of the message. As a result, the sender and receiver need to understand the data that they are exchanging. Besides sending and receiving, a message endpoint can filter messages within a channel or route messages to another channel. Some message endpoints can even split a message into multiple messages and route them into different channels.
The message channel connects multiple endpoints together. Messages can be produced and sent from multiple senders and received by one or more receivers depending on the type of message channel. The channel ensures that the messages can be sent and received between endpoints safely. Since the application data is encapsulated within the message, the message channel does not need to understand the message payload. In other words, the message channel design pattern decouples the message sender and receiver.
To make it easy for endpoints to interact with message channels, each message channel has a unique name, so each channel can be seen as a logical address. There are also different types of message channels that differ in how they handle messages.
____________
1 Gregor Hohpe, op. cit.
The point-to-point channel (see Figure 6–1) guarantees that each message sent by the sender is received by only one receiver. Spring Integration provides several types of point-to-point channel implementations: QueueChannel, PriorityChannel, RendezvousChannel, DirectChannel, ExecutorChannel, and NullChannel.
Figure 6–1. Point-to-point Channel
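To make the point-to-point guarantee concrete, here is a minimal plain-Java sketch. It does not use Spring Integration at all: the class name PointToPointSketch and the receiver names are hypothetical, and a java.util.concurrent queue stands in for the channel. The point it illustrates is that taking a message off the queue removes it, so no second receiver can see the same message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; a plain queue stands in for a point-to-point channel.
class PointToPointSketch {

    // Sends three messages to one queue and lets two receivers take turns polling it.
    static List<String> deliver() {
        BlockingQueue<String> channel = new LinkedBlockingQueue<String>();
        channel.add("m1");
        channel.add("m2");
        channel.add("m3");

        List<String> log = new ArrayList<String>();
        String[] receivers = {"receiverA", "receiverB"};
        int turn = 0;
        String message;
        // poll() removes the head of the queue, so each message reaches exactly one receiver.
        while ((message = channel.poll()) != null) {
            log.add(receivers[turn % receivers.length] + " got " + message);
            turn++;
        }
        return log;
    }

    public static void main(String[] args) {
        for (String line : deliver()) {
            System.out.println(line);
        }
    }
}
```

Each message appears in the log once, attributed to a single receiver, no matter how many receivers are polling.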
The publish-subscribe channel (see Figure 6–2) allows a one-to-many relationship between the producer and consumers, such that multiple consumers may receive the same message. The message is marked as “received” and removed from the channel when all the subscribed receivers have consumed the message. Spring Integration currently provides one publish-subscribe-style message channel implementation, PublishSubscribeChannel.
Figure 6–2. Publish-subscribe Channel
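By contrast, the one-to-many behavior can be sketched as follows. Again this is plain Java under our own assumptions, not the PublishSubscribeChannel implementation: the Subscriber interface, the class name, and the "billing" and "audit" subscribers are all invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names throughout; this models the pattern, not PublishSubscribeChannel itself.
class PublishSubscribeSketch {

    interface Subscriber {
        void onMessage(String message);
    }

    private final List<Subscriber> subscribers = new ArrayList<Subscriber>();

    void subscribe(Subscriber subscriber) {
        subscribers.add(subscriber);
    }

    // Every subscriber is handed the same message, in subscription order.
    void publish(String message) {
        for (Subscriber subscriber : subscribers) {
            subscriber.onMessage(message);
        }
    }

    // Registers two subscribers, publishes one message, and returns what each one saw.
    static List<String> demo() {
        List<String> log = new ArrayList<String>();
        PublishSubscribeSketch channel = new PublishSubscribeSketch();
        channel.subscribe(message -> log.add("billing saw " + message));
        channel.subscribe(message -> log.add("audit saw " + message));
        channel.publish("order-1");
        return log;
    }
}
```

A single call to publish produces one log entry per subscriber, which is the essential difference from the point-to-point case.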
Applications can use the message channel to transfer different types of data between the message sender and receiver. In order to process the message correctly, the receivers need to have knowledge about the message data type. For example, the sender can send object A and object B into the same message channel. The receiver needs to determine the object type in order to apply different business logic to handle the message. Usually, the determination is made by including a format indicator as part of the message or in the message header.
However, if there are two different channels that each carry a single data type, one for object A and one for object B, the receiver will know the data type of the messages without using a format indicator. A message channel that contains only a single type of object or message is called a data-typed channel. A channel that can handle different types of data is compared with data-typed channels in Figure 6–3.
Figure 6–3. Message Channel with Multiple Data Types vs. Data-Typed Channels
Spring Integration message channels can be configured to accept only messages of one or more specified payload types. For instance:
<channel id="messageChannel" />
This Spring Integration message channel can receive any kind of data type. However, the following message channel can only receive objects of class a.A:
<channel id="queueChannel" datatype="a.A" />
We will discuss data-typed channels in more detail later in this chapter.
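Conceptually, a data-typed channel is a channel with a type check in front of it. The following plain-Java sketch shows only that idea. The class DataTypedChannelSketch is hypothetical, and returning false for a payload of the wrong type is a simplification chosen for this sketch; it makes no claim about how the framework itself reports a rejected payload.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical class illustrating the data-typed channel idea without Spring Integration.
class DataTypedChannelSketch {
    private final Class<?> acceptedType;
    private final Queue<Object> queue = new ArrayDeque<Object>();

    DataTypedChannelSketch(Class<?> acceptedType) {
        this.acceptedType = acceptedType;
    }

    // Accepts only payloads of the configured type; returning false on a mismatch
    // is an assumption of this sketch, not the framework's behavior.
    boolean send(Object payload) {
        if (!acceptedType.isInstance(payload)) {
            return false;
        }
        return queue.offer(payload);
    }

    int size() {
        return queue.size();
    }
}
```

A channel created with new DataTypedChannelSketch(Integer.class) accepts send(42) but refuses send("text"), so a receiver never needs a format indicator to know what it is getting.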
When the application receives a message from the channel, the application may decide not to process the incoming message because the data within the message does not pass validation or because the application does not support the incoming message data type. In that case, the message is routed into the invalid message channel (see Figure 6–4), allowing further handling by another process or application. In Spring Integration, the validation would be done by a message filter, which will be discussed in more depth in Chapter 8.
Figure 6–4. Invalid Message Channel
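The routing decision behind the invalid message channel can be sketched in a few lines of plain Java. Everything here is assumed for illustration: the class name, the two lists standing in for channels, and the validation rule (only non-empty String payloads are supported).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical receiver that diverts unsupported payloads to an "invalid message" list.
class InvalidMessageSketch {
    final List<String> processed = new ArrayList<String>();
    final List<Object> invalidChannel = new ArrayList<Object>();

    // Assumed validation rule: only non-empty String payloads are supported.
    void receive(Object payload) {
        if (payload instanceof String && !((String) payload).isEmpty()) {
            processed.add((String) payload);
        } else {
            // Not processed here; left for another process or application to inspect.
            invalidChannel.add(payload);
        }
    }
}
```

The receiver never throws away a message it cannot handle; it hands the message to a separate destination where it can be examined later.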
When an application fails to deliver a message to the message channel after all the retry attempts, the message will be sent to the dead letter channel, where it will be handled further by another process or application listening for messages on that channel (see Figure 6–5).
Figure 6–5. Dead Letter Channel
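The retry-then-dead-letter flow described above can be sketched as follows. This is a plain-Java illustration under stated assumptions: the Transport interface, the class name, and the list that stands in for the dead letter channel are all hypothetical, and the retry policy is a simple fixed number of attempts.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sender that gives up after maxAttempts and parks the message in a dead letter list.
class DeadLetterSketch {

    // Stand-in for whatever actually delivers the message; returns true on success.
    interface Transport {
        boolean trySend(String message);
    }

    final List<String> deadLetters = new ArrayList<String>();

    boolean sendWithRetry(String message, Transport transport, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (transport.trySend(message)) {
                return true;
            }
        }
        // All attempts failed: keep the message for a separate process to handle.
        deadLetters.add(message);
        return false;
    }
}
```

Only messages that exhaust every attempt end up in deadLetters; a message that succeeds on any attempt is never recorded there.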
The channel adapter (see Figure 6–6) allows an application to connect to the messaging system. Most applications are not designed to communicate with a messaging system in the first place. By using a common interface or application programming interface (API), applications can be easily integrated with different messaging system implementations.
Figure 6–6. Channel Adapter
A messaging bridge is a relatively trivial endpoint that simply connects two message channels or channel adapters (see Figure 6–7). For example, the developer may want to connect a PollableChannel to a SubscribableChannel so that the subscribing endpoints don't have to worry about any polling configuration. Instead, the messaging bridge provides the polling support. Messaging bridges will be discussed in depth in Chapter 8.
According to Enterprise Integration Patterns,2 a message bus is a combination of a canonical data model, a common command set, and a messaging infrastructure that allows different systems to communicate through a shared set of interfaces. By leveraging Spring Integration adapters and transformers, message channels can be connected and used to create a message bus allowing communication between different systems.
Messages are normally stored in memory while they wait for delivery by the messaging system. If the messaging system crashes, all the messages will be lost. In order to guarantee delivery, the messaging system can use a data store to persist the messages. By default, all the Spring Integration channels store messages in memory. In Spring Integration 2.0, message channels can also be backed by a JMS broker. In other words, messages are stored in an external JMS broker instead of in application memory. Message channels are just another strategy interface and can be tailored to your specific needs as required; implementations can be built that delegate to any data store mechanism conceivable using the MessageStore interface or, for more control, by implementing the MessageChannel interface itself.
It is very easy to apply the message channel design patterns using Spring Integration. All the Spring Integration channels implement the org.springframework.integration.MessageChannel interface shown in Listing 6–1. This interface defines how a sender sends a message to the channel. This is required since different types of message channels can receive messages in different ways. Depending on the message channel implementation, the send operation can block indefinitely or for a given timeout until the message is received.
Listing 6–1. MessageChannel.java
package org.springframework.integration;
public interface MessageChannel {
boolean send(Message<?> message);
boolean send(Message<?> message, long timeout);
}
____________
2 Gregor Hohpe, op. cit.
Spring Integration defines two interfaces for receiving messages from a channel: org.springframework.integration.core.PollableChannel and org.springframework.integration.core.SubscribableChannel. Both are subinterfaces of the MessageChannel interface.
PollableChannel (see Listing 6–2) allows the receiver to poll messages from the message channel periodically. The receiver can choose to wait indefinitely or for a given timeout until a message arrives. This gives the receiver the flexibility to decide when to get the message from the channel.
Listing 6–2. PollableChannel.java
package org.springframework.integration.core;
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
public interface PollableChannel extends MessageChannel {
Message<?> receive();
Message<?> receive(long timeout);
}
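The "wait for a given timeout" behavior is the same idea as a timed poll on a standard Java blocking queue. The sketch below is plain Java, not the framework's implementation; the class name TimedReceiveSketch is hypothetical, and a LinkedBlockingQueue stands in for the channel.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a pollable channel, built on a standard blocking queue.
class TimedReceiveSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();

    boolean send(String message) {
        return queue.offer(message);
    }

    // Waits up to timeoutMillis for a message; returns null if none arrives in time.
    String receive(long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }
}
```

A null return tells the receiver that the timeout elapsed, which leaves it free to do other work before polling again.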
The alternative way to receive messages is by using the SubscribableChannel (see Listing 6–3), which allows the sender to push the message to the subscribed receiver(s). When the sender sends a message, the subscribed receiver(s) will receive the message and process it with the provided org.springframework.integration.core.MessageHandler, following the Gang-of-Four (GoF) Observer pattern.3 Once a message has been sent to the channel, all the subscribed message handlers will be invoked.
Listing 6–3. SubscribableChannel.java
package org.springframework.integration.core;
import org.springframework.integration.MessageChannel;
public interface SubscribableChannel extends MessageChannel {
boolean subscribe(MessageHandler handler);
boolean unsubscribe(MessageHandler handler);
}
The MessageHandler interface (see Listing 6–4) contains only one method, which handles the message pushed from the SubscribableChannel. The method is declared to throw org.springframework.integration.MessagingException. Depending on the message channel implementation, each exception may be handled differently (one such possibility is failover). As a result, it is always a good idea to throw the appropriate exception. Table 6–1 gives some message handler-related exceptions.
____________
3 Erich Gamma, Richard Helm, Ralph Johnson, John M. Vlissides
Listing 6–4. MessageHandler.java
package org.springframework.integration.core;
import org.springframework.integration.Message;
import org.springframework.integration.MessagingException;
public interface MessageHandler {
void handleMessage(Message<?> message) throws MessagingException;
}
Exception | Description |
MessageRejectedException | A message has been rejected by a selector. |
MessageHandlingException | An error occurred during message handling. |
MessageDeliveryException | An error occurred during message delivery (e.g., a network connectivity issue, a JMS broker error, a storage issue, etc.). |
MessageTimeoutException | A timeout elapsed prior to successful message delivery. |
Once you have decided how the application will receive messages, you can choose the type of message channel implementation (point-to-point or publish-subscribe).
Spring Integration provides several different implementations of the point-to-point channel pattern. Let's look at the different point-to-point channel options.
The org.springframework.integration.channel.QueueChannel class (see Figure 6–8) is the simplest implementation of the MessageChannel interface. QueueChannel has point-to-point semantics. In other words, even if the channel has multiple consumers, only one of them should receive any message sent to that channel. QueueChannel also provides mechanisms to filter and purge messages that satisfy certain criteria. In addition, QueueChannel stores all the messages in memory. By default, QueueChannel can use all the available memory to store messages. To avoid running out of memory, it's always better to instantiate the QueueChannel with a channel capacity that limits the number of messages maintained in the queue.
Figure 6–8. Queue Channel Class Diagram
Assume we are building a problem-reporting system in which each problem is described using a Ticket. The problem is reported using a problem reporter application. The Ticket is packaged into a message and submitted to a QueueChannel. Next, the TicketReceiver receives the Ticket message from the channel and the problem is resolved. Let's look at the definitions for the various classes involved. First, the class defining the Ticket message is shown in Listing 6–5.
Listing 6–5. Ticket.java
package com.apress.prospringintegration.channels.core;
import java.util.Calendar;
public class Ticket {
public enum Priority {
low,
medium,
high,
emergency
}
private long ticketId;
private Calendar issueDateTime;
private String description;
private Priority priority;
public Ticket() {
}
public long getTicketId() {
return ticketId;
}
public void setTicketId(long ticketId) {
this.ticketId = ticketId;
}
public Calendar getIssueDateTime() {
return issueDateTime;
}
public void setIssueDateTime(Calendar issueDateTime) {
this.issueDateTime = issueDateTime;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public Priority getPriority() {
return priority;
}
public void setPriority(Priority priority) {
this.priority = priority;
}
public String toString() {
return String.format("Ticket# %d: [%s] %s", ticketId, priority, description);
}
}
The ProblemReporter class, shown in Listing 6–6, is a component that sends the Ticket message to the message channel ticketChannel when the openTicket method is invoked.
Listing 6–6. ProblemReporter.java
package com.apress.prospringintegration.channels.queuechannel;
import com.apress.prospringintegration.channels.core.Ticket;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class ProblemReporter {
private QueueChannel channel;
@Value("#{ticketChannel}")
public void setChannel(QueueChannel channel) {
this.channel = channel;
}
void openTicket(Ticket ticket) {
channel.send(MessageBuilder.withPayload( ticket).build() );
System.out.println("Ticket Sent - " + ticket.toString());
}
}
The TicketReceiver class shown in Listing 6–7 receives the Ticket message sent to the message channel ticketChannel.
Listing 6–7. TicketReceiver.java
package com.apress.prospringintegration.channels.queuechannel;
import com.apress.prospringintegration.channels.core.Ticket;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.integration.Message;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.stereotype.Component;
@Component
public class TicketReceiver implements Runnable {
final static int RECEIVE_TIMEOUT = 1000;
protected QueueChannel channel;
@Value("#{ticketChannel}")
public void setChannel(QueueChannel channel) {
this.channel = channel;
}
public void handleTicketMessage() {
Message<?> ticketMessage;
while (true) {
ticketMessage = channel.receive(RECEIVE_TIMEOUT);
if (ticketMessage != null) {
handleTicket( (Ticket) ticketMessage.getPayload() );
} else {
try {
/** Handle some other tasks **/
Thread.sleep(1000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
void handleTicket(Ticket ticket) {
System.out.println("Received ticket - " + ticket.toString());
}
@Override
public void run() {
handleTicketMessage();
}
}
The TicketGenerator class shown in Listing 6–8 creates the Ticket messages with different priority levels.
Listing 6–8. TicketGenerator.java
package com.apress.prospringintegration.channels.core;
import com.apress.prospringintegration.channels.core.Ticket.Priority;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.GregorianCalendar;
import java.util.List;
@Component
public class TicketGenerator {
private long nextTicketId;
public TicketGenerator() {
this.nextTicketId = 1000L;
}
public List<Ticket> createTickets() {
List<Ticket> tickets = new ArrayList<Ticket>();
tickets.add(createLowPriorityTicket());
tickets.add(createLowPriorityTicket());
tickets.add(createLowPriorityTicket());
tickets.add(createLowPriorityTicket());
tickets.add(createLowPriorityTicket());
tickets.add(createMediumPriorityTicket());
tickets.add(createMediumPriorityTicket());
tickets.add(createMediumPriorityTicket());
tickets.add(createMediumPriorityTicket());
tickets.add(createMediumPriorityTicket());
tickets.add(createHighPriorityTicket());
tickets.add(createHighPriorityTicket());
tickets.add(createHighPriorityTicket());
tickets.add(createHighPriorityTicket());
tickets.add(createHighPriorityTicket());
tickets.add(createEmergencyTicket());
tickets.add(createEmergencyTicket());
tickets.add(createEmergencyTicket());
tickets.add(createEmergencyTicket());
tickets.add(createEmergencyTicket());
return tickets;
}
Ticket createEmergencyTicket() {
return createTicket(Priority.emergency,
"Urgent problem. Fix immediately or revenue will be lost!");
}
Ticket createHighPriorityTicket() {
return createTicket(Priority.high,
"Serious issue. Fix immediately.");
}
Ticket createMediumPriorityTicket() {
return createTicket(Priority.medium,
"There is an issue; take a look whenever you have time.");
}
Ticket createLowPriorityTicket() {
return createTicket(Priority.low,
"Some minor problems have been found.");
}
Ticket createTicket(Priority priority, String description) {
Ticket ticket = new Ticket();
ticket.setTicketId(nextTicketId++);
ticket.setPriority(priority);
ticket.setIssueDateTime(GregorianCalendar.getInstance());
ticket.setDescription(description);
return ticket;
}
}
The main class TicketMain shown in Listing 6–9 creates the Spring context and initializes the various components described above.
Listing 6–9. TicketMain.java
package com.apress.prospringintegration.channels.queuechannel;
import com.apress.prospringintegration.channels.core.Ticket;
import com.apress.prospringintegration.channels.core.TicketGenerator;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.List;
public class TicketMain {
public static void main(String[] args) {
String contextName = "queue-channel.xml";
ClassPathXmlApplicationContext applicationContext =
new ClassPathXmlApplicationContext(contextName);
applicationContext.start();
ProblemReporter problemReporter =
applicationContext.getBean(ProblemReporter.class);
TicketReceiver ticketReceiver =
applicationContext.getBean("ticketReceiver", TicketReceiver.class);
TicketGenerator ticketGenerator =
applicationContext.getBean(TicketGenerator.class);
List<Ticket> tickets = ticketGenerator.createTickets();
for (Ticket ticket : tickets) {
problemReporter.openTicket(ticket);
}
Thread consumerThread = new Thread(ticketReceiver);
consumerThread.start();
}
}
The Spring configuration for the Ticket reporter example is shown in Listing 6–10.
Listing 6–10. queue-channel.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<context:component-scan
base-package="com.apress.prospringintegration.channels.queuechannel"/>
<context:component-scan
base-package="com.apress.prospringintegration.channels.core"/>
<int:channel id="ticketChannel"
datatype="com.apress.prospringintegration.channels.core.Ticket">
<int:queue capacity="50"/>
</int:channel>
</beans>
QueueChannel is defined using the Spring Integration namespace in the Spring bean configuration file. Most of the beans used in the example are annotated with @Component, which means we don't need to explicitly register them with the Spring application context because they will be picked up by the context:component-scan element. The Spring Integration namespace will be discussed later in this chapter. TicketMain's main method calls the TicketGenerator class to create a list of Ticket instances with different ticket priority values. The list of Ticket objects is sent to a QueueChannel. TicketReceiver runs in a separate worker thread and polls the same QueueChannel to receive messages. Figure 6–9 shows the output of the preceding code:
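QueueChannel works exactly like a queue, handling messages in First In First Out (FIFO) order. In the preceding example, ProblemReporter wraps the Ticket in a Message and puts it into the QueueChannel. Listing 6–6 builds the message with MessageBuilder; an equivalent message can also be created by constructing a GenericMessage directly: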
queueChannel.send(new GenericMessage<Ticket>(ticket));
org.springframework.integration.message.GenericMessage implements the org.springframework.integration.Message interface shown in Listing 6–11, which contains only two methods: getHeaders and getPayload.
Listing 6–11. Message.java
package org.springframework.integration;
public interface Message<T> {
MessageHeaders getHeaders();
T getPayload();
}
The Message interface is simple: a message contains a message header and a payload. The message header is a map that consists of the key/value pairs shown in Table 6–2.
Header Name | Header Data Type |
ID | java.util.UUID |
TIMESTAMP | java.lang.Long |
CORRELATION_ID | java.lang.Object |
REPLY_CHANNEL | java.lang.Object |
ERROR_CHANNEL | java.lang.Object |
SEQUENCE_NUMBER | java.lang.Integer |
SEQUENCE_SIZE | java.lang.Integer |
EXPIRATION_DATE | java.lang.Long |
PRIORITY | java.lang.Integer |
QueueChannel's send method does not block as long as the channel is not full. By default, QueueChannel is unbounded. In order to avoid the application running out of memory, it is always a good idea to specify the capacity value for a bounded QueueChannel.
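The effect of a capacity limit on the sending side can be illustrated with a bounded queue from java.util.concurrent. This is a plain-Java sketch rather than QueueChannel itself; the class name BoundedSendSketch and the choice of ArrayBlockingQueue are assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical bounded channel stand-in; capacity plays the role of the capacity attribute.
class BoundedSendSketch {
    private final BlockingQueue<String> queue;

    BoundedSendSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<String>(capacity);
    }

    // Returns immediately while there is room; when full, waits up to timeoutMillis and then gives up.
    boolean send(String message, long timeoutMillis) {
        try {
            return queue.offer(message, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

With a capacity of 2, the first two sends succeed at once, and a third send fails after its timeout unless a receiver has drained a message in the meantime. Memory use stays bounded because the sender is pushed back instead of the queue growing.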
// channel.receive will be blocked until a message arrives
ticketMessage = channel.receive();
// channel.receive will be blocked for 1000ms
// or until a message arrives within the timeout period
ticketMessage = channel.receive(1000);
The TicketReceiver will poll from QueueChannel. If there is no message, TicketReceiver will wait and block until a message can be received. However, in some situations, the application may want to handle something else while waiting for the next message.
public void handleTicketMessage() {
Message<?> ticketMessage = null;
while (true) {
ticketMessage = channel.receive(RECEIVE_TIMEOUT);
if (ticketMessage != null) {
handleTicket( (Ticket) ticketMessage.getPayload());
} else {
try {
/** Handle some other tasks **/
Thread.sleep(1000);
}
catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
In this case, TicketReceiver will wait for about 1 second for the next message. If there is nothing in QueueChannel, the block will be released so TicketReceiver can handle something else and then try to poll the channel once again.
In practice, the emergency priority tickets should be handled first because the problem may be urgent, but QueueChannel operates as a FIFO queue. To work around this, QueueChannel provides a feature that can drain specific messages out of the channel by using org.springframework.integration.core.MessageSelector. The MessageSelector interface is shown in Listing 6–12.
Listing 6–12. MessageSelector.java
package org.springframework.integration.core;
import org.springframework.integration.Message;
public interface MessageSelector {
boolean accept(Message<?> message);
}
The MessageSelector interface contains only the accept method, which takes a Message object. If the message can be accepted, the method returns true. The ticket handling example is modified to take the tickets with emergency priority first using the MessageSelector shown in Listing 6–13. This MessageSelector returns true for non-emergency tickets.
Listing 6–13. EmergencyTicketSelector.java
package com.apress.prospringintegration.channels.queuechannel;
import com.apress.prospringintegration.channels.core.Ticket;
import com.apress.prospringintegration.channels.core.Ticket.Priority;
import org.springframework.integration.Message;
import org.springframework.integration.core.MessageSelector;
import org.springframework.stereotype.Component;
@Component
public class EmergencyTicketSelector implements MessageSelector {
@Override
public boolean accept(Message<?> message) {
return ((Ticket) message.getPayload()).getPriority() != Priority.emergency;
}
}
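It may seem backward that a selector meant to find emergency tickets accepts the non-emergency ones. The reason is that purging removes the messages the selector does not accept. The following plain-Java sketch mimics that semantics on an ordinary queue; the Selector interface, the class name, and the string messages are hypothetical stand-ins, not the framework types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

// Hypothetical illustration of purge semantics on a plain java.util queue.
class PurgeSketch {

    interface Selector {
        boolean accept(String message);
    }

    // Removes and returns every queued message the selector does NOT accept.
    static List<String> purge(Queue<String> queue, Selector selector) {
        List<String> purged = new ArrayList<String>();
        Iterator<String> iterator = queue.iterator();
        while (iterator.hasNext()) {
            String message = iterator.next();
            if (!selector.accept(message)) {
                purged.add(message);
                iterator.remove();
            }
        }
        return purged;
    }

    // Accepting non-emergency messages leaves them queued and pulls the emergencies out.
    static List<String> demo() {
        Queue<String> queue = new LinkedList<String>(
                Arrays.asList("low-1", "emergency-2", "high-3", "emergency-4"));
        return purge(queue, message -> !message.startsWith("emergency"));
    }
}
```

Here demo returns the two emergency messages, while the low and high priority messages stay in the queue in their original order.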
The EmergencyTicketReceiver class shown in Listing 6–14 first pulls any emergency tickets by purging them from the channel using the MessageSelector described above. The purge method removes and returns any message not accepted by the MessageSelector. The emergency tickets are then processed first.
Listing 6–14. EmergencyTicketReceiver.java
package com.apress.prospringintegration.channels.queuechannel;
import com.apress.prospringintegration.channels.core.Ticket;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.integration.Message;
import org.springframework.integration.core.MessageSelector;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import java.util.List;
@Component
public class EmergencyTicketReceiver extends TicketReceiver {
private MessageSelector emergencyTicketSelector;
@Autowired
public void setEmergencyTicketSelector(MessageSelector emergencyTicketSelector) {
this.emergencyTicketSelector = emergencyTicketSelector;
}
@Override
public void handleTicketMessage() {
Message<?> ticketMessage = null;
while (true) {
List<Message<?>> emergencyTicketMessages = channel.purge(emergencyTicketSelector);
handleEmergencyTickets(emergencyTicketMessages);
ticketMessage = channel.receive(RECEIVE_TIMEOUT);
if (ticketMessage != null) {
handleTicket((Ticket) ticketMessage.getPayload());
} else {
try {
/** Handle some other tasks **/
Thread.sleep(1000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
void handleEmergencyTickets(List<Message<?>> highPriorityTicketMessages) {
Assert.notNull(highPriorityTicketMessages);
for (Message<?> ticketMessage : highPriorityTicketMessages) {
handleTicket((Ticket) ticketMessage.getPayload());
}
}
}
The output looks like Figure 6–10. Pay attention to the tickets that have emergency priority. Instead of appearing at the very end of the list, all the tickets with emergency priority are received at the very top.
Figure 6–10. EmergencyTicketMain Output
In the preceding example, the purge method is used to select and remove the messages that contain the emergency priority tickets. All the emergency tickets must be handled first, and then QueueChannel can be polled as usual. This solution is quite complicated. Fortunately, Spring Integration provides a better way to handle this use case.
org.springframework.integration.channel.PriorityChannel is a subclass of QueueChannel, as shown in Figure 6–11. It works exactly like QueueChannel, except that it allows the endpoint to receive messages in priority order based on a comparator, similar to how a Java Collection is sorted. By default, PriorityChannel uses a comparator based on the value in the message header PRIORITY field. By using PriorityChannel, the way the emergency tickets are handled in the previous example can be greatly simplified.
Figure 6–11. PriorityChannel Class Diagram
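The ordering idea can be tried out with the JDK's own PriorityBlockingQueue before looking at the channel itself. This is a plain-Java sketch, not PriorityChannel: the Msg class, its priority field, and the class name PriorityOrderSketch are hypothetical, and the comparator simply puts larger priority values first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical illustration of comparator-driven receive order using a JDK priority queue.
class PriorityOrderSketch {

    // Minimal message stand-in: an int priority plus a payload.
    static final class Msg {
        final int priority;
        final String payload;

        Msg(int priority, String payload) {
            this.priority = priority;
            this.payload = payload;
        }
    }

    static List<String> receiveAll() {
        // Larger priority values come out first, so the comparator reverses the natural int order.
        PriorityBlockingQueue<Msg> channel = new PriorityBlockingQueue<Msg>(
                11, (a, b) -> Integer.compare(b.priority, a.priority));
        channel.add(new Msg(0, "low"));
        channel.add(new Msg(3, "emergency"));
        channel.add(new Msg(1, "medium"));

        List<String> received = new ArrayList<String>();
        Msg next;
        while ((next = channel.poll()) != null) {
            received.add(next.payload);
        }
        return received;
    }
}
```

Although the emergency message is sent second, it is received first, followed by medium and then low. No purge step is needed because the queue itself keeps the messages ordered.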
The preceding example is modified to leverage the PriorityChannel. First, the ticket receiver code is modified to use the PriorityChannel, as shown in Listing 6–15.
Listing 6–15. PriorityTicketReceiver.java
package com.apress.prospringintegration.channels.prioritychannel;
import com.apress.prospringintegration.channels.core.Ticket;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.integration.Message;
import org.springframework.integration.channel.PriorityChannel;
import org.springframework.stereotype.Component;
@Component
public class PriorityTicketReceiver implements Runnable {
private final static int RECEIVE_TIMEOUT = 1000;
private PriorityChannel channel;
@Value("#{ticketChannel}")
public void setChannel(PriorityChannel channel) {
this.channel = channel;
}
public void handleTicketMessage() {
Message<?> ticketMessage = null;
while (true) {
ticketMessage = channel.receive(RECEIVE_TIMEOUT);
if (ticketMessage != null) {
handleTicket((Ticket) ticketMessage.getPayload());
} else {
try {
/** Handle some other tasks **/
Thread.sleep(1000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
void handleTicket(Ticket ticket) {
System.out.println("Received ticket - " + ticket.toString());
}
@Override
public void run() {
handleTicketMessage();
}
}
Next, the ProblemReporter class is modified to use the PriorityChannel, as shown in Listing 6–16.
Listing 6–16. ProblemReporter.java
package com.apress.prospringintegration.channels.prioritychannel;
import com.apress.prospringintegration.channels.core.Ticket;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.integration.channel.PriorityChannel;
import org.springframework.integration.message.GenericMessage;
import org.springframework.stereotype.Component;
@Component
public class ProblemReporter {
protected PriorityChannel channel;
@Value("#{ticketChannel}")
public void setChannel(PriorityChannel channel) {
this.channel = channel;
}
void openTicket(Ticket ticket) {
channel.send(new GenericMessage<Ticket>(ticket));
System.out.println("Ticket Sent - " + ticket.toString());
}
}
The PriorityTicketReceiver code looks very similar to the original TicketReceiver implementation using QueueChannel.
How can PriorityChannel know which message has higher priority? By default, PriorityChannel inspects the PRIORITY field of the message's org.springframework.integration.MessageHeaders. The larger the value, the higher the priority of the message. In order to take advantage of the default behavior of PriorityChannel, a priority needs to be assigned to the message header when the message is created. This is done in the PriorityProblemReporter class, as shown in Listing 6–17.
Listing 6–17. PriorityProblemReporter.java
package com.apress.prospringintegration.channels.prioritychannel;
import com.apress.prospringintegration.channels.core.Ticket;
import org.springframework.integration.MessageHeaders;
import org.springframework.integration.message.GenericMessage;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Component
public class PriorityProblemReporter extends ProblemReporter {
void openTicket(Ticket ticket) {
Map<String, Object> messageHeader = new HashMap<String, Object>();
messageHeader.put(MessageHeaders.PRIORITY, ticket.getPriority().ordinal());
channel.send(new GenericMessage<Ticket>(ticket, messageHeader));
System.out.println("Ticket Sent - " + ticket.toString());
}
}
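The ordering rule described above (the larger the priority value, the sooner the message is received) can be modelled with the JDK alone. The following is a minimal sketch, not Spring Integration code: PrioritizedMessage and PriorityOrderingSketch are hypothetical names, and a plain PriorityBlockingQueue stands in for the channel.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical stand-in for a message that carries a priority header.
final class PrioritizedMessage {
    final String payload;
    final int priority;

    PrioritizedMessage(String payload, int priority) {
        this.payload = payload;
        this.priority = priority;
    }
}

final class PriorityOrderingSketch {

    // Larger priority values sort first, mirroring the rule described in the text.
    static final Comparator<PrioritizedMessage> HIGHEST_FIRST =
            (a, b) -> Integer.compare(b.priority, a.priority);

    // Queues the given messages, then drains them in priority order.
    static List<String> drainInPriorityOrder(List<PrioritizedMessage> messages) {
        PriorityBlockingQueue<PrioritizedMessage> queue =
                new PriorityBlockingQueue<>(11, HIGHEST_FIRST);
        queue.addAll(messages);
        List<String> payloads = new ArrayList<>();
        PrioritizedMessage next;
        while ((next = queue.poll()) != null) {
            payloads.add(next.payload);
        }
        return payloads;
    }

    public static void main(String[] args) {
        List<PrioritizedMessage> sent = Arrays.asList(
                new PrioritizedMessage("low", 0),
                new PrioritizedMessage("emergency", 3),
                new PrioritizedMessage("medium", 1));
        System.out.println(drainInPriorityOrder(sent)); // [emergency, medium, low]
    }
}
```

The messages are sent in the order low, emergency, medium, but they are drained with the highest priority first, regardless of arrival order.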
By default, the PriorityChannel comparator compares the PRIORITY field in the message header to determine the priority. The default comparator can be replaced with a custom comparator. The code in Listing 6–18 shows a custom comparator that compares the priority values stored in the Ticket object instead of using the PRIORITY field in the message header.
Listing 6–18. TicketMessagePriorityComparator.java
package com.apress.prospringintegration.channels.prioritychannel;
import com.apress.prospringintegration.channels.core.Ticket;
import org.springframework.integration.Message;
import org.springframework.stereotype.Component;
import java.util.Comparator;
@Component
public class TicketMessagePriorityComparator
implements Comparator<Message<Ticket>> {
@Override
public int compare(Message<Ticket> message1, Message<Ticket> message2) {
Integer priority1 = message1.getPayload().getPriority().ordinal();
Integer priority2 = message2.getPayload().getPriority().ordinal();
priority1 = priority1 != null ? priority1 : 0;
priority2 = priority2 != null ? priority2 : 0;
return priority2.compareTo(priority1);
}
}
Instead of comparing the message header's PRIORITY field, TicketMessagePriorityComparator compares the Ticket's priority value. Once the new comparator is implemented, it needs to be assigned to the PriorityChannel instance. The file shown in Listing 6–19 configures the requisite Spring Integration channel and assigns it a reference to the TicketMessagePriorityComparator, which is picked up and registered by component scanning because it carries the @Component annotation.
Listing 6–19. priority-channel.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<context:component-scan
base-package="com.apress.prospringintegration.channels.prioritychannel"/>
<context:component-scan
base-package="com.apress.prospringintegration.channels.core"/>
<int:channel id="ticketChannel"
datatype="com.apress.prospringintegration.channels.core.Ticket">
<int:priority-queue capacity="50"
comparator="ticketMessagePriorityComparator"/>
</int:channel>
</beans>
Once again, the PriorityChannel is defined in the Spring bean configuration file, where the default comparator is overridden through the comparator attribute:
<int:channel id="ticketChannel" datatype="com.apress.prospringintegration.channels.core.Ticket">
<int:priority-queue capacity="50" comparator="ticketMessagePriorityComparator" />
</int:channel>
The output is shown in Figure 6–12. It is the same as the output for EmergencyTicketMain earlier in the chapter, in the “QueueChannel” section.
Figure 6–12. EmergencyTicketMain Output
org.springframework.integration.channel.RendezvousChannel is a synchronous version of QueueChannel, as shown in Figure 6–13. Internally, it uses a zero-capacity SynchronousQueue instead of a buffering BlockingQueue. The sender blocks until a receiver takes the message from the channel, so the sender cannot send a second message until the first has been retrieved. Likewise, the receiver blocks until a sender sends a message to the RendezvousChannel. The behavior is similar to a semaphore shared between threads; as a result, RendezvousChannel is useful for synchronizing multiple threads when semaphores are not an option.
Figure 6–13. RendezvousChannel Class Diagram
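The hand-off behavior described above comes from the JDK's SynchronousQueue itself. The following is a minimal sketch that uses only java.util.concurrent, not the RendezvousChannel API; the class and method names are hypothetical. It shows that a send with no waiting receiver cannot complete, while a send with a waiting receiver succeeds.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

final class RendezvousSketch {

    // With no receiver waiting, a zero-capacity queue cannot accept the element.
    static boolean offerWithoutReceiver() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        try {
            return queue.offer("ticket", 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // With a receiver thread polling, the hand-off succeeds and the payload arrives.
    static String handOff(String payload) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        String[] received = new String[1];
        Thread receiver = new Thread(() -> {
            try {
                received[0] = queue.poll(2, TimeUnit.SECONDS);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        receiver.start();
        try {
            // Waits (up to two seconds) for the receiver to take the payload.
            queue.offer(payload, 2, TimeUnit.SECONDS);
            receiver.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println("accepted without receiver: " + offerWithoutReceiver());
        System.out.println("received with receiver: " + handOff("ticket"));
    }
}
```

This is also why the consumer thread must be running before the first message is sent in the example that follows.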
The ticket handling example is modified to use a RendezvousChannel. The sender cannot send the next message until the previous one is received. The ProblemReporter class is modified to use the RendezvousChannel, as shown in Listing 6–20.
Listing 6–20. ProblemReporter.java
package com.apress.prospringintegration.channels.rendezvouschannel;
import com.apress.prospringintegration.channels.core.Ticket;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.integration.channel.RendezvousChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class ProblemReporter {
private RendezvousChannel channel;
public ProblemReporter() {
}
@Value("#{ticketChannel}")
public void setChannel(RendezvousChannel channel) {
this.channel = channel;
}
void openTicket(Ticket ticket) {
channel.send(MessageBuilder.withPayload(ticket).build());
System.out.println("Ticket Sent - " + ticket.toString());
}
}
Next, the TicketReceiver class is modified to use the RendezvousChannel, as shown in Listing 6–21.
Listing 6–21. TicketReceiver.java
package com.apress.prospringintegration.channels.rendezvouschannel;
import com.apress.prospringintegration.channels.core.Ticket;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.integration.Message;
import org.springframework.integration.channel.RendezvousChannel;
import org.springframework.stereotype.Component;
@Component
public class TicketReceiver implements Runnable {
private final static int RECEIVE_TIMEOUT = 1000;
private RendezvousChannel channel;
@Value("#{ticketChannel}")
public void setChannel(RendezvousChannel channel) {
this.channel = channel;
}
void handleTicket(Ticket ticket) {
System.out.println("Received ticket - " + ticket.toString());
}
@Override
public void run() {
Message<?> ticketMessage ;
while (true) {
ticketMessage = channel.receive(RECEIVE_TIMEOUT);
if (ticketMessage != null) {
handleTicket((Ticket) ticketMessage.getPayload());
} else {
try {
/** Handle some other tasks **/
Thread.sleep(1000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
}
Finally, the Main class is modified, as shown in Listing 6–22, so that the message consumer thread starts up first. Otherwise, the main thread would block in RendezvousChannel on the first send, because no receiver would be running to take the message.
Listing 6–22. Main.java
package com.apress.prospringintegration.channels.rendezvouschannel;
import com.apress.prospringintegration.channels.core.Ticket;
import com.apress.prospringintegration.channels.core.TicketGenerator;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.List;
public class Main {
public static void main(String[] args) throws Throwable {
String contextName = "rendezvous-channel.xml";
ClassPathXmlApplicationContext applicationContext =
new ClassPathXmlApplicationContext(contextName);
applicationContext.start();
ProblemReporter problemReporter = applicationContext.getBean(ProblemReporter.class);
TicketReceiver ticketReceiver = applicationContext.getBean(TicketReceiver.class);
TicketGenerator ticketGenerator = applicationContext.getBean(TicketGenerator.class);
// start *before* message publication because it'll block on put
Thread consumerThread = new Thread(ticketReceiver);
consumerThread.start();
List<Ticket> tickets = ticketGenerator.createTickets();
for (Ticket ticket : tickets) {
problemReporter.openTicket(ticket);
}
}
}
The output for the RendezvousChannel example looks like Figure 6–14.
Figure 6–14. RendezvousChannel Main Output
org.springframework.integration.channel.DirectChannel is a mixture of the point-to-point and publish-subscribe channels, as shown in Figure 6–15. Like a publish-subscribe channel, it pushes messages to subscribed receivers, but each message is delivered to only one of them. As a result, DirectChannel is actually a point-to-point channel. Since DirectChannel adds very little overhead, it is the default channel type within Spring Integration.
Figure 6–15. DirectChannel Class Diagram
The ticket handling example is modified to use a DirectChannel. The DirectChannel is point-to-point, so only one receiver can receive each message. A TicketMessageHandler class is created to receive the incoming Ticket message, as shown in Listing 6–23.
Listing 6–23. TicketMessageHandler.java
package com.apress.prospringintegration.channels.directchannel;
import com.apress.prospringintegration.channels.core.Ticket;
import org.springframework.integration.Message;
import org.springframework.integration.MessageRejectedException;
import org.springframework.integration.MessagingException;
import org.springframework.integration.core.MessageHandler;
import org.springframework.stereotype.Component;
@Component
public class TicketMessageHandler implements MessageHandler {
@Override
public void handleMessage(Message<?> message)
throws MessagingException {
Object payload = message.getPayload();
if (payload instanceof Ticket) {
handleTicket((Ticket) payload);
} else {
throw new MessageRejectedException(message, "Unknown data type has been received.");
}
}
void handleTicket(Ticket ticket) {
System.out.println("Received ticket - " + ticket.toString());
}
}
Next, the ProblemReporter class is modified to use a DirectChannel, as shown in Listing 6–24.
Listing 6–24. ProblemReporter.java
package com.apress.prospringintegration.channels.directchannel;
import com.apress.prospringintegration.channels.core.Ticket;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class ProblemReporter {
private DirectChannel channel;
@Value("#{ticketChannel}")
public void setChannel(DirectChannel channel) {
this.channel = channel;
}
void openTicket(Ticket ticket) {
channel.send(MessageBuilder.withPayload(ticket).build());
System.out.println("Ticket Sent - " + ticket.toString());
}
}
The Main class, as shown in Listing 6–25, is modified to subscribe the TicketMessageHandler to the DirectChannel named ticketChannel.
Listing 6–25. Main.java
package com.apress.prospringintegration.channels.directchannel;
import com.apress.prospringintegration.channels.core.Ticket;
import com.apress.prospringintegration.channels.core.TicketGenerator;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.DirectChannel;
import java.util.List;
public class Main {
public static void main(String[] args) throws Exception {
String contextName = "direct-channel.xml";
ClassPathXmlApplicationContext applicationContext =
new ClassPathXmlApplicationContext(contextName);
applicationContext.start();
ProblemReporter problemReporter =
applicationContext.getBean(ProblemReporter.class);
TicketGenerator ticketGenerator =
applicationContext.getBean(TicketGenerator.class);
TicketMessageHandler ticketMessageHandler =
applicationContext.getBean(TicketMessageHandler.class);
DirectChannel channel =
applicationContext.getBean("ticketChannel", DirectChannel.class);
channel.subscribe(ticketMessageHandler);
List<Ticket> tickets = ticketGenerator.createTickets();
for (Ticket ticket : tickets) {
problemReporter.openTicket(ticket);
}
}
}
The Spring configuration is shown in Listing 6–26, again leveraging component scanning to reduce the amount of XML configuration.
Listing 6–26. direct-channel.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<context:component-scan
base-package="com.apress.prospringintegration.channels.directchannel"/>
<context:component-scan
base-package="com.apress.prospringintegration.channels.core"/>
<int:channel id="ticketChannel"/>
</beans>
Although DirectChannel acts like a point-to-point channel, it allows multiple receivers to subscribe to the channel, just like the publish-subscribe channels. By default, DirectChannel distributes messages among the subscribed receivers in round-robin fashion, so each message goes to exactly one receiver. Round-robin is the only strategy provided out of the box for now; additional strategies will be added in future versions of Spring Integration. However, developers can supply a custom load-balancing algorithm by implementing the org.springframework.integration.dispatcher.LoadBalancingStrategy interface.
public interface LoadBalancingStrategy {
public Iterator<MessageHandler> getHandlerIterator(
Message<?> message, List<MessageHandler> handlers);
}
DirectChannel invokes the handler's handleMessage method within the sender's thread before the send() method returns. This is very useful for supporting transactions that span both the send and receive operations. For example, if the handleTicket method writes to the database and a JDBCException is thrown from the database-related operation, the handleMessage method will propagate the error as a MessagingException. The DirectChannel dispatcher will then fail over to the subsequent handlers. By default, DirectChannel has failover turned on, which means an exception will be thrown to the sender only if all the handlers have tried and failed to handle the message.
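The combination of round-robin distribution and failover described above can be illustrated with a small, JDK-only model. This is a sketch under stated assumptions rather than the framework's dispatcher: RoundRobinFailoverSketch is a hypothetical class, handlers are plain Consumer instances, and a handler signals failure by throwing a RuntimeException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class RoundRobinFailoverSketch {
    private final List<Consumer<String>> handlers = new ArrayList<>();
    private final AtomicInteger nextStart = new AtomicInteger();

    void subscribe(Consumer<String> handler) {
        handlers.add(handler);
    }

    // Runs in the caller's thread. Each send starts at the next handler in the
    // rotation and fails over to the following handlers if one throws.
    void send(String message) {
        if (handlers.isEmpty()) {
            throw new IllegalStateException("no subscribers");
        }
        int start = Math.floorMod(nextStart.getAndIncrement(), handlers.size());
        RuntimeException lastFailure = null;
        for (int i = 0; i < handlers.size(); i++) {
            Consumer<String> handler = handlers.get((start + i) % handlers.size());
            try {
                handler.accept(message);
                return; // exactly one handler processes each message
            } catch (RuntimeException ex) {
                lastFailure = ex; // try the next handler
            }
        }
        throw lastFailure; // every handler failed
    }

    // Three subscribers, the second of which always fails.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        RoundRobinFailoverSketch channel = new RoundRobinFailoverSketch();
        channel.subscribe(m -> log.add("A:" + m));
        channel.subscribe(m -> { throw new IllegalStateException("B is down"); });
        channel.subscribe(m -> log.add("C:" + m));
        for (int i = 1; i <= 4; i++) {
            channel.send("m" + i);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [A:m1, C:m2, C:m3, A:m4]
    }
}
```

The second message is offered to B first, B fails, and the message fails over to C. The sender sees an exception only when every handler has failed.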
org.springframework.integration.channel.ExecutorChannel is a point-to-point message channel that is very similar to the DirectChannel, as shown in Figure 6–16. However, it hands dispatching to an instance of org.springframework.core.task.TaskExecutor, so handlers are typically invoked in a thread separate from the sender thread. In other words, the send method of ExecutorChannel does not block while the message is handled. As a result, unlike DirectChannel, ExecutorChannel does not support transactions that span the sender and receiver.
Figure 6–16. ExecutorChannel Class Diagram
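The difference between dispatching in the sender's thread and dispatching through an executor can be seen with plain java.util.concurrent types. In this minimal sketch, DispatchThreadSketch is a hypothetical class, the lambda stands in for a message handler, and a JDK Executor stands in for Spring's TaskExecutor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class DispatchThreadSketch {

    // Runs a stand-in handler through the executor and returns the thread that ran it.
    static Thread dispatch(Executor executor) {
        CompletableFuture<Thread> handledOn = new CompletableFuture<>();
        executor.execute(() -> handledOn.complete(Thread.currentThread()));
        return handledOn.join();
    }

    // Direct dispatch: the handler runs on the caller's own thread.
    static boolean directRunsOnCallerThread() {
        return dispatch(Runnable::run) == Thread.currentThread();
    }

    // Executor dispatch: the handler runs on a pool thread instead.
    static boolean pooledRunsOnCallerThread() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return dispatch(pool) == Thread.currentThread();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("direct on caller thread: " + directRunsOnCallerThread()); // true
        System.out.println("pooled on caller thread: " + pooledRunsOnCallerThread()); // false
    }
}
```

Because the handler runs on another thread in the second case, anything bound to the sender's thread, such as a thread-bound transaction, is not visible to it.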
The org.springframework.integration.channel.NullChannel implementation, as shown in Figure 6–17, is very interesting. NullChannel is a dummy message channel that does nothing: it does not pass any messages from the sender to the receiver. NullChannel's send method always returns true, while its receive() method always returns null. In other words, sending always appears to succeed, while the channel always appears to be empty when receiving. Due to this special behavior, NullChannel is mainly used for unit testing, integration testing, and debugging.
Figure 6–17. NullChannel Class Diagram
In Spring Integration 1.0, developers could use ThreadLocalChannel to restrict the scope of a message channel to a single thread. In Spring Integration 2.0, ThreadLocalChannel is replaced by more general scope support. By simply defining the channel's scope attribute, no other thread will be able to access a message within the thread-scoped message channel.
The org.springframework.integration.channel.PublishSubscribeChannel implementation is the basic implementation of the publish-subscribe channel pattern, as shown in Figure 6–18. The message channel broadcasts every sent message to all of the channel subscribers. In addition, the messages are pushed to the consumers instead of the consumers polling for the messages.
Figure 6–18. PublishSubscribeChannel Class Diagram
One of the important features of enterprise integration is the ability to capture information as messages pass through the system. This is very useful when you want to intercept messages within the channel and inspect them before they reach their destinations. Spring Integration provides the opportunity to intercept messages during the send and receive operations. This is where the ChannelInterceptor interface comes into play, as shown in Listing 6–27.
Listing 6–27. ChannelInterceptor.java
package org.springframework.integration.channel;
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
public interface ChannelInterceptor {
Message<?> preSend(Message<?> message, MessageChannel channel);
void postSend(Message<?> message, MessageChannel channel, boolean sent);
boolean preReceive(MessageChannel channel);
Message<?> postReceive(Message<?> message, MessageChannel channel);
}
ChannelInterceptor allows a message to be intercepted before it is sent, after it is sent, before it is received, and after it is received. Several of these methods can prevent a message from being processed further downstream. For example, preSend can return null to prevent the message from being sent to the message channel, and postReceive can return null to prevent the message from passing from the message channel to the consumer.
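The veto mechanism described above can be modelled without the framework. The following is a minimal sketch, assuming a simplified pre-send hook: InterceptedChannelSketch is a hypothetical class, messages are plain strings, and each interceptor is a UnaryOperator that may return a modified message or null to stop the send.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

final class InterceptedChannelSketch {
    private final List<UnaryOperator<String>> preSendInterceptors = new ArrayList<>();
    private final List<String> delivered = new ArrayList<>();

    void addInterceptor(UnaryOperator<String> interceptor) {
        preSendInterceptors.add(interceptor);
    }

    // Applies the interceptors in list order; a null result vetoes the send.
    boolean send(String message) {
        String current = message;
        for (UnaryOperator<String> interceptor : preSendInterceptors) {
            current = interceptor.apply(current);
            if (current == null) {
                return false; // vetoed: nothing reaches the channel
            }
        }
        delivered.add(current);
        return true;
    }

    List<String> delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        InterceptedChannelSketch channel = new InterceptedChannelSketch();
        channel.addInterceptor(String::trim);                          // may modify the message
        channel.addInterceptor(m -> m.startsWith("spam") ? null : m);  // may veto the message
        System.out.println(channel.send("  ticket 1  ")); // true
        System.out.println(channel.send("spam offer"));   // false
        System.out.println(channel.delivered());          // [ticket 1]
    }
}
```

The first interceptor shows that a pre-send hook can also replace the message, since whatever it returns is what continues down the chain.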
Since only the PollableChannel interface has the receive operation, preReceive and postReceive will be invoked only for PollableChannel implementations. The interceptor methods available for the various message channels are shown in Table 6–3.
Table 6–3. Message Channels and Interceptors
Message Channels | preSend() | preReceive() | postSend() | postReceive() |
QueueChannel | Yes | Yes | Yes | Yes |
PriorityChannel | Yes | Yes | Yes | Yes |
RendezvousChannel | Yes | Yes | Yes | Yes |
DirectChannel | Yes | No | Yes | No |
ExecutorChannel | Yes | No | Yes | No |
NullChannel | Yes | Yes | Yes | Yes |
PublishSubscribeChannel | Yes | No | Yes | No |
Once we have implemented a ChannelInterceptor, we need to add it to the channel. We can add one or more interceptors to a channel. The interceptors will be invoked in the order in which they appear in the list.
messageChannel.addInterceptor(channelInterceptor);
messageChannel.setInterceptors(channelInterceptorList);
Spring Integration provides MessagingTemplate as a very easy way to integrate a messaging system into applications. It supports many common message channel operations, such as send and receive. MessagingTemplate also supports transactions by providing a PlatformTransactionManager. Let's rewrite our problem-reporting system example from earlier in the chapter using MessagingTemplate. The ProblemReporter code is modified to use the MessagingTemplate, as shown in Listing 6–28. The MessagingTemplate can also build the Spring Integration Message instance.
Listing 6–28. ProblemReporter.java
package com.apress.prospringintegration.channels.messagingtemplate;
import com.apress.prospringintegration.channels.core.Ticket;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.message.GenericMessage;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class ProblemReporter {
private MessagingTemplate messagingTemplate;
@Autowired
public void setMessagingTemplate(MessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}
public void openTicket(Ticket ticket) {
messagingTemplate.convertAndSend(ticket);
System.out.println("Ticket Sent - " + ticket.toString());
}
}
The MessagingTemplate can also be used to receive messages as shown in Listing 6–29.
Listing 6–29. TicketReceiver.java
package com.apress.prospringintegration.channels.messagingtemplate;
import com.apress.prospringintegration.channels.core.Ticket;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.integration.Message;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.stereotype.Component;
@Component
public class TicketReceiver implements Runnable {
private final static int RECEIVE_TIMEOUT = 1000;
private MessagingTemplate messagingTemplate;
@Autowired
public void setMessagingTemplate(MessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
this.messagingTemplate.setReceiveTimeout(RECEIVE_TIMEOUT);
}
public void handleTicketMessage() {
Message<?> ticketMessage;
while (true) {
ticketMessage = messagingTemplate.receive();
if (ticketMessage != null) {
handleTicket((Ticket) ticketMessage.getPayload());
} else {
/* Perform Some Other Tasks Here */
}
}
}
void handleTicket(Ticket ticket) {
System.out.println("Received ticket - " + ticket.toString());
}
@Override
public void run() {
handleTicketMessage();
}
}
The Main class stays the same, as shown in Listing 6–30.
Listing 6–30. Main.java
package com.apress.prospringintegration.channels.messagingtemplate;
import com.apress.prospringintegration.channels.core.Ticket;
import com.apress.prospringintegration.channels.core.TicketGenerator;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.List;
public class Main {
public static void main(String[] args) {
String contextName = "messaging-template.xml";
ClassPathXmlApplicationContext applicationContext =
new ClassPathXmlApplicationContext(contextName);
applicationContext.start();
ProblemReporter problemReporter =
applicationContext.getBean(ProblemReporter.class);
TicketReceiver ticketReceiver =
applicationContext.getBean(TicketReceiver.class);
TicketGenerator ticketGenerator =
applicationContext.getBean(TicketGenerator.class);
List<Ticket> tickets = ticketGenerator.createTickets();
for (Ticket ticket : tickets) {
problemReporter.openTicket(ticket);
}
Thread consumerThread = new Thread(ticketReceiver);
consumerThread.start();
}
}
The Spring configuration file when using the MessagingTemplate also stays the same, as shown in Listing 6–31.
Listing 6–31. messaging-template.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<context:component-scan
base-package="com.apress.prospringintegration.channels.messagingtemplate"/>
<context:component-scan
base-package="com.apress.prospringintegration.channels.core"/>
<int:channel id="ticketChannel"
datatype="com.apress.prospringintegration.channels.core.Ticket">
<int:priority-queue capacity="50"/>
</int:channel>
</beans>
This is similar to previous iterations of the file. It relies on the definitions established by component scanning the package and picking up the beans defined with @Component. Additionally, it relies on the definitions established when the following Java configuration class is scanned and its bean definitions are loaded:
package com.apress.prospringintegration.channels.messagingtemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.core.MessagingTemplate;
@Configuration
public class MessagingTemplateConfiguration {
@Value("#{ticketChannel}")
private MessageChannel messageChannel;
@Bean
public MessagingTemplate messagingTemplate() {
MessagingTemplate messagingTemplate = new MessagingTemplate();
messagingTemplate.setDefaultChannel(this.messageChannel);
messagingTemplate.setReceiveTimeout(1000);
return messagingTemplate;
}
}
In this section, we'll look at how to configure the different kinds of message channels using the Spring Integration XML namespace in the Spring bean configuration file. Make sure the Spring bean configuration file includes the Spring Integration namespace, like this:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd">
</beans>
To create a channel instance, use the channel element in the Spring configuration file. Adding a queue subelement makes it a QueueChannel.
<int:channel id="queueChannel">
<int:queue/>
</int:channel>
The preceding example creates a QueueChannel using all default configurations. In other words, the channel capacity is unbounded. This is very dangerous, since the JVM may run out of memory. It is always a good idea to limit the number of messages in the channel.
<int:channel id="queueChannel">
<int:queue capacity="50" />
</int:channel>
The preceding example limits the channel capacity to 50 messages. Once the channel reaches maximum capacity, all send operations will block until capacity frees up.
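The effect of a capacity limit can be observed on a bounded JDK queue. The following is a minimal sketch, not the QueueChannel API: BoundedQueueSketch is a hypothetical class, and a LinkedBlockingQueue stands in for the channel's internal queue. A timed offer is used so that the blocked send gives up instead of waiting forever.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class BoundedQueueSketch {

    // Fills a queue to capacity and returns it.
    static LinkedBlockingQueue<String> fullQueue(int capacity) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.offer("ticket-" + i);
        }
        return queue;
    }

    // One send too many: the timed offer waits, then gives up and returns false.
    static boolean extraSendAccepted(int capacity, long timeoutMillis) {
        LinkedBlockingQueue<String> queue = fullQueue(capacity);
        try {
            return queue.offer("one-too-many", timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // After a receiver removes one message, there is room for the next send.
    static boolean sendAcceptedAfterReceive(int capacity) {
        LinkedBlockingQueue<String> queue = fullQueue(capacity);
        queue.poll();
        return queue.offer("next-ticket");
    }

    public static void main(String[] args) {
        System.out.println(extraSendAccepted(50, 20));    // false
        System.out.println(sendAcceptedAfterReceive(50)); // true
    }
}
```

A sender that should not wait indefinitely on a full channel can use a send with a timeout in the same way and react to the false result.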
Spring Integration also supports the datatype channel design pattern: setting the datatype attribute to a specific class restricts the channel to payloads of that type. By default, Spring Integration message channels accept any type of message payload.
<int:channel id="queueChannel"
datatype="com.apress.prospringintegration.channels.Ticket">
<int:queue capacity="50" />
</int:channel>
As with QueueChannel, the queue capacity and data type can be specified for a PriorityChannel. Again, it is a good idea to specify the queue capacity to avoid running out of memory.
<int:channel id="priorityChannel"
datatype="com.apress.prospringintegration.channels.Ticket">
<int:priority-queue capacity="50"/>
</int:channel>
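By default, PriorityChannel uses the PRIORITY message header to determine the priority of incoming messages. A custom comparator, such as the TicketMessagePriorityComparator class from Listing 6–18 (repeated here), can order messages by other criteria.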
package com.apress.prospringintegration.channels.prioritychannel;
import com.apress.prospringintegration.channels.core.Ticket;
import org.springframework.integration.Message;
import org.springframework.stereotype.Component;
import java.util.Comparator;
@SuppressWarnings("unused")
@Component
public class TicketMessagePriorityComparator
implements Comparator<Message<Ticket>> {
@Override
public int compare(Message<Ticket> message1, Message<Ticket> message2) {
Integer priority1 = message1.getPayload().getPriority().ordinal();
Integer priority2 = message2.getPayload().getPriority().ordinal();
priority1 = priority1 != null ? priority1 : 0;
priority2 = priority2 != null ? priority2 : 0;
return priority2.compareTo(priority1);
}
}
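To change the behavior of PriorityChannel, reference the custom comparator bean from the priority-queue element. A class annotated with @Component is registered under its decapitalized class name by default, so the reference here is ticketMessagePriorityComparator:
<int:channel id="priorityChannel"
datatype="com.apress.prospringintegration.channels.Ticket">
<int:priority-queue capacity="50" comparator="ticketMessagePriorityComparator" />
</int:channel>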
RendezvousChannel does not provide any additional configuration options other than setting the data type. Since it is a zero-capacity channel, the capacity cannot be changed. As a result, only one message can pass through the channel at any given time.
<int:channel id="rendezvousChannel"
datatype="com.apress.prospringintegration.channels.Ticket">
<int:rendezvous-queue/>
</int:channel>
DirectChannel is the default type of Spring Integration channel. As a result, a channel element with no queue or dispatcher subelement creates a DirectChannel instance.
<int:channel id="directChannel"/>
Similar to the rest of the Spring Integration message channels, DirectChannel allows users to specify the payload data type. It also allows developers to enable or disable failover and to specify the load-balancing strategy. The default configuration for the DirectChannel is failover enabled and round-robin load balancing. The following example keeps failover enabled but turns load balancing off by setting load-balancer to none:
<int:channel id="directChannel"
datatype="com.apress.prospringintegration.channels.Ticket">
<int:dispatcher failover="true" load-balancer="none"/>
</int:channel>
To use ExecutorChannel, we need to assign an executor to the channel dispatcher. In addition, ExecutorChannel supports the failover and load-balancing options.
<int:channel id="executorChannel"
datatype="com.apress.prospringintegration.channels.Ticket">
<int:dispatcher task-executor="performTicketExecutor"/>
</int:channel>
The task-executor attribute supports any Spring TaskExecutor implementation.
Similar to ExecutorChannel, the PublishSubscribeChannel supports any TaskExecutor implementation for the task-executor attribute. By default, Spring Integration will not assign sequence information, such as a correlation ID, to the outgoing messages. This may be overridden by setting the apply-sequence attribute to true.
<int:publish-subscribe-channel id="publishSubscribeChannel"
task-executor="performTicketExecutor" apply-sequence="true"/>
To assign interceptors to a channel, we can use the interceptors subelement within the Spring configuration file.
<int:channel id="messageChannel">
<int:interceptors>
<ref bean="interceptor1"/>
<ref bean="interceptor2"/>
<ref bean="interceptor3"/>
</int:interceptors>
</int:channel>
Sometimes, we may want to assign the same interceptor to all the message channels. We can use the following configuration:
<int:channel-interceptor ref="interceptor1"/>
The preceding example will assign interceptor1 to all the message channels. However, sometimes we may just want the interceptor to be assigned to certain channels.
<int:channel-interceptor ref="interceptor1" pattern="ticketChannel, emergencyChannel"/>
We can use wildcards in the pattern attribute and also specify the position at which the interceptor is inserted into the list.
<int:channel-interceptor ref="interceptor1" pattern="ticketChannel, emergency*, lowPriority*" order="2"/>
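To make the wildcard matching concrete, the following JDK-only sketch checks a channel name against a comma-separated pattern list in which * matches any run of characters. It illustrates the idea rather than the framework's own matcher: ChannelPatternSketch is a hypothetical class, and trimming the whitespace around each pattern is an assumption of this sketch.

```java
import java.util.regex.Pattern;

final class ChannelPatternSketch {

    // Returns true when the channel name matches any of the comma-separated
    // patterns; '*' in a pattern matches any run of characters.
    static boolean matches(String patterns, String channelName) {
        for (String raw : patterns.split(",")) {
            String pattern = raw.trim(); // assumption: surrounding whitespace is ignored
            if (pattern.isEmpty()) {
                continue;
            }
            // Quote the literal parts and join them with ".*" for each wildcard.
            String[] literals = pattern.split("\\*", -1);
            StringBuilder regex = new StringBuilder();
            for (int i = 0; i < literals.length; i++) {
                if (i > 0) {
                    regex.append(".*");
                }
                regex.append(Pattern.quote(literals[i]));
            }
            if (Pattern.matches(regex.toString(), channelName)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String patterns = "ticketChannel, emergency*, lowPriority*";
        System.out.println(matches(patterns, "emergencyChannel")); // true
        System.out.println(matches(patterns, "auditChannel"));     // false
    }
}
```

With the pattern list from the preceding configuration, ticketChannel, emergencyChannel, and lowPriorityQueue would be intercepted, while a channel named auditChannel would not.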
By default, Spring Integration message channels store messages in memory, because in-memory storage is fast and simple to implement. However, the number of messages that can be stored is bounded by the available server memory, and the channel may run out of memory very easily in a high-throughput integrated system. Even worse, all the messages will be lost if the system crashes. Relying only on memory to back a message channel is neither scalable nor reliable.
A pair of inbound and outbound channel adapters (endpoints and adapters will be discussed in more depth in Chapter 9) may be used to integrate the channels into a file system or JDBC data store. However, this approach is complicated because it still involves two channel adapters even though we are only dealing with a single channel.
Here is where Spring Integration 2.0 comes to the rescue: a message channel can be backed by JMS as an alternative to memory, as shown in Figure 6–19. The JMS-backed message channel decouples the message producer and consumer by making messaging asynchronous. Instead of being stored in memory, messages are stored in the JMS messaging system, which can in turn store them however it sees fit, be it on disk, in memory, or in any other configuration supported by the JMS broker. In addition, depending on the JMS configuration, the message endpoints can now scale out and run within different processes or servers.
Figure 6–19. JMS-backed Message Channel
The preceding example defines a point-to-point message channel. In order to use JMS to back this channel, we will use the jms namespace and provide the JMS queue destination name. Make sure to include the Spring Integration JMS namespace. The resulting configuration is in many ways comparable to the default, in-memory channel configuration, just from a different namespace.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
xsi:schemaLocation="http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
http://www.springframework.org/schema/integration/jms
http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<int-jms:channel id="channel" queue-name="jmsQueue" />
</beans>
By default, Spring Integration uses org.springframework.jms.support.destination.DynamicDestinationResolver to resolve the JMS queue name into the actual JMS destination. It also looks for a ConnectionFactory reference with the bean name connectionFactory. In order to make the preceding example work, we need to provide the connection factory reference.
// org.springframework.jms.connection.CachingConnectionFactory
// is a convenient way to work with JMS connection factories
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setBrokerURL("vm://localhost");
ccf.setTargetConnectionFactory(cf);
return ccf;
}
<int-jms:channel id="channel" queue-name="jmsQueue" />
We can provide a custom connection factory bean with a name other than connectionFactory and reference it through the connection-factory attribute.
@Bean
public CachingConnectionFactory jmsConnectionFactory(){
CachingConnectionFactory ccf = new CachingConnectionFactory();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setBrokerURL("vm://localhost");
ccf.setTargetConnectionFactory(cf);
return ccf;
}
<int-jms:channel id="channel" queue-name="jmsQueue" connection-factory="jmsConnectionFactory" />
We can provide a reference to a JMS queue bean through the queue attribute instead of using the JMS queue destination name.
<int-jms:channel id="channel" queue="jmsQueue" />
In order to back a publish-subscribe message channel using JMS, we will provide the JMS topic name.
<int-jms:publish-subscribe-channel id="channel" topic-name="jmsTopic"
connection-factory="jmsConnectionFactory" />
We can also use a topic reference instead of the JMS topic destination name.
<int-jms:publish-subscribe-channel id="channel" topic="jmsTopic" />
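The difference between backing a channel with a queue and backing it with a topic can be sketched in plain Java. This is only an illustration of the two delivery styles, not JMS or Spring Integration code; the classes QueueSketch and TopicSketch are hypothetical, and the round-robin choice of consumer is an assumption made for the sketch, since a real broker applies its own policy.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: in-memory stand-ins for queue and topic delivery.
class DeliverySemanticsSketch {

    // Queue style: each message is handed to exactly one consumer inbox.
    static class QueueSketch {
        private final List<List<String>> inboxes = new ArrayList<>();
        private int next = 0;

        List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            inboxes.add(inbox);
            return inbox;
        }

        void send(String message) {
            if (inboxes.isEmpty()) {
                return; // a real queue would hold the message; the sketch drops it
            }
            // Round-robin is an assumption of this sketch.
            inboxes.get(next).add(message);
            next = (next + 1) % inboxes.size();
        }
    }

    // Topic style: every subscriber receives its own copy of each message.
    static class TopicSketch {
        private final List<List<String>> inboxes = new ArrayList<>();

        List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            inboxes.add(inbox);
            return inbox;
        }

        void send(String message) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
    }

    public static void main(String[] args) {
        QueueSketch queue = new QueueSketch();
        List<String> q1 = queue.subscribe();
        List<String> q2 = queue.subscribe();
        queue.send("m1");
        queue.send("m2");
        System.out.println("queue consumers: " + q1 + " " + q2);

        TopicSketch topic = new TopicSketch();
        List<String> t1 = topic.subscribe();
        List<String> t2 = topic.subscribe();
        topic.send("m1");
        topic.send("m2");
        System.out.println("topic subscribers: " + t1 + " " + t2);
    }
}
```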
Now we have JMS-backed message channels, which are scalable and reliable. Spring Integration 2.0 makes it very easy.
Spring Integration is inspired by Enterprise Integration Patterns.4 This chapter has specifically looked at how Spring Integration implements the message channel pattern. By providing various implementations of the point-to-point message channel and the publish-subscribe message channel, Spring Integration lets applications communicate by sending messages to each other via the channels. The message operations can be shared across multiple threads or exist only in the local thread.
By using the XML namespace in the Spring bean configuration file, developers can easily switch between different channel implementations. Sometimes it is necessary to invoke the messaging system within the application code. Spring Integration provides a messaging template, which supports a variety of operations for sending messages to the channels.
Spring Integration message channels store messages in memory, and the channels are unbounded by default. To avoid running out of memory, it is always a good idea to bound the message channels by specifying a capacity. However, memory-based message channels will lose all their messages if the application crashes or the server shuts down. In order to make the message channel durable, Spring Integration 2.0 introduces JMS-backed message channels so the messages are stored within a JMS broker instead of in memory.
In the next couple of chapters, we will discuss how to connect message channels with internal or external systems by using endpoints, adapters, transformers, and routers.
__________________
4 Gregor Hohpe, op. cit.