CHAPTER 6

Channels

According to Enterprise Integration Patterns,1 a message channel is a virtual data pipe that connects a sender to one or more receivers. The message channel decouples the sender from the receivers, so the sender does not necessarily know who will receive the messages.

There are two major types of message channels: point-to-point channels and publish-subscribe channels. By selecting a different type of message channel, the application can control how the receivers get messages. The Spring Integration framework simplifies the development of message channels in an integration implementation.

EAI Message Channel Patterns

Spring Integration supports the design patterns that are defined in Enterprise Integration Patterns. Since this is not a book on patterns, it will help to already be familiar with the basic integration design patterns. However, for general context, the basic message channel design patterns are reviewed below.

The message channel design pattern is very simple. The message contains a piece of information that needs to be passed between different components, which can either be in a process or across different applications in different servers. For example, a chain retail store inventory system can send a message with the string “Inventory is running low with Item #12345” to the corporate warehouse so it can send more inventory back to the store. Besides strings, a message can contain any data type that the applications can understand.

The message endpoints are the components that interact with the messages. The endpoint that sends messages to the message channel is called the sender or producer. The endpoint that receives messages from the message channel is called the receiver or consumer. The sender puts the data into a message and the receiver takes the data out of the message. As a result, the sender and receiver need to understand the data that they are exchanging. Besides sending and receiving, a message endpoint can filter messages within a channel or route messages to another channel. Some message endpoints can even split a message into multiple messages and route them into different channels.

The message channel connects multiple endpoints together. Messages can be produced and sent from multiple senders and received by one or more receivers depending on the type of message channel. The channel ensures that the messages can be sent and received between endpoints safely. Since the application data is encapsulated within the message, the message channel does not need to understand the message payload. In other words, the message channel design pattern decouples the message sender and receiver.

In order to make the endpoints easily interact with the message channels, each message channel has a unique name, so each of the channels can be seen as a logical address. There are also different types of message channels that behave differently with regard to how to handle messages.

____________

1 Gregor Hohpe, op. cit.

Point-to-Point Channel

The point-to-point channel (see Figure 6–1) guarantees that exactly one receiver consumes any given message from the sender, even when several receivers are listening on the channel. Spring Integration provides several types of point-to-point channel implementations: QueueChannel, PriorityChannel, RendezvousChannel, DirectChannel, ExecutorChannel, and NullChannel.

Figure 6–1. Point-to-point Channel
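The point-to-point guarantee can be illustrated without any framework code. The following is a minimal sketch in plain Java, assuming a hypothetical PointToPointSketch class backed by a java.util.concurrent queue; it is a stand-in for the pattern, not one of the Spring Integration channel classes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a point-to-point channel; not a Spring Integration class.
class PointToPointSketch {

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Sender side: place the message on the channel.
    boolean send(String message) {
        return queue.offer(message);
    }

    // Receiver side: removes the message, so no other receiver can see it.
    // Returns null when the channel is empty.
    String receive() {
        return queue.poll();
    }

    public static void main(String[] args) {
        PointToPointSketch channel = new PointToPointSketch();
        channel.send("Inventory is running low with Item #12345");

        String receiverA = channel.receive(); // the first receiver takes the message
        String receiverB = channel.receive(); // the second receiver finds the channel empty

        System.out.println("Receiver A: " + receiverA);
        System.out.println("Receiver B: " + receiverB);
    }
}
```

Because receiving removes the message from the queue, a second receiver that polls afterward gets nothing, which is the behavior the pattern describes.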

Publish-Subscribe Channel

The publish-subscribe channel (see Figure 6–2) allows a one-to-many relationship between the producer and the consumers, such that multiple consumers may receive the same message. The message is marked as “received” and removed from the channel when all the subscribed receivers have consumed it. Spring Integration currently provides one publish-subscribe–style message channel implementation, PublishSubscribeChannel.

Figure 6–2. Publish-subscribe Channel
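The one-to-many delivery described above can be sketched in a few lines of plain Java. The PublishSubscribeSketch class and its Handler interface are hypothetical names used only for illustration; they are not the PublishSubscribeChannel API.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for a publish-subscribe channel; not a Spring Integration class.
class PublishSubscribeSketch {

    // Callback invoked for every message sent to the channel.
    interface Handler {
        void handle(String message);
    }

    private final List<Handler> handlers = new CopyOnWriteArrayList<>();

    boolean subscribe(Handler handler) {
        return handlers.add(handler);
    }

    // Delivers the same message to every subscriber and returns how many received it.
    int send(String message) {
        int delivered = 0;
        for (Handler handler : handlers) {
            handler.handle(message);
            delivered++;
        }
        return delivered;
    }
}
```

With two subscribers registered, a single send call invokes both handlers with the same message, in contrast to the point-to-point case where only one receiver would see it.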

Data-Typed Channel

Applications can use the message channel to transfer different types of data between the message sender and receiver. In order to process the message correctly, the receivers need to have knowledge about the message data type. For example, the sender can send object A and object B into the same message channel. The receiver needs to determine the object type in order to apply different business logic to handle the message. Usually, the determination is made by including a format indicator as part of the message or in the message header.

However, if there are two different channels, each carrying only a single data type (one for object A and one for object B), the receiver will know the data type of each message without using a format indicator. A message channel that only contains a single type of object or message is called a data-typed channel. An example of a channel that can handle different types of data and of data-typed channels is shown in Figure 6–3.

Figure 6–3. Message Channel with Multiple Data Types vs. Data-Typed Channels

Spring Integration message channels can be constrained to accept only messages of one or more specified payload types. By default, a channel has no such constraint. For instance:

<channel id="messageChannel" />

This Spring Integration message channel can receive any kind of data type. However, the following message channel can only receive objects of class a.A:

<channel id="queueChannel" datatype="a.A" />

We will discuss data-typed channels in more detail later in this chapter.
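To make the idea of a data-typed channel concrete, here is a minimal plain-Java sketch. The DataTypedChannelSketch class is hypothetical, and rejecting a mismatched payload with an IllegalArgumentException is an assumption of the sketch, not a statement about how the framework reports the error.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for a data-typed channel; not a Spring Integration class.
class DataTypedChannelSketch<T> {

    private final Class<T> acceptedType;
    private final Queue<T> queue = new ArrayDeque<>();

    DataTypedChannelSketch(Class<T> acceptedType) {
        this.acceptedType = acceptedType;
    }

    // Accepts the payload only if it is an instance of the configured type.
    boolean send(Object payload) {
        if (!acceptedType.isInstance(payload)) {
            // Assumption of this sketch: a type mismatch is signaled with an exception.
            throw new IllegalArgumentException(
                    "Channel only accepts " + acceptedType.getName());
        }
        return queue.offer(acceptedType.cast(payload));
    }

    // The receiver gets a typed payload and needs no format indicator.
    T receive() {
        return queue.poll();
    }
}
```

A channel created with String.class accepts a String payload and rejects an Integer, so the receiver never has to inspect the type of what it polls.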

Invalid Message Channel

When the application receives a message from the channel, it may decide not to process the message because the data within the message does not pass validation or because the application does not support the incoming message data type. Such a message is routed to the invalid message channel (see Figure 6–4), allowing further handling by another process or application. In Spring Integration, the validation would be done by a message filter, which will be discussed in more depth in Chapter 8.

Figure 6–4. Invalid Message Channel
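The routing decision behind an invalid message channel can be sketched as follows. The class name, the two lists, and the validation rule (a payload must start with "Item #") are all hypothetical choices made for the illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical receiver that diverts messages it cannot process.
class InvalidMessageRoutingSketch {

    final List<String> accepted = new ArrayList<>();
    final List<String> invalidChannel = new ArrayList<>();

    // Hypothetical validation rule: a payload must name an item.
    boolean isValid(String payload) {
        return payload != null && payload.startsWith("Item #");
    }

    // Valid payloads are processed; everything else goes to the invalid channel.
    void onMessage(String payload) {
        if (isValid(payload)) {
            accepted.add(payload);
        } else {
            invalidChannel.add(payload);
        }
    }
}
```

Another component can then drain invalidChannel and decide what to do with the rejected messages, which keeps the error handling out of the main receiver.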

Dead Letter Channel

When an application fails to deliver a message to the message channel after all the retry attempts, the message is sent to the dead letter channel, where it is handled further by another process or application listening for messages on that channel (see Figure 6–5).

Figure 6–5. Dead Letter Channel
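A minimal sketch of the retry-then-dead-letter flow is shown below. DeadLetterSketch, its Target interface, and the fixed attempt count are hypothetical; a real messaging system would typically add delays between attempts and record the failure reason.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sender that falls back to a dead letter channel.
class DeadLetterSketch {

    // Delivery target; returns true when the message was delivered.
    interface Target {
        boolean deliver(String message);
    }

    final List<String> deadLetterChannel = new ArrayList<>();
    private final int maxAttempts;

    DeadLetterSketch(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    // Tries up to maxAttempts times, then parks the message on the dead letter channel.
    boolean send(String message, Target target) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (target.deliver(message)) {
                return true;
            }
        }
        deadLetterChannel.add(message);
        return false;
    }
}
```

The sender returns false only after every attempt has failed, and the undelivered message remains available on deadLetterChannel for another process to inspect.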

Channel Adapter

The channel adapter (see Figure 6–6) allows an application to connect to the messaging system. Most applications are not designed to communicate with a messaging system in the first place. By using a common interface or application programming interface (API), applications can be easily integrated with different messaging system implementations.

Figure 6–6. Channel Adapter

Messaging Bridge

A messaging bridge is a relatively trivial endpoint that simply connects two message channels or channel adapters (see Figure 6–7). For example, the developer may want to connect a PollableChannel to a SubscribableChannel so that the subscribing endpoints don't have to worry about any polling configuration. Instead, the messaging bridge provides the polling support. Messaging bridges will be discussed in depth in Chapter 8.

Figure 6–7. Messaging Bridge
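The bridge between a polled channel and subscribed endpoints can be sketched in plain Java as follows. MessagingBridgeSketch and its pollOnce method are hypothetical; in this sketch the polling is triggered by hand, whereas a real bridge would be driven by a poller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical bridge from a pollable source to push-style subscribers.
class MessagingBridgeSketch {

    interface Handler {
        void handle(String message);
    }

    private final Queue<String> pollableSide;
    private final List<Handler> subscribers = new ArrayList<>();

    MessagingBridgeSketch(Queue<String> pollableSide) {
        this.pollableSide = pollableSide;
    }

    void subscribe(Handler handler) {
        subscribers.add(handler);
    }

    // One polling pass: forwards every queued message to all subscribers.
    // Returns the number of messages forwarded.
    int pollOnce() {
        int forwarded = 0;
        String message;
        while ((message = pollableSide.poll()) != null) {
            for (Handler handler : subscribers) {
                handler.handle(message);
            }
            forwarded++;
        }
        return forwarded;
    }
}
```

The subscribers only implement handle; all knowledge of polling stays inside the bridge.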

Message Bus

According to Enterprise Integration Patterns,2 a message bus is a combination of a canonical data model, a common command set, and a messaging infrastructure that allows different systems to communicate through a shared set of interfaces. By leveraging Spring Integration adapters and transformers, message channels can be connected and used to create a message bus allowing communication between different systems.

Guaranteed Delivery

Messages are normally stored in memory while they wait for delivery by the messaging system. If the messaging system crashes, all the messages are lost. To guarantee delivery, the messaging system can use a data store to persist the messages. By default, all the Spring Integration channels store messages in memory. As of Spring Integration 2.0, message channels can be backed by a JMS broker. In other words, messages are stored in an external JMS broker instead of in application memory. Message channels are just another strategy interface and can be tailored to your specific needs as required; implementations can be built that delegate to any data store mechanism conceivable using the MessageStore interface or, for more control, by implementing the MessageChannel interface itself.

Choosing a Channel Instance

It is very easy to apply the message channel design patterns using Spring Integration. All the Spring Integration channels implement the org.springframework.integration.MessageChannel interface shown in Listing 6–1. This interface defines how a sender sends a message to the channel. This is required since different types of message channels can receive messages in different ways. Depending on the message channel implementation, the send operation may block indefinitely or for a given timeout until the message is received.

Listing 6–1. MessageChannel.java

package org.springframework.integration;

public interface MessageChannel {

    boolean send(Message<?> message);

    boolean send(Message<?> message, long timeout);

}

____________

2 Gregor Hohpe, op. cit.

There are two different interfaces for receiving messages from a channel in Spring Integration: org.springframework.integration.core.PollableChannel and org.springframework.integration.core.SubscribableChannel. Both are subinterfaces of the MessageChannel interface.

PollableChannel (see Listing 6–2) allows the receiver to poll messages from the message channel periodically. The receiver can choose to wait indefinitely or for a given timeout until a message arrives. This gives the receiver the flexibility to decide when to get the message from the channel.

Listing 6–2. PollableChannel.java

package org.springframework.integration.core;

import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;

public interface PollableChannel extends MessageChannel {

    Message<?> receive();

    Message<?> receive(long timeout);

}
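The two receive variants map naturally onto a blocking queue. The following plain-Java sketch uses a hypothetical PollingReceiveSketch class to show the difference between waiting indefinitely and waiting with a timeout; treating an interrupt as "no message" is a simplification made for the sketch.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a pollable channel; not a Spring Integration class.
class PollingReceiveSketch {

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    boolean send(String message) {
        return queue.offer(message);
    }

    // Waits until a message is available.
    String receive() {
        try {
            return queue.take();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    // Waits at most timeoutMillis; returns null if nothing arrived in time.
    String receive(long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

A receiver that has other work to do would normally call the timed variant in a loop and use the null result as its cue to do something else before polling again.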

The alternative way to receive messages is to use the SubscribableChannel (see Listing 6–3), which allows the sender to push the message to the subscribed receiver(s). When the sender sends a message, the subscribed receiver(s) receive the message and process it with the provided org.springframework.integration.core.MessageHandler, following the Gang-of-Four (GoF) Observer pattern.3 Once a message has been sent to the channel, all the subscribed message handlers are invoked.

Listing 6–3. SubscribableChannel.java

package org.springframework.integration.core;

import org.springframework.integration.MessageChannel;

public interface SubscribableChannel extends MessageChannel {

    boolean subscribe(MessageHandler handler);

    boolean unsubscribe(MessageHandler handler);

}

The MessageHandler interface (see Listing 6–4) contains only one method, which handles the message pushed from the SubscribableChannel. The method is declared to throw org.springframework.integration.MessagingException. Depending on the message channel implementation, each exception may be handled differently (one such possibility is failover). As a result, it is always a good idea to throw the appropriate exception. Table 6–1 lists some message handler-related exceptions.

____________

3 Erich Gamma, Richard Helm, Ralph Johnson, John M. Vlissides

Listing 6–4. MessageHandler.java

package org.springframework.integration.core;

import org.springframework.integration.Message;
import org.springframework.integration.MessagingException;

public interface MessageHandler {

    void handleMessage(Message<?> message) throws MessagingException;

}

Table 6–1. Message Exceptions

Exception                   Description
MessageRejectedException    A message has been rejected by a selector.
MessageHandlingException    An error occurred during message handling.
MessageDeliveryException    An error occurred during message delivery (e.g., a network connectivity issue, a JMS broker error, or a storage issue).
MessageTimeoutException     A timeout elapsed prior to successful message delivery.
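As an illustration of why the exception thrown by a handler matters, the sketch below shows one possible reading of failover: the channel tries each subscribed handler in turn and stops at the first one that completes without throwing. FailoverDispatchSketch is a hypothetical class, and the use of RuntimeException in place of the framework exceptions is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical dispatcher that fails over to the next handler on error.
class FailoverDispatchSketch {

    interface Handler {
        void handle(String message);
    }

    private final List<Handler> handlers = new ArrayList<>();

    void subscribe(Handler handler) {
        handlers.add(handler);
    }

    // Tries each handler in subscription order and stops at the first success.
    // Rethrows the last failure if every handler failed; returns false if none exist.
    boolean send(String message) {
        RuntimeException lastFailure = null;
        for (Handler handler : handlers) {
            try {
                handler.handle(message);
                return true;
            } catch (RuntimeException ex) {
                lastFailure = ex; // remember the failure and try the next handler
            }
        }
        if (lastFailure != null) {
            throw lastFailure;
        }
        return false;
    }
}
```

If the first handler throws, the message is still processed by the second one, so the sender sees a successful send.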

Once you have decided how the application will receive messages, you can choose the type of message channel implementation (point-to-point or publish-subscribe).

Point-to-Point Channel

Spring Integration provides several different implementations of the point-to-point channel pattern. Let's look at the different point-to-point channel options.

QueueChannel

The org.springframework.integration.channel.QueueChannel class (see Figure 6–8) is the simplest implementation of the MessageChannel interface. QueueChannel has point-to-point semantics. In other words, even if the channel has multiple consumers, only one of them should receive any message sent to that channel. QueueChannel also provides mechanisms to filter and purge messages that satisfy certain criteria. In addition, QueueChannel stores all the messages in memory. By default, QueueChannel can use all the available memory to store messages. To avoid running out of memory, it is always better to instantiate the QueueChannel with a capacity that limits the number of messages held in the queue.

Figure 6–8. Queue Channel Class Diagram
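The effect of a capacity limit can be seen with a bounded blocking queue. The sketch below uses a hypothetical BoundedQueueSketch class; it only illustrates the idea that a send to a full channel waits for the given timeout and then reports failure, and it is not the QueueChannel implementation.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a capacity-limited queue channel.
class BoundedQueueSketch {

    private final BlockingQueue<String> queue;

    BoundedQueueSketch(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    // Returns true if the message was queued within the timeout, false if the
    // channel stayed full (or the calling thread was interrupted).
    boolean send(String message, long timeoutMillis) {
        try {
            return queue.offer(message, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    int size() {
        return queue.size();
    }
}
```

With a capacity of two, the third send returns false once its timeout elapses, so the producer learns that the consumer is falling behind instead of exhausting memory.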

Assume we are building a problem-reporting system in which each problem is described using a Ticket. The problem is reported using a ProblemReporter component. The Ticket is packaged into a message and submitted to a QueueChannel. Next, the TicketReceiver receives the Ticket message from the channel and the problem is resolved. Let's look at the definitions for the various classes involved. First, the class defining the Ticket message is shown in Listing 6–5.

Listing 6–5. Ticket.java

package com.apress.prospringintegration.channels.core;

import java.util.Calendar;

public class Ticket {

  public enum Priority {
    low,
    medium,
    high,
    emergency
  }

  private long ticketId;
  private Calendar issueDateTime;
  private String description;
  private Priority priority;

  public Ticket() {
  }

  public long getTicketId() {
    return ticketId;
  }

  public void setTicketId(long ticketId) {
    this.ticketId = ticketId;
  }

  public Calendar getIssueDateTime() {
    return issueDateTime;
  }

  public void setIssueDateTime(Calendar issueDateTime) {
    this.issueDateTime = issueDateTime;
  }

  public String getDescription() {
    return description;
  }

  public void setDescription(String description) {
    this.description = description;
  }

  public Priority getPriority() {
    return priority;
  }

  public void setPriority(Priority priority) {
    this.priority = priority;
  }

  public String toString() {
    return String.format("Ticket# %d: [%s] %s", ticketId, priority, description);
  }
}

The ProblemReporter class, shown in Listing 6–6, is a component that sends the Ticket message to the message channel ticketChannel when the openTicket method is invoked.

Listing 6–6. ProblemReporter.java

package com.apress.prospringintegration.channels.queuechannel;

import com.apress.prospringintegration.channels.core.Ticket;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class ProblemReporter {

  private QueueChannel channel;

  // Injects the ticketChannel bean defined in the Spring configuration
  @Value("#{ticketChannel}")
  public void setChannel(QueueChannel channel) {
    this.channel = channel;
  }

  // Wraps the ticket in a message and sends it to the channel
  void openTicket(Ticket ticket) {
    channel.send(MessageBuilder.withPayload(ticket).build());
    System.out.println("Ticket Sent - " + ticket.toString());
  }
}

The TicketReceiver class shown in Listing 6–7 receives the Ticket message sent to the message channel ticketChannel.

Listing 6–7. TicketReceiver.java

package com.apress.prospringintegration.channels.queuechannel;

import com.apress.prospringintegration.channels.core.Ticket;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.integration.Message;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.stereotype.Component;

@Component
public class TicketReceiver implements Runnable {

  final static int RECEIVE_TIMEOUT = 1000;

  protected QueueChannel channel;

  @Value("#{ticketChannel}")
  public void setChannel(QueueChannel channel) {
    this.channel = channel;
  }

  public void handleTicketMessage() {
    Message<?> ticketMessage;

    while (true) {
      ticketMessage = channel.receive(RECEIVE_TIMEOUT);
      if (ticketMessage != null) {
        handleTicket( (Ticket) ticketMessage.getPayload() );
      } else {
        try {
          /** Handle some other tasks **/
          Thread.sleep(1000);
        } catch (InterruptedException ex) {
          ex.printStackTrace();
        }
      }
    }
  }

  void handleTicket(Ticket ticket) {
    System.out.println("Received ticket - " + ticket.toString());
  }

  @Override
  public void run() {
    handleTicketMessage();
  }
}

The TicketGenerator class shown in Listing 6–8 creates the Ticket messages with different priority levels.

Listing 6–8. TicketGenerator.java

package com.apress.prospringintegration.channels.core;

import com.apress.prospringintegration.channels.core.Ticket.Priority;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.GregorianCalendar;
import java.util.List;

@Component
public class TicketGenerator {
    private long nextTicketId;

    public TicketGenerator() {
        this.nextTicketId = 1000L;
    }

    public List<Ticket> createTickets() {
        List<Ticket> tickets = new ArrayList<Ticket>();

        tickets.add(createLowPriorityTicket());
        tickets.add(createLowPriorityTicket());
        tickets.add(createLowPriorityTicket());
        tickets.add(createLowPriorityTicket());
        tickets.add(createLowPriorityTicket());
        tickets.add(createMediumPriorityTicket());
        tickets.add(createMediumPriorityTicket());
        tickets.add(createMediumPriorityTicket());
        tickets.add(createMediumPriorityTicket());
        tickets.add(createMediumPriorityTicket());
        tickets.add(createHighPriorityTicket());
        tickets.add(createHighPriorityTicket());
        tickets.add(createHighPriorityTicket());
        tickets.add(createHighPriorityTicket());
        tickets.add(createHighPriorityTicket());
        tickets.add(createEmergencyTicket());
        tickets.add(createEmergencyTicket());
        tickets.add(createEmergencyTicket());
        tickets.add(createEmergencyTicket());
        tickets.add(createEmergencyTicket());

        return tickets;
    }

    Ticket createEmergencyTicket() {
        return createTicket(Priority.emergency,
                "Urgent problem. Fix immediately or revenue will be lost!");
    }

    Ticket createHighPriorityTicket() {
        return createTicket(Priority.high,
                "Serious issue. Fix immediately.");
    }

    Ticket createMediumPriorityTicket() {
        return createTicket(Priority.medium,
                "There is an issue; take a look whenever you have time.");
    }

    Ticket createLowPriorityTicket() {
        return createTicket(Priority.low,
                "Some minor problems have been found.");
    }

    Ticket createTicket(Priority priority, String description) {
        Ticket ticket = new Ticket();
        ticket.setTicketId(nextTicketId++);
        ticket.setPriority(priority);
        ticket.setIssueDateTime(GregorianCalendar.getInstance());
        ticket.setDescription(description);

        return ticket;
    }
}

The main class TicketMain shown in Listing 6–9 creates the Spring context and initializes the various components described above.

Listing 6–9. TicketMain.java

package com.apress.prospringintegration.channels.queuechannel;

import com.apress.prospringintegration.channels.core.Ticket;
import com.apress.prospringintegration.channels.core.TicketGenerator;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.util.List;

public class TicketMain {

    public static void main(String[] args) {
        String contextName = "queue-channel.xml";

        ClassPathXmlApplicationContext applicationContext =
                new ClassPathXmlApplicationContext(contextName);
        applicationContext.start();

        ProblemReporter problemReporter =
                applicationContext.getBean(ProblemReporter.class);
        TicketReceiver ticketReceiver =
                applicationContext.getBean("ticketReceiver", TicketReceiver.class);
        TicketGenerator ticketGenerator =
                applicationContext.getBean(TicketGenerator.class);

        List<Ticket> tickets = ticketGenerator.createTickets();
        for (Ticket ticket : tickets) {
            problemReporter.openTicket(ticket);
        }

        Thread consumerThread = new Thread(ticketReceiver);
        consumerThread.start();
    }
}

The Spring configuration for the Ticket reporter example is shown in Listing 6–10.

Listing 6–10. queue-channel.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd">

  <context:component-scan
      base-package="com.apress.prospringintegration.channels.queuechannel"/>
  <context:component-scan
      base-package="com.apress.prospringintegration.channels.core"/>

  <int:channel id="ticketChannel"
               datatype="com.apress.prospringintegration.channels.core.Ticket">
    <int:queue capacity="50"/>
  </int:channel>

</beans>

QueueChannel is defined using the Spring Integration namespace in the Spring bean configuration file. Most of the beans used in the example are annotated with @Component, which means we don't need to explicitly register them with the Spring application context because they will be picked up by the context:component-scan element. The Spring Integration namespace will be discussed later in this chapter. TicketMain's main method calls the TicketGenerator class to create a list of Ticket instances with different ticket priority values. The list of Ticket objects is sent to a QueueChannel. TicketReceiver runs in a separate worker thread and polls the same QueueChannel to receive messages. Figure 6–9 shows the output of the preceding code:

Figure 6–9. TicketMain Output

QueueChannel works exactly like a queue, handling messages in first-in, first-out (FIFO) order. In the preceding example, ProblemReporter wraps the Ticket in a Message and puts it into the QueueChannel. The message can also be created directly as a GenericMessage:

queueChannel.send(new GenericMessage<Ticket>(ticket));

org.springframework.integration.message.GenericMessage implements the org.springframework.integration.Message interface shown in Listing 6–11, which only contains two methods: getHeaders and getPayload.

Listing 6–11. Message.java

package org.springframework.integration;

public interface Message<T> {

        MessageHeaders getHeaders();

        T getPayload();

}

The Message interface is simple: a message contains a message header and a payload. The message header is a map that consists of the key/value pairs shown in Table 6–2.

Table 6–2. Message Header

Header Name        Header Data Type
ID                 java.util.UUID
TIMESTAMP          java.lang.Long
CORRELATION_ID     java.lang.Object
REPLY_CHANNEL      java.lang.Object
ERROR_CHANNEL      java.lang.Object
SEQUENCE_NUMBER    java.lang.Integer
SEQUENCE_SIZE      java.lang.Integer
EXPIRATION_DATE    java.lang.Long
PRIORITY           java.lang.Integer
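The header-plus-payload structure can be pictured with a small plain-Java class. SimpleMessageSketch is hypothetical, and the literal key strings "ID", "TIMESTAMP", and "PRIORITY" are assumptions chosen to mirror the names in Table 6–2; the keys actually used by the framework may differ.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical message with a read-only header map and a typed payload.
class SimpleMessageSketch<T> {

    private final Map<String, Object> headers;
    private final T payload;

    SimpleMessageSketch(T payload, Map<String, Object> extraHeaders) {
        Map<String, Object> copy = new HashMap<>(extraHeaders);
        // Assumed key names; every message gets its own id and creation time.
        copy.put("ID", UUID.randomUUID());
        copy.put("TIMESTAMP", Long.valueOf(System.currentTimeMillis()));
        this.headers = Collections.unmodifiableMap(copy);
        this.payload = payload;
    }

    Map<String, Object> getHeaders() {
        return headers;
    }

    T getPayload() {
        return payload;
    }
}
```

Because the header map is unmodifiable, a consumer can read values such as the priority but cannot change a message that has already been created.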

QueueChannel's send method does not block as long as the channel is not full. By default, QueueChannel is unbounded. In order to avoid the application running out of memory, it is always a good idea to specify the capacity value for a bounded QueueChannel.

// channel.receive will be blocked until a message arrives
ticketMessage = channel.receive();

// channel.receive will be blocked for 1000ms
// or until a message arrives within the timeout period
ticketMessage = channel.receive(1000);

The TicketReceiver will poll from QueueChannel. If there is no message, TicketReceiver will wait and block until a message can be received. However, in some situations, the application may want to handle something while waiting for the next message.

public void handleTicketMessage() {
    Message<?> ticketMessage = null;

    while (true) {
        ticketMessage = channel.receive(RECEIVE_TIMEOUT);
        if (ticketMessage != null) {
            handleTicket( (Ticket) ticketMessage.getPayload());
        } else {
            try {
                /** Handle some other tasks **/
                Thread.sleep(1000);
            }
            catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }
    }
}

In this case, TicketReceiver will wait for about 1 second for the next message. If there is nothing in QueueChannel, the block will be released so TicketReceiver can handle something else and try to poll the message from the channel once again.

In reality, however, tickets with emergency priority should be handled first because the problem may be urgent. QueueChannel operates as a FIFO queue, so it does not reorder messages by itself. It does, however, provide a feature that can remove specific messages from the channel by using org.springframework.integration.core.MessageSelector. The MessageSelector interface is shown in Listing 6–12.

Listing 6–12. MessageSelector.java

package org.springframework.integration.core;

import org.springframework.integration.Message;

public interface MessageSelector {

    boolean accept(Message<?> message);

}

The MessageSelector interface only contains the accept method, which takes a Message object. If the message can be accepted, the method returns true. The ticket-handling example is modified to take the tickets with emergency priority first, using the MessageSelector shown in Listing 6–13. This MessageSelector returns true for non-emergency tickets, so only emergency tickets are rejected.

Listing 6–13. EmergencyTicketSelector.java

package com.apress.prospringintegration.channels.queuechannel;

import com.apress.prospringintegration.channels.core.Ticket;
import com.apress.prospringintegration.channels.core.Ticket.Priority;
import org.springframework.integration.Message;
import org.springframework.integration.core.MessageSelector;
import org.springframework.stereotype.Component;

@Component
public class EmergencyTicketSelector implements MessageSelector {
  @Override
  public boolean accept(Message<?> message) {
    return ((Ticket) message.getPayload()).getPriority() != Priority.emergency;
  }
}
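The selector only makes sense together with purge semantics in which rejected messages are the ones removed. The following plain-Java sketch models that behavior with a hypothetical PurgeSketch class and string payloads; it illustrates the idea and is not the QueueChannel code.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical queue that supports selector-based purging.
class PurgeSketch {

    interface Selector {
        boolean accept(String message);
    }

    private final List<String> queue = new ArrayList<>();

    void send(String message) {
        queue.add(message);
    }

    // Removes and returns every message the selector does NOT accept;
    // accepted messages stay in the queue in their original order.
    List<String> purge(Selector selector) {
        List<String> purged = new ArrayList<>();
        Iterator<String> iterator = queue.iterator();
        while (iterator.hasNext()) {
            String message = iterator.next();
            if (!selector.accept(message)) {
                iterator.remove();
                purged.add(message);
            }
        }
        return purged;
    }

    // Returns the oldest remaining message, or null if the queue is empty.
    String receive() {
        return queue.isEmpty() ? null : queue.remove(0);
    }
}
```

A selector that accepts everything except emergency messages therefore hands back exactly the emergency messages, leaving the rest queued in FIFO order.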

The EmergencyTicketReceiver class shown in Listing 6–14 first pulls any emergency tickets from the channel by purging them with the MessageSelector described above. The purge method removes and returns any message not accepted by the MessageSelector. The emergency tickets are then processed first.

Listing 6–14. EmergencyTicketReceiver.java

package com.apress.prospringintegration.channels.queuechannel;

import com.apress.prospringintegration.channels.core.Ticket;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.integration.Message;
import org.springframework.integration.core.MessageSelector;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import java.util.List;

@Component
public class EmergencyTicketReceiver extends TicketReceiver {

  private MessageSelector emergencyTicketSelector;

  @Autowired
  public void setEmergencyTicketSelector(MessageSelector emergencyTicketSelector) {
    this.emergencyTicketSelector = emergencyTicketSelector;
  }

  @Override
  public void handleTicketMessage() {
    Message<?> ticketMessage = null;

    while (true) {
      List<Message<?>> emergencyTicketMessages = channel.purge(emergencyTicketSelector);
      handleEmergencyTickets(emergencyTicketMessages);

      ticketMessage = channel.receive(RECEIVE_TIMEOUT);
      if (ticketMessage != null) {
        handleTicket((Ticket) ticketMessage.getPayload());
      } else {
        try {
          /** Handle some other tasks **/
          Thread.sleep(1000);
        } catch (InterruptedException ex) {
          ex.printStackTrace();
        }
      }
    }
  }

  void handleEmergencyTickets(List<Message<?>> highPriorityTicketMessages) {
    Assert.notNull(highPriorityTicketMessages);
    for (Message<?> ticketMessage : highPriorityTicketMessages) {
      handleTicket((Ticket) ticketMessage.getPayload());
    }
  }
}

The output looks like Figure 6–10. Pay attention to the tickets that have emergency priority. Instead of appearing at the very end of the list, all the tickets with emergency priority are received at the very top.

Figure 6–10. EmergencyTicketMain Output

In the preceding example, the purge method is used to select and remove the messages that contain emergency priority tickets. All the emergency tickets must be handled first, and then QueueChannel can be polled as usual. This solution is quite complicated; fortunately, Spring Integration provides a better way to handle this use case.

PriorityChannel

org.springframework.integration.channel.PriorityChannel is a subclass of QueueChannel, as shown in Figure 6–11. It works exactly like QueueChannel, except that it delivers messages to the endpoint in priority order. The order is determined by a comparator, similar to how a Java Collection is sorted. By default, PriorityChannel uses a comparator based on the value of the PRIORITY field in the message header. By using PriorityChannel, the way the emergency tickets are handled in the previous example can be greatly simplified.

Figure 6–11. PriorityChannel Class Diagram
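Comparator-based ordering can be sketched with a standard priority queue. PriorityChannelSketch and its Entry class are hypothetical; breaking ties by arrival order is a choice made in this sketch and is not a claim about how PriorityChannel treats messages of equal priority.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical stand-in for a priority-ordered channel.
class PriorityChannelSketch {

    static final class Entry {
        final int priority;
        final long sequence;
        final String payload;

        Entry(int priority, long sequence, String payload) {
            this.priority = priority;
            this.sequence = sequence;
            this.payload = payload;
        }
    }

    // Larger priority values come first; ties fall back to arrival order.
    private static final Comparator<Entry> ORDER = (a, b) -> {
        int byPriority = Integer.compare(b.priority, a.priority);
        return byPriority != 0 ? byPriority : Long.compare(a.sequence, b.sequence);
    };

    private final PriorityBlockingQueue<Entry> queue =
            new PriorityBlockingQueue<Entry>(11, ORDER);
    private long nextSequence = 0;

    synchronized void send(String payload, int priority) {
        queue.add(new Entry(priority, nextSequence++, payload));
    }

    // Returns the highest-priority payload, or null if the channel is empty.
    String receive() {
        Entry entry = queue.poll();
        return entry == null ? null : entry.payload;
    }

    public static void main(String[] args) {
        PriorityChannelSketch channel = new PriorityChannelSketch();
        channel.send("low", 0);
        channel.send("emergency", 3);
        channel.send("medium", 1);
        String next;
        while ((next = channel.receive()) != null) {
            System.out.println(next);
        }
    }
}
```

The receiver code is the same polling loop as for a plain queue; only the order in which messages come out changes.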

The preceding example is modified to leverage the PriorityChannel. First the ticket receiver code is modified to use the PriorityChannel as shown in Listing 6–15.

Listing 6–15. PriorityTicketReceiver.java

package com.apress.prospringintegration.channels.prioritychannel;

import com.apress.prospringintegration.channels.core.Ticket;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.integration.Message;
import org.springframework.integration.channel.PriorityChannel;
import org.springframework.stereotype.Component;

@Component
public class PriorityTicketReceiver implements Runnable {
  private final static int RECEIVE_TIMEOUT = 1000;

  private PriorityChannel channel;

  @Value("#{ticketChannel}")
  public void setChannel(PriorityChannel channel) {
    this.channel = channel;
  }

  public void handleTicketMessage() {
    Message<?> ticketMessage = null;

    while (true) {
      ticketMessage = channel.receive(RECEIVE_TIMEOUT);
      if (ticketMessage != null) {
        handleTicket((Ticket) ticketMessage.getPayload());
      } else {
        try {
          /** Handle some other tasks **/

          Thread.sleep(1000);
        } catch (InterruptedException ex) {
          ex.printStackTrace();
        }
      }
    }
  }

  void handleTicket(Ticket ticket) {
    System.out.println("Received ticket - " + ticket.toString());
  }

  @Override
  public void run() {
    handleTicketMessage();
  }
}

Next the ProblemReporter class is modified to use the PriorityChannel as shown in Listing 6–16.

Listing 6–16. ProblemReporter.java

package com.apress.prospringintegration.channels.prioritychannel;

import com.apress.prospringintegration.channels.core.Ticket;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.integration.channel.PriorityChannel;
import org.springframework.integration.message.GenericMessage;
import org.springframework.stereotype.Component;

@Component
public class ProblemReporter {
  protected PriorityChannel channel;

  @Value("#{ticketChannel}")
  public void setChannel(PriorityChannel channel) {
    this.channel = channel;
  }

  void openTicket(Ticket ticket) {
    channel.send(new GenericMessage<Ticket>(ticket));
    System.out.println("Ticket Sent - " + ticket.toString());
  }
}

The PriorityTicketReceiver code looks very similar to the original TicketReceiver implementation using QueueChannel. How does PriorityChannel know which message has higher priority? By default, PriorityChannel inspects the PRIORITY field of the message's org.springframework.integration.MessageHeaders. The larger the value, the higher the priority of the message. To take advantage of the default behavior of PriorityChannel, a priority needs to be assigned to the message header when the message is created. This is done in the PriorityProblemReporter class, as shown in Listing 6–17.

Listing 6–17. PriorityProblemReporter.java

package com.apress.prospringintegration.channels.prioritychannel;

import com.apress.prospringintegration.channels.core.Ticket;
import org.springframework.integration.MessageHeaders;
import org.springframework.integration.message.GenericMessage;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

@Component
public class PriorityProblemReporter extends ProblemReporter {

  void openTicket(Ticket ticket) {
    Map<String, Object> messageHeader = new HashMap<String, Object>();
    messageHeader.put(MessageHeaders.PRIORITY, ticket.getPriority().ordinal());

    channel.send(new GenericMessage<Ticket>(ticket, messageHeader));
    System.out.println("Ticket Sent - " + ticket.toString());
  }
}

By default, the PriorityChannel comparator compares the PRIORITY field in the message header to determine the priority. The default comparator can be customized by replacing it with a custom comparator. The code in Listing 6–18 shows a custom comparator which compares the priority values stored in the Ticket object instead of using the PRIORITY field in the message header.

Listing 6–18. TicketMessagePriorityComparator.java

package com.apress.prospringintegration.channels.prioritychannel;

import com.apress.prospringintegration.channels.core.Ticket;
import org.springframework.integration.Message;
import org.springframework.stereotype.Component;

import java.util.Comparator;

@Component
public class TicketMessagePriorityComparator
    implements Comparator<Message<Ticket>> {

  @Override
  public int compare(Message<Ticket> message1, Message<Ticket> message2) {
    // ordinal() returns a primitive int, so no null check is needed
    Integer priority1 = message1.getPayload().getPriority().ordinal();
    Integer priority2 = message2.getPayload().getPriority().ordinal();

    // Reversed comparison: the larger ordinal (higher priority) is ordered first
    return priority2.compareTo(priority1);
  }
}

Rather than comparing the message header's PRIORITY field, TicketMessagePriorityComparator compares the priority value of the Ticket payload. Once the new comparator is implemented, it needs to be assigned to the PriorityChannel instance. The file shown in Listing 6–19 configures the requisite Spring Integration channel and assigns it a reference to the TicketMessagePriorityComparator, which is picked up and registered because it carries the @Component annotation.

Listing 6–19. priority-channel.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd">

  <context:component-scan
      base-package="com.apress.prospringintegration.channels.prioritychannel"/>
  <context:component-scan
      base-package="com.apress.prospringintegration.channels.core"/>

  <int:channel id="ticketChannel"
               datatype="com.apress.prospringintegration.channels.core.Ticket">
    <int:priority-queue capacity="50"
                        comparator="ticketMessagePriorityComparator"/>
  </int:channel>


</beans>

Once again, the PriorityChannel is defined in the Spring bean configuration file, and its comparator is overridden there through the comparator attribute:

<int:channel id="ticketChannel"
             datatype="com.apress.prospringintegration.channels.core.Ticket">
  <int:priority-queue capacity="50"
                      comparator="ticketMessagePriorityComparator"/>
</int:channel>

The output looks like Figure 6–12, and it looks the same as the output for EmergencyTicketMain earlier in the chapter, in the “QueueChannel” section.

Figure 6–12. EmergencyTicketMain Output

RendezvousChannel

org.springframework.integration.channel.RendezvousChannel is a synchronous version of QueueChannel, as shown in Figure 6–13. Internally it uses a zero-capacity SynchronousQueue instead of a buffering BlockingQueue implementation. The sender blocks until a receiver takes the message from the channel; in other words, the sender cannot send a second message until the first has been retrieved. Likewise, the receiver blocks until a sender sends a message to the RendezvousChannel. The behavior resembles a semaphore shared between threads, so RendezvousChannel is useful for synchronizing multiple threads when semaphores are not an option.

Figure 6–13. RendezvousChannel Class Diagram
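
The hand-off behavior of a zero-capacity queue can be observed directly with java.util.concurrent.SynchronousQueue. The following sketch uses plain strings in place of messages, and its class and method names are illustrative only. It shows that a timed send fails when no receiver is waiting, and that a send completes once a receiver thread takes the message.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// Plain-JDK analogy for a rendezvous hand-off; this is not Spring Integration code.
class RendezvousSketch {

  // With no receiver waiting, a timed hand-off gives up and reports failure.
  static boolean sendWithoutReceiver() {
    SynchronousQueue<String> queue = new SynchronousQueue<String>();
    try {
      return queue.offer("ticket-1", 100, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(ex);
    }
  }

  // With a receiver thread running, the hand-off completes.
  static String sendWithReceiver() {
    final SynchronousQueue<String> queue = new SynchronousQueue<String>();
    final String[] received = new String[1];

    Thread receiver = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          received[0] = queue.take(); // blocks until a sender arrives
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
        }
      }
    });
    receiver.start(); // start the consumer before sending

    try {
      queue.put("ticket-1"); // blocks until the receiver takes the message
      receiver.join();
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(ex);
    }
    return received[0];
  }

  public static void main(String[] args) {
    System.out.println("Sent without receiver: " + sendWithoutReceiver());
    System.out.println("Received: " + sendWithReceiver());
  }
}
```

Because nothing is ever buffered, the order in which the sender and receiver threads are started matters far more here than it does with a queue that has spare capacity.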

The ticket handling example is modified to use a RendezvousChannel. The sender cannot send the next message until the previous one is received. The ProblemReporter class is modified to use the RendezvousChannel as shown in Listing 6–20.

Listing 6–20. ProblemReporter.java

package com.apress.prospringintegration.channels.rendezvouschannel;

import com.apress.prospringintegration.channels.core.Ticket;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.integration.channel.RendezvousChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class ProblemReporter {
  private RendezvousChannel channel;

  public ProblemReporter() {
  }

  @Value("#{ticketChannel}")
  public void setChannel(RendezvousChannel channel) {
    this.channel = channel;
  }

  void openTicket(Ticket ticket) {
    channel.send(MessageBuilder.withPayload(ticket).build());
    System.out.println("Ticket Sent - " + ticket.toString());
  }
}

Next the TicketReceiver class is modified to use the RendezvousChannel as shown in Listing 6–21.

Listing 6–21. TicketReceiver.java

package com.apress.prospringintegration.channels.rendezvouschannel;

import com.apress.prospringintegration.channels.core.Ticket;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.integration.Message;
import org.springframework.integration.channel.RendezvousChannel;
import org.springframework.stereotype.Component;

@Component
public class TicketReceiver implements Runnable {

  private final static int RECEIVE_TIMEOUT = 1000;

  private RendezvousChannel channel;

  @Value("#{ticketChannel}")
  public void setChannel(RendezvousChannel channel) {
    this.channel = channel;
  }

  void handleTicket(Ticket ticket) {
    System.out.println("Received ticket - " + ticket.toString());
  }

  @Override
  public void run() {
    Message<?> ticketMessage ;

    while (true) {
      ticketMessage = channel.receive(RECEIVE_TIMEOUT);
      if (ticketMessage != null) {
        handleTicket((Ticket) ticketMessage.getPayload());
      } else {
        try {
          /** Handle some other tasks **/

          Thread.sleep(1000);
        } catch (InterruptedException ex) {
          ex.printStackTrace();
        }
      }
    }
  }
}

Finally, the Main class is modified, as shown in Listing 6–22, so that the message consumer thread starts up first. Otherwise, the main thread would block in the first send call, because RendezvousChannel would have no receiver waiting to take the message.

Listing 6–22. Main.java

package com.apress.prospringintegration.channels.rendezvouschannel;

import com.apress.prospringintegration.channels.core.Ticket;
import com.apress.prospringintegration.channels.core.TicketGenerator;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.util.List;

public class Main {
  public static void main(String[] args) throws Throwable {
    String contextName = "rendezvous-channel.xml";

    ClassPathXmlApplicationContext applicationContext =
         new ClassPathXmlApplicationContext(contextName);
    applicationContext.start();

    ProblemReporter problemReporter = applicationContext.getBean(ProblemReporter.class);
    TicketReceiver ticketReceiver = applicationContext.getBean(TicketReceiver.class);
    TicketGenerator ticketGenerator = applicationContext.getBean(TicketGenerator.class);

    // start *before* message publication because it'll block on put
    Thread consumerThread = new Thread(ticketReceiver);
    consumerThread.start();

    List<Ticket> tickets = ticketGenerator.createTickets();
    for (Ticket ticket : tickets) {
      problemReporter.openTicket(ticket);
    }
  }
}

The output for the RendezvousChannel example looks like Figure 6–14.

Figure 6–14. RendezvousChannel Main Output

DirectChannel

org.springframework.integration.channel.DirectChannel is a mixture of the point-to-point and publish-subscribe channels, as shown in Figure 6–15. Like a publish-subscribe channel, it pushes messages to subscribed receivers instead of waiting to be polled, but each message is delivered to only one receiver. As a result, DirectChannel is actually a point-to-point channel. Because DirectChannel adds almost no overhead, it is the default channel type within Spring Integration.

Figure 6–15. DirectChannel Class Diagram

The ticket handling example is modified to use a DirectChannel. The DirectChannel is point-to-point where only one receiver can receive each message. A TicketMessageHandler class is created to receive the incoming Ticket message as shown in Listing 6–23.

Listing 6–23. TicketMessageHandler.java

package com.apress.prospringintegration.channels.directchannel;

import com.apress.prospringintegration.channels.core.Ticket;
import org.springframework.integration.Message;
import org.springframework.integration.MessageRejectedException;
import org.springframework.integration.MessagingException;
import org.springframework.integration.core.MessageHandler;
import org.springframework.stereotype.Component;

@Component
public class TicketMessageHandler implements MessageHandler {

  @Override
  public void handleMessage(Message<?> message)
      throws MessagingException {
    Object payload = message.getPayload();

    if (payload instanceof Ticket) {
      handleTicket((Ticket) payload);
    } else {
      throw new MessageRejectedException(message, "Unknown data type has been received.");
    }
  }

  void handleTicket(Ticket ticket) {
    System.out.println("Received ticket - " + ticket.toString());
  }
}

Next, the ProblemReporter class is modified to use a DirectChannel, as shown in Listing 6–24.

Listing 6–24. ProblemReporter.java

package com.apress.prospringintegration.channels.directchannel;

import com.apress.prospringintegration.channels.core.Ticket;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class ProblemReporter {

  private DirectChannel channel;

  @Value("#{ticketChannel}")
  public void setChannel(DirectChannel channel) {
    this.channel = channel;
  }

  void openTicket(Ticket ticket) {
    channel.send(MessageBuilder.withPayload(ticket).build());
    System.out.println("Ticket Sent - " + ticket.toString());
  }
}

The Main class, as shown in Listing 6–25, is modified to subscribe the TicketMessageHandler to the DirectChannel named ticketChannel.

Listing 6–25. Main.java

package com.apress.prospringintegration.channels.directchannel;

import com.apress.prospringintegration.channels.core.Ticket;
import com.apress.prospringintegration.channels.core.TicketGenerator;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.DirectChannel;

import java.util.List;

public class Main {

    public static void main(String[] args) throws Exception {

        String contextName = "direct-channel.xml";

        ClassPathXmlApplicationContext applicationContext =
                new ClassPathXmlApplicationContext(contextName);
        applicationContext.start();

        ProblemReporter problemReporter =
                applicationContext.getBean(ProblemReporter.class);
        TicketGenerator ticketGenerator =
                applicationContext.getBean(TicketGenerator.class);
        TicketMessageHandler ticketMessageHandler =
                applicationContext.getBean(TicketMessageHandler.class);

        DirectChannel channel =
                applicationContext.getBean("ticketChannel", DirectChannel.class);
        channel.subscribe(ticketMessageHandler);

        List<Ticket> tickets = ticketGenerator.createTickets();
        for (Ticket ticket : tickets) {
            problemReporter.openTicket(ticket);
        }
    }
}

The Spring configuration is shown in Listing 6–26, again leveraging component scanning to reduce the amount of XML configuration.

Listing 6–26. direct-channel.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context.xsd">

  <context:component-scan
      base-package="com.apress.prospringintegration.channels.directchannel"/>
  <context:component-scan
      base-package="com.apress.prospringintegration.channels.core"/>

  <int:channel id="ticketChannel"/>


</beans>

Although DirectChannel acts like a point-to-point channel, it allows multiple receivers to subscribe to the channel, just like the publish-subscribe channels. By default, DirectChannel dispatches each message to exactly one of the subscribed receivers, chosen in round-robin fashion; this is the only strategy available out of the box for now. Additional strategies will be added in future versions of Spring Integration. However, developers can supply a custom load-balancing algorithm by implementing the org.springframework.integration.dispatcher.LoadBalancingStrategy interface.

public interface LoadBalancingStrategy {

    public Iterator<MessageHandler> getHandlerIterator(
          Message<?> message, List<MessageHandler> handlers);

}

DirectChannel invokes the handler's handleMessage method in the sender's thread, before the send() method returns. This makes it possible for a single transaction to span both the send and receive operations. For example, if the handleTicket method writes to a database and the database operation throws an exception, the handleMessage method propagates the error to the sender as a MessagingException. When several handlers are subscribed, the DirectChannel dispatcher fails over to the subsequent handlers. Failover is turned on by default, which means an exception reaches the sender only after every handler has tried and failed to handle the message.
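
The round-robin selection and failover behavior described above can be approximated in plain Java. The following is a minimal sketch under stated assumptions: the Handler interface, the dispatch method, and the demo handlers are hypothetical stand-ins, and the code is not the dispatcher that Spring Integration ships.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java analogy for round-robin dispatch with failover; not Spring Integration code.
class RoundRobinFailoverSketch {

  // Hypothetical stand-in for a subscribed message handler.
  interface Handler {
    void handle(String message);
  }

  private final List<Handler> handlers = new ArrayList<Handler>();
  private final AtomicInteger next = new AtomicInteger();

  void subscribe(Handler handler) {
    handlers.add(handler);
  }

  // Runs in the caller's thread: picks a starting handler in round-robin order
  // and, when a handler fails, tries the remaining handlers before giving up.
  void dispatch(String message) {
    if (handlers.isEmpty()) {
      throw new IllegalStateException("no subscribers");
    }
    int start = (next.getAndIncrement() & Integer.MAX_VALUE) % handlers.size();
    RuntimeException lastFailure = null;
    for (int i = 0; i < handlers.size(); i++) {
      Handler candidate = handlers.get((start + i) % handlers.size());
      try {
        candidate.handle(message);
        return; // exactly one handler processes each message
      } catch (RuntimeException ex) {
        lastFailure = ex; // fail over to the next handler
      }
    }
    throw lastFailure; // every handler failed, so the sender sees the error
  }

  // Three handlers, the second of which always fails.
  static List<String> demo() {
    final List<String> log = new ArrayList<String>();
    RoundRobinFailoverSketch channel = new RoundRobinFailoverSketch();
    channel.subscribe(new Handler() {
      @Override
      public void handle(String message) {
        log.add("A:" + message);
      }
    });
    channel.subscribe(new Handler() {
      @Override
      public void handle(String message) {
        throw new IllegalStateException("B is down");
      }
    });
    channel.subscribe(new Handler() {
      @Override
      public void handle(String message) {
        log.add("C:" + message);
      }
    });
    channel.dispatch("t1");
    channel.dispatch("t2");
    channel.dispatch("t3");
    return log;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

In the demo, the second message is first offered to the failing handler B and then fails over to C, so the log reads A:t1, C:t2, C:t3 and the sender never sees an exception.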

ExecutorChannel

org.springframework.integration.channel.ExecutorChannel is a point-to-point message channel that is very similar to DirectChannel, as shown in Figure 6–16. However, it hands the dispatching to an instance of org.springframework.core.task.TaskExecutor, so the handlers run in a thread separate from the sender's thread. In other words, the send method of ExecutorChannel does not block while the message is being handled. As a result, unlike DirectChannel, ExecutorChannel does not support transactions that span the sender and receiver.

Figure 6–16. ExecutorChannel Class Diagram
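
The difference between the two dispatch styles comes down to which thread runs the handler. The following plain-JDK sketch uses a java.util.concurrent.ExecutorService in place of a Spring TaskExecutor, and its class and method names are illustrative only. It checks whether the handler runs in the sender's thread in each case.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-JDK analogy for direct versus executor dispatch; not Spring Integration code.
class DispatchThreadSketch {

  // Direct-style dispatch: the handler is a plain method call in the sender's thread.
  static boolean directHandlerRunsInSenderThread() {
    final Thread sender = Thread.currentThread();
    Callable<Boolean> handler = new Callable<Boolean>() {
      @Override
      public Boolean call() {
        return Thread.currentThread() == sender;
      }
    };
    try {
      return handler.call();
    } catch (Exception ex) {
      throw new IllegalStateException(ex);
    }
  }

  // Executor-style dispatch: the handler is submitted to a worker thread.
  // The sketch waits for the result only so that it can report it.
  static boolean executorHandlerRunsInSenderThread() {
    final Thread sender = Thread.currentThread();
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      return executor.submit(new Callable<Boolean>() {
        @Override
        public Boolean call() {
          return Thread.currentThread() == sender;
        }
      }).get();
    } catch (Exception ex) {
      throw new IllegalStateException(ex);
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("direct: " + directHandlerRunsInSenderThread());
    System.out.println("executor: " + executorHandlerRunsInSenderThread());
  }
}
```

A transaction bound to the sender's thread is visible to the handler only in the first case, which is why the executor-based style gives up transactions that span sender and receiver.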

NullChannel

The org.springframework.integration.channel.NullChannel implementation, shown in Figure 6–17, is a dummy message channel that does nothing: it does not pass any messages from the sender to the receiver. NullChannel's send method always returns true, while its receive method always returns null. In other words, sending always appears to succeed, and the channel always appears to be empty when receiving. Because of this behavior, NullChannel is mainly used for unit testing, integration testing, and debugging.

Figure 6–17. NullChannel Class Diagram
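
The contract is small enough to state in a few lines. The following sketch is a hypothetical stand-in written for illustration, not the NullChannel source. It shows a channel that accepts every message and never has one to deliver.

```java
// Hypothetical stand-in that mimics the behavior described for NullChannel.
class NullChannelSketch {

  // Accepts and silently discards every message.
  boolean send(Object message) {
    return true;
  }

  // Never has anything to deliver.
  Object receive() {
    return null;
  }

  public static void main(String[] args) {
    NullChannelSketch channel = new NullChannelSketch();
    System.out.println("sent: " + channel.send("ticket-1"));
    System.out.println("received: " + channel.receive());
  }
}
```

In a test, a channel like this can stand in for a destination whose messages are irrelevant to the behavior under test.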

Scoped Channel

In Spring Integration 1.0, developers could use ThreadLocalChannel to restrict the scope of a message channel to a single thread. In Spring Integration 2.0, ThreadLocalChannel is replaced by more general scope support. When the channel's scope attribute is set to a thread scope, no other thread is able to access a message within the thread-scoped message channel.
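
The effect of thread scoping can be pictured with a ThreadLocal. The following sketch is an analogy under stated assumptions, with hypothetical class and method names. Spring Integration's scope support is configured through the bean scope rather than written by hand like this.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Analogy for a thread-scoped channel: every thread sees its own private queue.
class ThreadScopedQueueSketch {

  // Each thread lazily gets its own queue the first time it touches the channel.
  private final ThreadLocal<Queue<String>> perThread =
      new ThreadLocal<Queue<String>>() {
        @Override
        protected Queue<String> initialValue() {
          return new ArrayDeque<String>();
        }
      };

  void send(String message) {
    perThread.get().add(message);
  }

  // Returns null when the calling thread's queue is empty.
  String receive() {
    return perThread.get().poll();
  }

  // Sends in the calling thread, then receives in another thread and in the caller.
  static String[] demo() {
    final ThreadScopedQueueSketch channel = new ThreadScopedQueueSketch();
    channel.send("ticket-1");

    final String[] seenByOther = new String[] {"unset"};
    Thread other = new Thread(new Runnable() {
      @Override
      public void run() {
        seenByOther[0] = channel.receive();
      }
    });
    other.start();
    try {
      other.join();
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(ex);
    }
    return new String[] {seenByOther[0], channel.receive()};
  }

  public static void main(String[] args) {
    String[] result = demo();
    System.out.println("other thread saw: " + result[0]);
    System.out.println("sending thread saw: " + result[1]);
  }
}
```

The second thread receives null, while the sending thread still finds its own message.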

Publish-Subscribe Channel

The org.springframework.integration.channel.PublishSubscribeChannel class is the basic implementation of the publish-subscribe channel pattern, as shown in Figure 6–18. The channel broadcasts every message sent to it to all of its subscribers. In addition, the messages are pushed to the consumers; the consumers do not poll for them.

Figure 6–18. PublishSubscribeChannel Class Diagram
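
The broadcast behavior contrasts with the one-receiver-per-message delivery of the point-to-point channels. The following is a minimal plain-Java sketch, in which the Subscriber interface and the subscriber names are hypothetical. It pushes each message to every subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Plain-Java analogy for publish-subscribe delivery; not Spring Integration code.
class BroadcastSketch {

  // Hypothetical stand-in for a subscribed message handler.
  interface Subscriber {
    void onMessage(String message);
  }

  private final List<Subscriber> subscribers =
      new CopyOnWriteArrayList<Subscriber>();

  void subscribe(Subscriber subscriber) {
    subscribers.add(subscriber);
  }

  // Pushes the message to every subscriber and returns how many received it.
  int send(String message) {
    int delivered = 0;
    for (Subscriber subscriber : subscribers) {
      subscriber.onMessage(message);
      delivered++;
    }
    return delivered;
  }

  // Two subscribers that both record the same ticket.
  static List<String> demo() {
    final List<String> log = new ArrayList<String>();
    BroadcastSketch channel = new BroadcastSketch();
    channel.subscribe(new Subscriber() {
      @Override
      public void onMessage(String message) {
        log.add("audit:" + message);
      }
    });
    channel.subscribe(new Subscriber() {
      @Override
      public void onMessage(String message) {
        log.add("support:" + message);
      }
    });
    channel.send("t1");
    return log;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

A single send reaches both subscribers, so the log contains audit:t1 followed by support:t1.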

Channel Interceptors

One of the important features of enterprise integration is the ability to capture information as messages pass through the system. This is very useful when you want to intercept messages within a channel and inspect them before they reach their destinations. Spring Integration lets you intercept messages during the send and receive operations through the ChannelInterceptor interface, shown in Listing 6–27.

Listing 6–27. ChannelInterceptor.java

package org.springframework.integration.channel;

import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;

public interface ChannelInterceptor {

    Message<?> preSend(Message<?> message, MessageChannel channel);

    void postSend(Message<?> message, MessageChannel channel, boolean sent);

    boolean preReceive(MessageChannel channel);

    Message<?> postReceive(Message<?> message, MessageChannel channel);

}

ChannelInterceptor allows a message to be intercepted before it is sent, after it is sent, before it is received, and after it is received. Several of these methods can prevent a message from being processed further downstream. For example, preSend can return null to prevent the message from being sent to the message channel, and postReceive can return null to prevent the message from passing from the message channel to the consumer.
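
The veto mechanism can be sketched in plain Java. The following example is an analogy under stated assumptions: the one-method Interceptor interface and the two demo interceptors are hypothetical, and only the preSend step is modeled. It shows a chain in which any interceptor can stop a message by returning null.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java analogy for a preSend interceptor chain; not Spring Integration code.
class InterceptorChainSketch {

  // Hypothetical one-method interceptor: return the message to pass on, or null to veto.
  interface Interceptor {
    String preSend(String message);
  }

  private final List<Interceptor> interceptors = new ArrayList<Interceptor>();
  private final List<String> delivered = new ArrayList<String>();

  void addInterceptor(Interceptor interceptor) {
    interceptors.add(interceptor);
  }

  // Applies the interceptors in list order; a null result stops the send.
  boolean send(String message) {
    String current = message;
    for (Interceptor interceptor : interceptors) {
      current = interceptor.preSend(current);
      if (current == null) {
        return false; // vetoed before reaching the channel
      }
    }
    delivered.add(current);
    return true;
  }

  List<String> delivered() {
    return delivered;
  }

  // A channel that drops blank messages and tags everything else.
  static InterceptorChainSketch demoChannel() {
    InterceptorChainSketch channel = new InterceptorChainSketch();
    channel.addInterceptor(new Interceptor() {
      @Override
      public String preSend(String message) {
        return message.trim().isEmpty() ? null : message;
      }
    });
    channel.addInterceptor(new Interceptor() {
      @Override
      public String preSend(String message) {
        return "[audited] " + message;
      }
    });
    return channel;
  }

  public static void main(String[] args) {
    InterceptorChainSketch channel = demoChannel();
    System.out.println(channel.send("ticket-1"));
    System.out.println(channel.send("   "));
    System.out.println(channel.delivered());
  }
}
```

The blank message is rejected by the first interceptor and never reaches the second one, while the ticket message is tagged and delivered.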

Since only the PollableChannel interface has the receive operation, preReceive and postReceive will be invoked only for PollableChannel implementations. The different available interceptor methods for the various message channels are shown in Table 6–3.

Table 6–3. Message Channels and Interceptors

Message Channel           preSend()   preReceive()   postSend()   postReceive()
QueueChannel              Yes         Yes            Yes          Yes
PriorityChannel           Yes         Yes            Yes          Yes
RendezvousChannel         Yes         Yes            Yes          Yes
DirectChannel             Yes         No             Yes          No
ExecutorChannel           Yes         No             Yes          No
NullChannel               Yes         Yes            Yes          Yes
PublishSubscribeChannel   Yes         No             Yes          No

Once we have implemented a ChannelInterceptor, we need to add it into the channel. We can add one or more interceptors into a channel. The interceptors will be invoked in order within the list.

messageChannel.addInterceptor(channelInterceptor);

messageChannel.setInterceptors(channelInterceptorList);

MessagingTemplate

Spring Integration provides MessagingTemplate as an easy way to integrate a messaging system into applications. It supports the common message channel operations, such as send and receive, and it supports transactions when supplied with a PlatformTransactionManager. Let's rewrite our problem-reporting system example from earlier in the chapter using MessagingTemplate. The ProblemReporter code is modified to use the MessagingTemplate, as shown in Listing 6–28. Because the MessagingTemplate can also build the Spring Integration Message instance, the reporter passes the Ticket payload directly to convertAndSend.

Listing 6–28. ProblemReporter.java

package com.apress.prospringintegration.channels.messagingtemplate;

import com.apress.prospringintegration.channels.core.Ticket;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.stereotype.Component;

@Component
public class ProblemReporter {

  private MessagingTemplate messagingTemplate;

  @Autowired
  public void setMessagingTemplate(MessagingTemplate messagingTemplate) {
    this.messagingTemplate = messagingTemplate;
  }

  public void openTicket(Ticket ticket) {
    messagingTemplate.convertAndSend(ticket);
    System.out.println("Ticket Sent - " + ticket.toString());
  }
}

The MessagingTemplate can also be used to receive messages as shown in Listing 6–29.

Listing 6–29. TicketReceiver.java

package com.apress.prospringintegration.channels.messagingtemplate;

import com.apress.prospringintegration.channels.core.Ticket;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.Message;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.stereotype.Component;

@Component
public class TicketReceiver implements Runnable {

  private final static int RECEIVE_TIMEOUT = 1000;

  private MessagingTemplate messagingTemplate;

  @Autowired
  public void setMessagingTemplate(MessagingTemplate messagingTemplate) {
    this.messagingTemplate = messagingTemplate;
    this.messagingTemplate.setReceiveTimeout(RECEIVE_TIMEOUT);
  }

  public void handleTicketMessage() {
    Message<?> ticketMessage;

    while (true) {
      ticketMessage = messagingTemplate.receive();
      if (ticketMessage != null) {
        handleTicket((Ticket) ticketMessage.getPayload());
      } else {
        /* Perform Some Other Tasks Here */
      }
    }
  }

  void handleTicket(Ticket ticket) {
    System.out.println("Received ticket - " + ticket.toString());
  }

  @Override
  public void run() {
    handleTicketMessage();
  }
}

The Main class stays the same as shown in Listing 6–30.

Listing 6–30. Main.java

package com.apress.prospringintegration.channels.messagingtemplate;

import com.apress.prospringintegration.channels.core.Ticket;
import com.apress.prospringintegration.channels.core.TicketGenerator;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.util.List;

public class Main {

    public static void main(String[] args) {
        String contextName = "messaging-template.xml";

        ClassPathXmlApplicationContext applicationContext =
                new ClassPathXmlApplicationContext(contextName);
        applicationContext.start();


        ProblemReporter problemReporter =
                applicationContext.getBean(ProblemReporter.class);
        TicketReceiver ticketReceiver =
                applicationContext.getBean(TicketReceiver.class);
        TicketGenerator ticketGenerator =
                applicationContext.getBean(TicketGenerator.class);


        List<Ticket> tickets = ticketGenerator.createTickets();
        for (Ticket ticket : tickets) {
            problemReporter.openTicket(ticket);
        }

        Thread consumerThread = new Thread(ticketReceiver);
        consumerThread.start();
    }
}

The Spring configuration file when using the MessagingTemplate also stays the same, as shown in Listing 6–31.

Listing 6–31. messaging-template.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd">

  <context:component-scan
      base-package="com.apress.prospringintegration.channels.messagingtemplate"/>
  <context:component-scan
      base-package="com.apress.prospringintegration.channels.core"/>

  <int:channel id="ticketChannel"
               datatype="com.apress.prospringintegration.channels.core.Ticket">
    <int:priority-queue capacity="50"/>
  </int:channel>
</beans>

This is similar to previous iterations of the file. It relies on the definitions established by component scanning the package and picking up the beans annotated with @Component. Additionally, it relies on the definitions established when the following Java configuration class is scanned and its bean definitions are loaded:

package com.apress.prospringintegration.channels.messagingtemplate;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.core.MessagingTemplate;

@Configuration
public class MessagingTemplateConfiguration {

  @Value("#{ticketChannel}")
  private MessageChannel messageChannel;

  @Bean
  public MessagingTemplate messagingTemplate() {
    MessagingTemplate messagingTemplate = new MessagingTemplate();
    messagingTemplate.setDefaultChannel(this.messageChannel);
    messagingTemplate.setReceiveTimeout(1000);
    return messagingTemplate;
  }
}
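
The template idea can be reduced to a small plain-JDK sketch: wrap a payload, send it to a default destination, and receive with a timeout. The TemplateSketch and Envelope classes below are hypothetical stand-ins written for illustration. They are not the MessagingTemplate API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-JDK analogy for a messaging template with a default channel.
class TemplateSketch {

  // Hypothetical stand-in for a message: a payload wrapped in an envelope.
  static final class Envelope {
    final Object payload;

    Envelope(Object payload) {
      this.payload = payload;
    }
  }

  private final BlockingQueue<Envelope> defaultChannel;
  private final long receiveTimeoutMillis;

  TemplateSketch(BlockingQueue<Envelope> defaultChannel, long receiveTimeoutMillis) {
    this.defaultChannel = defaultChannel;
    this.receiveTimeoutMillis = receiveTimeoutMillis;
  }

  // Wraps the payload in an envelope and sends it to the default channel.
  boolean convertAndSend(Object payload) {
    return defaultChannel.offer(new Envelope(payload));
  }

  // Waits up to the configured timeout; returns null if nothing arrives.
  Envelope receive() {
    try {
      return defaultChannel.poll(receiveTimeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      return null;
    }
  }

  public static void main(String[] args) {
    TemplateSketch template =
        new TemplateSketch(new LinkedBlockingQueue<Envelope>(50), 100);
    template.convertAndSend("ticket-1");
    System.out.println(template.receive().payload);
    System.out.println(template.receive());
  }
}
```

The caller never names the channel or builds the envelope itself, which is the convenience the ProblemReporter and TicketReceiver classes gain from the template.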

Configuring a Message Channel

In this section, we'll look at how to configure the different kinds of message channels using the Spring Integration XML namespace in the Spring bean configuration file. Make sure the Spring bean configuration file includes the Spring Integration namespace, like this:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/integration  
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd">
</beans>

QueueChannel

To create a QueueChannel instance, add a queue subelement to the channel element in the Spring configuration file.

<int:channel id="queueChannel">
    <int:queue />
</int:channel>

The preceding example creates a QueueChannel using all default configurations. In other words, the channel capacity is unbounded. This is very dangerous, since the JVM may run out of memory. It is always a good idea to limit the number of messages in the channel.

<int:channel id="queueChannel">
    <int:queue capacity="50" />
</int:channel>

The preceding example limits the channel capacity to 50 messages. Once the channel reaches maximum capacity, send operations block until space frees up in the channel.
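
The effect of a capacity limit can be seen with a bounded java.util.concurrent.LinkedBlockingQueue. The following sketch is a plain-JDK analogy with a capacity of 2 instead of 50, and a timed offer stands in for a blocking send so that the example terminates.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-JDK analogy for a capacity-limited queue channel; not Spring Integration code.
class BoundedQueueSketch {

  // Returns the outcome of four send attempts against a queue of capacity 2.
  static boolean[] demo() {
    BlockingQueue<String> channel = new LinkedBlockingQueue<String>(2);
    try {
      boolean first = channel.offer("t1");
      boolean second = channel.offer("t2");
      // The queue is full, so this attempt waits 100 ms and then gives up.
      boolean third = channel.offer("t3", 100, TimeUnit.MILLISECONDS);
      channel.take(); // a receiver frees one slot
      boolean retry = channel.offer("t3", 100, TimeUnit.MILLISECONDS);
      return new boolean[] {first, second, third, retry};
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(ex);
    }
  }

  public static void main(String[] args) {
    boolean[] results = demo();
    for (boolean result : results) {
      System.out.println(result);
    }
  }
}
```

The third attempt fails while the queue is full and succeeds on retry once a message has been taken, which is the back-pressure that protects the JVM from an ever-growing backlog.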

Spring Integration also supports the Datatype Channel design pattern. By default, all the Spring Integration message channels accept any type of message payload; setting the datatype attribute to a specific class restricts the channel to payloads of that type.

<int:channel id="queueChannel"
        datatype="com.apress.prospringintegration.channels.core.Ticket">
    <int:queue capacity="50" />
</int:channel>
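
The type restriction amounts to a check at send time. The following plain-Java sketch uses a hypothetical TypedChannelSketch class with an IllegalArgumentException as the rejection. It is an analogy only, and the exception that Spring Integration itself raises is not modeled here.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Plain-Java analogy for a datatype channel; not Spring Integration code.
class TypedChannelSketch<T> {

  private final Class<T> acceptedType;
  private final Queue<T> queue = new ArrayDeque<T>();

  TypedChannelSketch(Class<T> acceptedType) {
    this.acceptedType = acceptedType;
  }

  // Accepts only payloads of the configured type and rejects everything else.
  boolean send(Object payload) {
    if (!acceptedType.isInstance(payload)) {
      throw new IllegalArgumentException(
          "Expected " + acceptedType.getName() + " but got " + payload);
    }
    queue.add(acceptedType.cast(payload));
    return true;
  }

  int size() {
    return queue.size();
  }

  public static void main(String[] args) {
    TypedChannelSketch<Integer> channel =
        new TypedChannelSketch<Integer>(Integer.class);
    System.out.println(channel.send(Integer.valueOf(42)));
    try {
      channel.send("not a number");
    } catch (IllegalArgumentException ex) {
      System.out.println("rejected: " + ex.getMessage());
    }
  }
}
```

Rejecting a mistyped payload at the channel boundary means that receivers such as TicketReceiver can cast the payload without defensive checks.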

PriorityChannel

Very similar to QueueChannel, PriorityChannel can also specify the queue capacity and data type. It is also a good idea to specify the queue capacity to avoid running out of memory.

<int:channel id="priorityChannel"
        datatype="com.apress.prospringintegration.channels.core.Ticket">
    <int:priority-queue capacity="50"/>
</int:channel>

By default, PriorityChannel uses the PRIORITY message header to determine the priority of the incoming messages. However, we can provide a custom comparator, such as the TicketMessagePriorityComparator from Listing 6–18 (repeated here), to change the behavior of PriorityChannel.

package com.apress.prospringintegration.channels.prioritychannel;

import com.apress.prospringintegration.channels.core.Ticket;
import org.springframework.integration.Message;
import org.springframework.stereotype.Component;

import java.util.Comparator;

@SuppressWarnings("unused")
@Component
public class TicketMessagePriorityComparator
    implements Comparator<Message<Ticket>> {

  @Override
  public int compare(Message<Ticket> message1, Message<Ticket> message2) {
    // ordinal() returns a primitive int, so no null check is needed
    Integer priority1 = message1.getPayload().getPriority().ordinal();
    Integer priority2 = message2.getPayload().getPriority().ordinal();

    // Reversed comparison: the larger ordinal (higher priority) is ordered first
    return priority2.compareTo(priority1);
  }
}

The comparator is assigned to the channel through the comparator attribute, which references the comparator's bean name. Component scanning registers TicketMessagePriorityComparator under the name ticketMessagePriorityComparator:

<int:channel id="priorityChannel"
        datatype="com.apress.prospringintegration.channels.core.Ticket">
    <int:priority-queue capacity="50" comparator="ticketMessagePriorityComparator" />
</int:channel>

RendezvousChannel

RendezvousChannel does not provide any additional configuration options other than setting the data type. Since it is a zero-capacity channel, the capacity cannot be changed. As a result, only one message can pass through the channel at any given time.

<int:channel id="rendezvousChannel"
        datatype="com.apress.prospringintegration.channels.core.Ticket">
        <int:rendezvous-queue/>
</int:channel>

DirectChannel

DirectChannel is the default type of Spring Integration channel. As a result, a channel element with no queue or dispatcher subelement defines a DirectChannel instance.

<int:channel id="directChannel"/>

Like the other Spring Integration message channels, DirectChannel lets you specify the payload data type. It also lets you enable or disable failover and choose the load-balancing strategy. By default, failover is enabled and load balancing is round-robin. The following example keeps failover enabled but turns load balancing off:

<int:channel id="directChannel"
        datatype="com.apress.prospringintegration.channels.Ticket">
        <int:dispatcher failover="true" load-balancer="none"/>
</int:channel>
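
To make the interplay of the two dispatcher options concrete, here is a minimal plain-JDK sketch of round-robin selection combined with failover. It is not the Spring Integration dispatcher; the Handler interface, the class name, and the run helper are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinFailoverSketch {

  // Hypothetical handler contract for this sketch; not a Spring Integration type.
  interface Handler {
    void handle(String message);
  }

  private final List<Handler> handlers;
  private final boolean failover;
  private final AtomicInteger counter = new AtomicInteger();

  RoundRobinFailoverSketch(List<Handler> handlers, boolean failover) {
    if (handlers.isEmpty()) {
      throw new IllegalArgumentException("at least one handler is required");
    }
    this.handlers = handlers;
    this.failover = failover;
  }

  // Round-robin picks the starting handler; with failover enabled, the
  // remaining handlers are tried in turn when the chosen one throws.
  void dispatch(String message) {
    int size = handlers.size();
    int start = (counter.getAndIncrement() & Integer.MAX_VALUE) % size;
    int attempts = failover ? size : 1;
    RuntimeException lastFailure = null;
    for (int i = 0; i < attempts; i++) {
      try {
        handlers.get((start + i) % size).handle(message);
        return;
      } catch (RuntimeException e) {
        lastFailure = e;
      }
    }
    throw lastFailure;
  }

  // Sends three messages to handlers A, B (always failing), and C.
  static List<String> run(boolean failover) {
    final List<String> log = new ArrayList<String>();
    Handler a = new Handler() {
      public void handle(String m) { log.add("A:" + m); }
    };
    Handler broken = new Handler() {
      public void handle(String m) { throw new IllegalStateException("B is down"); }
    };
    Handler c = new Handler() {
      public void handle(String m) { log.add("C:" + m); }
    };
    RoundRobinFailoverSketch dispatcher =
        new RoundRobinFailoverSketch(Arrays.asList(a, broken, c), failover);
    for (int i = 1; i <= 3; i++) {
      try {
        dispatcher.dispatch("m" + i);
      } catch (RuntimeException e) {
        log.add("failed:m" + i);
      }
    }
    return log;
  }

  public static void main(String[] args) {
    System.out.println(run(true));  // prints [A:m1, C:m2, C:m3]
    System.out.println(run(false)); // prints [A:m1, failed:m2, C:m3]
  }
}
```

With failover enabled, the second message falls through from the failing handler to the next one; with failover disabled, the failure is reported to the sender instead.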

ExecutorChannel

In order to use ExecutorChannel, we need to assign an executor to the channel dispatcher. In addition, ExecutorChannel supports the failover and load-balancing options.

<int:channel id="executorChannel"
        datatype="com.apress.prospringintegration.channels.Ticket">
        <int:dispatcher task-executor="performTicketExecutor"/>
</int:channel>

The task-executor attribute supports any Spring TaskExecutor implementation.
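
The point of assigning an executor is that the handler runs on one of the executor's threads rather than on the sender's thread. The following plain-JDK sketch shows only that thread handoff; it uses java.util.concurrent.ExecutorService in place of a Spring TaskExecutor, and all class and method names are hypothetical. Unlike a real asynchronous send, the sketch waits for the handler so that the result can be printed.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class ExecutorDispatchSketch {

  // Runs the handler on the executor and reports which thread handled the message.
  static String dispatch(ExecutorService executor, final String message) {
    try {
      return executor.submit(new Callable<String>() {
        @Override
        public String call() {
          // A real handler would process the message here.
          return Thread.currentThread().getName() + " handled " + message;
        }
      }).get(5, TimeUnit.SECONDS);
    } catch (Exception e) {
      throw new IllegalStateException("dispatch failed", e);
    }
  }

  // Single worker thread with a recognizable name, to make the handoff visible.
  static ExecutorService namedExecutor(final String threadName) {
    return Executors.newSingleThreadExecutor(new ThreadFactory() {
      @Override
      public Thread newThread(Runnable task) {
        Thread thread = new Thread(task, threadName);
        thread.setDaemon(true);
        return thread;
      }
    });
  }

  public static void main(String[] args) {
    ExecutorService executor = namedExecutor("ticket-worker");
    // prints ticket-worker handled ticket-1
    System.out.println(dispatch(executor, "ticket-1"));
    executor.shutdown();
  }
}
```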

PublishSubscribeChannel

Like ExecutorChannel, PublishSubscribeChannel accepts any TaskExecutor implementation in its task-executor attribute. By default, Spring Integration does not add sequence information (the sequence number, sequence size, and correlation ID headers) to the outgoing messages. Setting the apply-sequence attribute to true enables it.

<int:publish-subscribe-channel id="publishSubscribeChannel"
        task-executor="performTicketExecutor" apply-sequence="true"/>
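
As a rough illustration of what sequence information looks like, the plain-JDK sketch below builds the headers that each subscriber's copy of a broadcast message might carry: a shared correlation ID plus a position within the broadcast. The header keys and the helper method are assumptions for illustration, not the framework's own constants or API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

public class ApplySequenceSketch {

  // Builds one header map per subscriber: every copy shares the ID of the
  // original message as its correlation ID and carries its own position
  // (1..N) together with the total number of copies (N).
  static List<Map<String, Object>> broadcastHeaders(UUID messageId, int subscriberCount) {
    List<Map<String, Object>> copies = new ArrayList<Map<String, Object>>();
    for (int i = 1; i <= subscriberCount; i++) {
      Map<String, Object> headers = new HashMap<String, Object>();
      headers.put("correlationId", messageId);      // illustrative key name
      headers.put("sequenceNumber", i);             // illustrative key name
      headers.put("sequenceSize", subscriberCount); // illustrative key name
      copies.add(headers);
    }
    return copies;
  }

  public static void main(String[] args) {
    UUID originalId = UUID.randomUUID();
    for (Map<String, Object> headers : broadcastHeaders(originalId, 3)) {
      System.out.println(headers.get("sequenceNumber") + " of "
          + headers.get("sequenceSize") + ", correlation " + headers.get("correlationId"));
    }
  }
}
```

Headers of this kind let a downstream component recognize which messages belong together and when it has seen all of them.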

ChannelInterceptor

To assign interceptors to a channel, use the interceptors subelement within the Spring configuration file.

<int:channel id="messageChannel">
  <int:interceptors>
    <ref bean="interceptor1"/>
    <ref bean="interceptor2"/>
    <ref bean="interceptor3"/>
  </int:interceptors>
</int:channel>

Sometimes, we may want to assign the same interceptor to all the message channels. We can use the following configuration:

<int:channel-interceptor ref="interceptor1"/>

The preceding example assigns interceptor1 to all the message channels. Sometimes, however, we may want the interceptor assigned only to certain channels, which can be listed in the pattern attribute.

<int:channel-interceptor ref="interceptor1" pattern="ticketChannel, emergencyChannel"/>

We can use the wildcard in the pattern attribute and also specify the position to insert the interceptor into the list.

<int:channel-interceptor ref="interceptor1"
        pattern="ticketChannel, emergency*, lowPriority*" order="2"/>
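
The way a comma-separated list of wildcard patterns selects channel names can be sketched in plain Java. The matcher below is a simplified stand-in written for illustration, not the matching code used by Spring Integration; it treats * as "any sequence of characters" and everything else literally.

```java
import java.util.regex.Pattern;

public class ChannelPatternSketch {

  // Returns true if the channel name matches at least one pattern in the list.
  static boolean matchesAny(String patternList, String channelName) {
    for (String raw : patternList.split(",")) {
      String pattern = raw.trim();
      if (pattern.length() > 0 && toRegex(pattern).matcher(channelName).matches()) {
        return true;
      }
    }
    return false;
  }

  // Translates a wildcard pattern into a regular expression: literal parts
  // are quoted and each * becomes ".*".
  static Pattern toRegex(String pattern) {
    StringBuilder regex = new StringBuilder();
    String[] literals = pattern.split("\\*", -1);
    for (int i = 0; i < literals.length; i++) {
      if (i > 0) {
        regex.append(".*");
      }
      regex.append(Pattern.quote(literals[i]));
    }
    return Pattern.compile(regex.toString());
  }

  public static void main(String[] args) {
    String patterns = "ticketChannel, emergency*, lowPriority*";
    System.out.println(matchesAny(patterns, "emergencyChannel")); // prints true
    System.out.println(matchesAny(patterns, "billingChannel"));   // prints false
  }
}
```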

Backing Up a Channel

By default, Spring Integration message channels store messages in memory, which is fast and requires no additional infrastructure. However, the number of messages that can be stored is bounded by the available server memory, so a channel in a high-throughput integrated system can easily exhaust it. Even worse, all the messages are lost if the system crashes. A channel backed only by memory is therefore neither scalable nor reliable.

A pair of inbound and outbound channel adapters (endpoints and adapters will be discussed in more depth in Chapter 9) may be used to connect a channel to a file system or JDBC data store. However, this approach is complicated, because it involves two channel adapters even though we are dealing with only a single channel.

Here is where Spring Integration 2.0 comes to the rescue: a message channel can be backed by JMS as an alternative to memory, as shown in Figure 6–19. The JMS-backed message channel decouples the message producer and consumer by making messaging asynchronous. Instead of being stored in memory, messages are stored in the JMS messaging system, which can in turn store them however it sees fit, be it on disk, in memory, or in any other configuration supported by the JMS broker. In addition, depending on the JMS configuration, the message endpoints can now scale out and run within different processes or servers.

Figure 6–19. JMS-backed Message Channel

Figure 6–19 shows a point-to-point message channel. To back this channel with JMS, use the channel element from the Spring Integration JMS namespace and provide the name of the JMS queue destination. Make sure the JMS namespace is declared in the configuration file. Apart from the namespace, the resulting configuration closely resembles that of the default in-memory channel.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    http://www.springframework.org/schema/integration/jms
    http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd">
  <int-jms:channel id="channel" queue-name="jmsQueue" />

</beans>

By default, Spring Integration uses org.springframework.jms.support.destination.DynamicDestinationResolver to resolve the JMS queue name into the actual JMS destination. It also looks for a ConnectionFactory reference with the bean name connectionFactory. In order to make the preceding example work, we need to provide the connection factory reference.

// org.springframework.jms.connection.CachingConnectionFactory
// is a convenient way to work with JMS connection factories
@Bean
public CachingConnectionFactory connectionFactory(){
  CachingConnectionFactory ccf = new CachingConnectionFactory();
  ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
  cf.setBrokerURL("vm://localhost");
  ccf.setTargetConnectionFactory(cf);
  return ccf;
}

<int-jms:channel id="channel" queue-name="jmsQueue" />

We can provide a custom connection factory bean with a name other than connectionFactory.

@Bean
public CachingConnectionFactory jmsConnectionFactory(){
  CachingConnectionFactory ccf = new CachingConnectionFactory();
  ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
  cf.setBrokerURL("vm://localhost");
  ccf.setTargetConnectionFactory(cf);
  return ccf;
}

<int-jms:channel id="channel" queue-name="jmsQueue"
        connection-factory="jmsConnectionFactory" />

We can provide the queue reference instead of using the JMS queue destination name.

<int-jms:channel id="channel" queue="jmsQueue" />

In order to back a publish-subscribe message channel using JMS, we will provide the JMS topic name.

<int-jms:publish-subscribe-channel id="channel" topic-name="jmsTopic"
        connection-factory="jmsConnectionFactory" />

We also can use a topic reference instead of the JMS topic destination name.

<int-jms:publish-subscribe-channel id="channel" topic="jmsTopic" />

We now have JMS-backed message channels whose scalability and reliability come from the JMS broker, and Spring Integration 2.0 needs very little configuration to provide them.

Summary

Spring Integration is inspired by Enterprise Integration Patterns.4 This chapter has looked specifically at how Spring Integration implements the message channel pattern. Through the various implementations of the point-to-point and publish-subscribe message channels, applications can communicate by sending messages to each other over the channels. Message handling can be handed off to other threads or kept within the sender's thread.

By using the XML namespace in the Spring bean configuration file, developers can easily switch between different channel implementations. Sometimes it is necessary to invoke the messaging system within the application code. Spring Integration provides a messaging template, which supports a variety of operations for sending messages to the channels.

Spring Integration message channels store messages in memory, and the queue-based channels are unbounded by default. To avoid running out of memory, it is always a good idea to bound the message channels by specifying a capacity. However, memory-based message channels lose all their messages if the application crashes or the server shuts down. To make a message channel durable, Spring Integration 2.0 introduces JMS-backed message channels, so that messages are stored within a JMS broker instead of in memory.

In the next couple of chapters, we will discuss how to connect message channels with internal or external systems by using endpoints, adapters, transformers, and routers.

__________________

4 Gregor Hohpe, op. cit.
