CHAPTER 11

Talking to the Metal

Spring Integration supports integration with the basic facilities of the computer itself (the metal), providing message sources and handlers for files, socket-level communication over TCP and UDP, and input and output streams. In addition, Spring Integration supports sending and receiving files using the FTP and SFTP protocols, and it can talk to a database using the JDBC adapters. This chapter discusses the channel adapters used to communicate with files, sockets, streams, file servers, and databases.

File System Integration

Using files is one of the most basic and simple approaches to enterprise integration. If the different parties in an exchange agree on the format and structure of the file, integration is in principle just a matter of producing a file and sending it. In practice it is not quite that simple, as you still have to determine the file format and structure, the means of transferring the file, when to transfer it, and how to handle mistakes. Even so, files are an integral part of integration within and between businesses. Spring Integration provides inbound and outbound channel adapters for files.

File Adapter

The org.springframework.integration.core.MessageSource<T> implementation org.springframework.integration.file.FileReadingMessageSource is used to consume files from a file system directory. This is an inbound adapter. The message source can be used directly, but it is typically configured through the Spring Integration file namespace. The first step in using the file channel adapter is adding the Maven dependency, as shown in Listing 11–1.

Listing 11–1. Maven Dependencies for the File Channel Adapter

     <dependency>
       <groupId>org.springframework.integration</groupId>
       <artifactId>spring-integration-file</artifactId>
       <version>2.0.1.RELEASE</version>
     </dependency>

The next step is to add the file-specific namespace, as shown in Listing 11–2.

Listing 11–2. Spring Integration File Namespace

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:file="http://www.springframework.org/schema/integration/file"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    http://www.springframework.org/schema/integration/file
    http://www.springframework.org/schema/integration/file/spring-integration-file-2.0.xsd">
</beans>

The most basic file adapter configuration is shown in Listing 11–3. The directory of the files to be consumed is named directly, and the prevent-duplicates property is set to true so that each file is consumed only once per session. The record of delivered files does not survive a restart: if you kill the Spring Integration process and files that have already been delivered are still present in the directory, they will be redelivered. No other filters have been configured, so as soon as a file appears, it will be picked up by the next periodic scan, regardless of whether the file has been completely written to the directory. The example reads all files from the input directory, pushes each file to the files channel, and writes it to the output directory.

Listing 11–3. Basic File Channel Adapter Configuration file-context.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:file="http://www.springframework.org/schema/integration/file"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    http://www.springframework.org/schema/integration/file
    http://www.springframework.org/schema/integration/file/spring-integration-file-2.0.xsd">

  <int:channel id="files"/>

  <file:inbound-channel-adapter id="filesIn" channel="files"
                                directory="file:input" prevent-duplicates="true">
    <int:poller fixed-rate="1000" max-messages-per-poll="100"/>
  </file:inbound-channel-adapter>

  <file:outbound-channel-adapter id="filesOut" channel="files" directory="file:output"/>

</beans>
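The once-per-session behavior described for prevent-duplicates can be pictured with a small in-memory filter. The following is only a sketch of the idea in plain Java (Java 8 or later assumed), not the framework's implementation; the class name AcceptOnceSketch and its filter method are hypothetical. Because the set of seen files lives in memory, a new instance (a new session) accepts the same files again.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for an accept-once filter; not a Spring Integration class.
final class AcceptOnceSketch {

    // Files already handed out during this JVM session; the set is lost on restart.
    private final Set<File> seen = new HashSet<File>();

    // Returns only the files that no earlier scan has returned.
    synchronized List<File> filter(File[] scanned) {
        List<File> fresh = new ArrayList<File>();
        for (File file : scanned) {
            if (seen.add(file)) { // add() returns false when the file was seen before
                fresh.add(file);
            }
        }
        return fresh;
    }
}
```

Calling filter twice with the same directory listing returns the files the first time and an empty list the second time, which matches the behavior described above.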

A file-matching pattern can be applied as a mask, so that each scan picks up only files whose names match the pattern. You can use a standard pattern-matching filter by setting the filename-pattern property. An example of this filter is shown in Listing 11–4. Only files whose names begin with onlyThisFile and have the extension txt will be consumed.

Listing 11–4. File Channel Adapter Using the Pattern-Matching Filter file-pattern-context.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:file="http://www.springframework.org/schema/integration/file"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    http://www.springframework.org/schema/integration/file
    http://www.springframework.org/schema/integration/file/spring-integration-file-2.0.xsd">

  <int:channel id="files"/>

  <file:inbound-channel-adapter id="filesIn" channel="files"
                                directory="file:input" filename-pattern="onlyThisFile*.txt">
    <int:poller fixed-rate="1000"/>
  </file:inbound-channel-adapter>

  <file:outbound-channel-adapter id="filesOut" channel="files" directory="file:output"/>

</beans>
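To get a feel for which names a pattern such as onlyThisFile*.txt selects, the JDK's own glob matcher can be used as an approximation. This sketch assumes Java 7 or later and uses a hypothetical helper class FilenamePatternSketch; the matcher used by Spring Integration itself may differ in edge cases, so treat this as an illustration of the wildcard idea only.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

// Hypothetical helper; approximates a file name mask with the JDK glob syntax.
final class FilenamePatternSketch {

    // Returns true when fileName matches the glob-style pattern, e.g. "onlyThisFile*.txt".
    static boolean matches(String pattern, String fileName) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + pattern);
        return matcher.matches(Paths.get(fileName));
    }
}
```

With the pattern from Listing 11–4, onlyThisFile01.txt and onlyThisFile.txt match, while otherFile.txt and onlyThisFile01.csv do not.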

The last out-of-the-box file filter uses a regular expression (regex) pattern to match the file name. An example of this filter is shown in Listing 11–5. The file name must match the regex pattern test[0-9]+.txt to be consumed. Note that the unescaped dot in this pattern matches any single character; write \. to match only a literal dot. In addition to the out-of-the-box filters, you can write a custom one by implementing the interface org.springframework.integration.file.filters.FileListFilter and setting the filter property to reference a bean of the custom filter class.

Listing 11–5. File Channel Adapter Using the Regex Filter file-regex-context.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:file="http://www.springframework.org/schema/integration/file"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    http://www.springframework.org/schema/integration/file
    http://www.springframework.org/schema/integration/file/spring-integration-file-2.0.xsd">

  <int:channel id="files"/>

  <file:inbound-channel-adapter id="filesIn" channel="files"
                                directory="file:input" filename-regex="test[0-9]+.txt">
    <int:poller fixed-rate="1000"/>
  </file:inbound-channel-adapter>

  <file:outbound-channel-adapter id="filesOut" channel="files" directory="file:output"/>

</beans>
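The logic of a regex-based file filter can be sketched in plain Java. The class RegexNameFilterSketch below is hypothetical and deliberately does not implement the FileListFilter interface, so that it compiles without the Spring Integration JAR; a real custom filter would implement that interface instead. The sketch assumes the whole file name must match the pattern.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical filter; a real custom filter would implement FileListFilter.
final class RegexNameFilterSketch {

    private final Pattern pattern;

    RegexNameFilterSketch(String regex) {
        this.pattern = Pattern.compile(regex);
    }

    // Keeps the files whose entire name matches the regex.
    List<File> filterFiles(File[] files) {
        List<File> accepted = new ArrayList<File>();
        for (File file : files) {
            if (pattern.matcher(file.getName()).matches()) {
                accepted.add(file);
            }
        }
        return accepted;
    }
}
```

With the pattern test[0-9]+.txt, both test1.txt and test22Xtxt are accepted, because the dot matches any character. Escaping the dot, as in test[0-9]+\.txt, accepts only test1.txt.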

All the file adapter examples above use the file outbound-channel-adapter to write the file. This channel adapter is straightforward to use, and it accepts a file, string, or byte array payload as input. Setting the additional attribute delete-source-files to true deletes the input file once it has been delivered. This setting only works if the inbound message has a file as the payload or if the header FileHeaders.ORIGINAL_FILE contains the source file or file path.

Finally, there is an outbound gateway file adapter, as shown in Listing 11–6. This gateway will send the file as a payload to the reply channel as soon as it is written. This gateway supports cases where further message processing is required based on the adapter successfully writing a file.

Listing 11–6. An Example of a File Outbound Gateway file-gateway-context.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:file="http://www.springframework.org/schema/integration/file"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    http://www.springframework.org/schema/integration/file
    http://www.springframework.org/schema/integration/file/spring-integration-file-2.0.xsd">

  <context:component-scan base-package="com.apress.prospringintegration.file"/>

  <int:channel id="files"/>

  <file:inbound-channel-adapter id="filesIn"
                                channel="moveInput"
                                directory="file:input" prevent-duplicates="true">
    <int:poller fixed-rate="1000"/>
  </file:inbound-channel-adapter>

  <file:outbound-gateway id="mover" request-channel="moveInput"
                         reply-channel="output"
                         directory="file:output"/>

  <int:service-activator id="handler" input-channel="output" ref="replyHandler"/>

</beans>

The file namespace also includes two transformers, file-to-bytes-transformer and file-to-string-transformer, which convert a file payload into a byte array or a string, respectively. These are the two most common transformations used when dealing with file payloads. An example of using the file-to-bytes transformer is shown in Listing 11–7. The example reads the files from the input directory and converts their contents into byte arrays. A byte array preserves the raw content of the file regardless of its character encoding, and usually provides the first step in converting to a different format. The byte array is sent to the service activator class ByteHandler, shown in Listing 11–8, which logs the length of the byte array.

Listing 11–7. Example of Using the File-to-Byte Array Transformer file-transform-byte.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:file="http://www.springframework.org/schema/integration/file"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    http://www.springframework.org/schema/integration/file
    http://www.springframework.org/schema/integration/file/spring-integration-file-2.0.xsd">

  <context:component-scan base-package="com.apress.prospringintegration.file"/>

  <int:channel id="fileOut"/>

  <file:inbound-channel-adapter id="filesIn" channel="input"
                                directory="file:input" prevent-duplicates="true">
    <int:poller fixed-rate="1000"/>
  </file:inbound-channel-adapter>

  <file:file-to-bytes-transformer id="transformer" input-channel="input"
                                  output-channel="output"/>

  <int:service-activator id="handler" input-channel="output"
                        output-channel="fileOut" ref="byteHandler"/>

  <file:outbound-channel-adapter id="filesOut" channel="fileOut" directory="file:output"/>

</beans>

Listing 11–8. Service Activator Class That Logs Byte Array Length

package com.apress.prospringintegration.file;

import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;

@Component
public class ByteHandler {
    @ServiceActivator
    public byte[] handleBytes(byte[] input) {
        System.out.println("Copying " + input.length + " bytes");
        return input;
    }
}

The other file transformer converts the file payload into a string. This example is similar to the byte array transformer, and is shown in Listing 11–9. The files are read from the input directory and their contents are converted into strings. This is useful when string output is desired, or for debugging purposes. A service activator, shown in Listing 11–10, is used to direct the string payload to the class StringHandler, in which the contents are logged.

Listing 11–9. Example of a File-to-String Transformer file-transform-string.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:file="http://www.springframework.org/schema/integration/file"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    http://www.springframework.org/schema/integration/file
    http://www.springframework.org/schema/integration/file/spring-integration-file-2.0.xsd">

  <context:component-scan base-package="com.apress.prospringintegration.file"/>

  <int:channel id="fileOut"/>

  <file:inbound-channel-adapter id="filesIn" channel="input"
                                directory="file:input" prevent-duplicates="true">
    <int:poller fixed-rate="1000"/>
  </file:inbound-channel-adapter>

  <file:file-to-string-transformer id="transformer" input-channel="input"
                                   output-channel="output" charset="UTF-8"/>

  <int:service-activator id="handler" input-channel="output"
                                      output-channel="fileOut"
                                      ref="stringHandler"/>

  <file:outbound-channel-adapter id="filesOut" channel="fileOut" directory="file:output"/>

</beans>

Listing 11–10. Service Activator Class That Logs File Content

package com.apress.prospringintegration.file;

import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;

@Component
public class StringHandler {
    @ServiceActivator
    public String handleString(String input) {

        System.out.println("*** Copying text: " + input);
        return input;
    }
}
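What the two transformers do to a file payload can be illustrated with the JDK alone. The sketch below assumes Java 8 or later; the class FilePayloadSketch and its methods are hypothetical and are not part of Spring Integration. It shows why the charset setting matters: the bytes are the same in both cases, but decoding them with the wrong charset yields different text.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper showing the file-to-bytes and file-to-string conversions.
final class FilePayloadSketch {

    // Reads the raw bytes of the file; no character decoding takes place.
    static byte[] readBytes(Path file) {
        try {
            return Files.readAllBytes(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Decodes the file content with the given charset, e.g. "UTF-8".
    static String readString(Path file, String charsetName) {
        return new String(readBytes(file), Charset.forName(charsetName));
    }

    // Writes the content to a new temporary file, for trying the methods above.
    static Path writeTemp(byte[] content) {
        try {
            Path file = Files.createTempFile("payload-sketch", ".txt");
            return Files.write(file, content);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For a file containing non-ASCII text written as UTF-8, readString with "UTF-8" returns the original text, while readString with "ISO-8859-1" returns garbled characters even though readBytes returns the same array in both cases.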

Native Event File Adapter

The default Spring Integration inbound file adapter polls the input directory for any new file. Depending on the polling rate, there can be a lag between when a new file is written and when the inbound channel adapter consumes the file. A native event file adapter is currently under development; it uses the file system kernel APIs on the Linux, Mac, and Windows operating systems for notifications of new files, instead of having the adapter constantly poll for a new file. This will eliminate the lag between when a file is written to the source directory and when it is picked up by the normal file adapter's polling scans. As soon as the operating system or kernel sees the file, the adapter publishes a notification to the channel and delivers it to consumers in Spring Integration. This code is in the Spring Integration sandbox at the time of this writing. To see how a version of the Linux native adapter might be implemented, consult Chapter 15. Ideally, the final version will be available with the Mac and Windows versions in the Spring Integration 2.1 release.
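The difference between polling and event-driven notification can be tried out with the JDK's java.nio.file.WatchService, available from Java 7. The sketch below is not the sandbox adapter described above; NewFileWatcherSketch and its methods are hypothetical names, and how quickly events arrive depends on the platform (some JDKs fall back to internal polling).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.concurrent.TimeUnit;

// Hypothetical event-driven watcher built on the JDK WatchService.
final class NewFileWatcherSketch implements AutoCloseable {

    private final Path directory;
    private final WatchService watcher;

    NewFileWatcherSketch(Path directory) {
        try {
            this.directory = directory;
            this.watcher = directory.getFileSystem().newWatchService();
            // Ask the file system to report newly created entries only.
            directory.register(watcher, StandardWatchEventKinds.ENTRY_CREATE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Waits up to the timeout for a creation event; returns the new path or null.
    // For brevity, only the first event of a batch is reported.
    Path awaitCreated(long timeout, TimeUnit unit) {
        try {
            WatchKey key = watcher.poll(timeout, unit);
            if (key == null) {
                return null;
            }
            try {
                for (WatchEvent<?> event : key.pollEvents()) {
                    if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
                        return directory.resolve((Path) event.context());
                    }
                }
                return null;
            } finally {
                key.reset(); // re-arm the key so later events are delivered
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    @Override
    public void close() {
        try {
            watcher.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Watches a temporary directory, creates a file, and returns the reported name.
    static String demo() {
        try {
            Path dir = Files.createTempDirectory("watch-sketch");
            try (NewFileWatcherSketch watcherSketch = new NewFileWatcherSketch(dir)) {
                Files.createFile(dir.resolve("incoming.txt"));
                Path seen = watcherSketch.awaitCreated(30, TimeUnit.SECONDS);
                return seen == null ? null : seen.getFileName().toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The watcher is registered before the file is created, so the creation is reported as an event rather than discovered by a later directory scan.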

TCP and UDP Integration

Spring Integration supports both UDP- and TCP-based communication. Both one-way and two-way communication are supported, using the channel adapters and message gateways, respectively. The Maven dependencies for the UDP and TCP/IP adapters are shown in Listing 11–11.

Listing 11–11. Maven Dependencies for the UDP and TCP Adapters

     <dependency>
       <groupId>org.springframework.integration</groupId>
       <artifactId>spring-integration-ip</artifactId>
       <version>2.0.1.RELEASE</version>
     </dependency>

Spring Integration supports both sending and receiving a datagram packet to a single destination and to a multicast address. The message handler class for sending a datagram packet is org.springframework.integration.ip.udp.UnicastSendingMessageHandler, and the message adapter class for receiving the datagram packet is org.springframework.integration.ip.udp.UnicastReceivingChannelAdapter. To simplify configuring these components, Spring Integration provides an XML ip namespace, as shown in Listing 11–12.

Listing 11–12. Internet Protocol (IP) Namespace

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:ip="http://www.springframework.org/schema/integration/ip"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    http://www.springframework.org/schema/integration/ip
    http://www.springframework.org/schema/integration/ip/spring-integration-ip-2.0.xsd">
</beans>

A basic example of using UDP with Spring Integration is shown in Listing 11–13. The udp-outbound-channel-adapter sends a UDP datagram packet out on port 12345 based on the message payload sent to the channel sendUdp. The datagram is received by the udp-inbound-channel-adapter and sent to the class UdpListener using the service activator configuration. The service activator logs the datagram message, as shown in Listing 11–14.

Listing 11–13. Basic Spring Integration UDP Configuration udp-context.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:ip="http://www.springframework.org/schema/integration/ip"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    http://www.springframework.org/schema/integration/ip
    http://www.springframework.org/schema/integration/ip/spring-integration-ip-2.0.xsd">

  <context:component-scan base-package="com.apress.prospringintegration.ip"/>

  <int:channel id="sendUdp"/>

  <ip:udp-outbound-channel-adapter id="udpOut" host="localhost" port="12345"
                                   multicast="false" check-length="true"
                                   channel="sendUdp"/>

  <ip:udp-inbound-channel-adapter id="udpIn" port="12345" receive-buffer-size="500"
                                  multicast="false" check-length="true"
                                  channel="receiveUdp" />

  <int:service-activator id="updHandler" input-channel="receiveUdp" ref="udpListener"/>

</beans>

Listing 11–14. UdpListener Service Activator Class

package com.apress.prospringintegration.ip;

import org.springframework.integration.Message;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;


@Component
public class UdpListener {
    @ServiceActivator
    public void handleUdp(Message<?> message) {
        System.out.println("*** UDP Message: " + message);
        System.out.println("*** UDP Message Payload: "
                + new String((byte[])message.getPayload()));
    }
}

UDP is an unreliable protocol, and thus Spring Integration provides two options to improve its reliability. One option used in the example in Listing 11–13 is setting the property check-length to true. This causes the adapter to prepend the length of the message to the message data. This allows the message to be checked for completeness. However, this requires that the receiving endpoint know about this check.
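The idea behind a length check can be sketched without any networking. The example below assumes a 4-byte big-endian length prefix; that wire format is an assumption made for illustration, and the class LengthCheckedDatagramSketch is hypothetical rather than part of Spring Integration. The receiver compares the declared length with the number of bytes that actually arrived and rejects the datagram when they disagree.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical length-prefix check; the 4-byte big-endian header is an assumption.
final class LengthCheckedDatagramSketch {

    // Prepends the payload length as a 4-byte big-endian integer.
    static byte[] wrap(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    // Returns the payload, or throws when the received size disagrees with the header.
    static byte[] unwrap(byte[] datagram, int receivedLength) {
        if (receivedLength < 4) {
            throw new IllegalArgumentException("missing length header");
        }
        int declared = ByteBuffer.wrap(datagram, 0, 4).getInt();
        if (declared != receivedLength - 4) {
            throw new IllegalArgumentException(
                    "expected " + declared + " bytes but received " + (receivedLength - 4));
        }
        return Arrays.copyOfRange(datagram, 4, receivedLength);
    }
}
```

A sender that does not add the prefix, or a receiver that does not expect it, would misread the data, which is why both endpoints must agree on the check.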

Another option is the acknowledge property. When set to true, this property causes the sender to wait for an acknowledgement from the receiving destination. Again, the destination endpoint must be set up to send an acknowledgement to the sender for this check to work. The configuration for this option is shown in Listing 11–15.

Listing 11–15. Using UDP Application-Level Acknowledgement

  <ip:udp-outbound-channel-adapter id="udpOut" host="localhost" port="12345"
                                   multicast="false" check-length="true"
                                   channel="sendUdp" acknowledge="true"
                                   ack-host="localhost" ack-port="12312"
                                   ack-timeout="5000"/>

A UDP multicast example is shown in Listing 11–16. The adapter configuration is similar to the single-destination configuration, except that the multicast property is set to true.

Listing 11–16. UDP Multicast udp-multicast.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:ip="http://www.springframework.org/schema/integration/ip"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    http://www.springframework.org/schema/integration/ip
    http://www.springframework.org/schema/integration/ip/spring-integration-ip-2.0.xsd">

  <context:component-scan base-package="com.apress.prospringintegration.ip"/>

  <int:channel id="sendUdp"/>

  <ip:udp-outbound-channel-adapter id="udpOut" host="localhost" port="12345"
                                   multicast="true" check-length="true"
                                   channel="sendUdp"/>


  <ip:udp-inbound-channel-adapter id="udpIn" port="12345" receive-buffer-size="500"
                                  multicast="true" check-length="true"
                                  channel="receiveUdp" multicast-address="225.6.7.8"/>

  <int:service-activator id="updHandler" input-channel="receiveUdp" ref="udpListener"/>

</beans>

The underlying connection used by the TCP adapters is configured using a connection factory. Spring Integration provides two TCP connection factories, one for the client and one for the server. A client factory is used for creating an outgoing connection, whereas a server factory is used to listen for an incoming connection. The TCP connection factories support both java.net.Socket and java.nio.channels.SocketChannel connections; the choice is made through the property using-nio, which when set to true uses the NIO (New I/O) API introduced in Java 1.4. The TCP example shown in Listing 11–17 uses the Socket connection.

Listing 11–17. TCP Channel Adapter tcp-context.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:ip="http://www.springframework.org/schema/integration/ip"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    http://www.springframework.org/schema/integration/ip
    http://www.springframework.org/schema/integration/ip/spring-integration-ip-2.0.xsd">

  <context:component-scan base-package="com.apress.prospringintegration.ip"/>

  <int:channel id="tcpSend"/>

  <ip:tcp-connection-factory id="client" type="client" host="localhost"
                             port="1234" single-use="true" so-timeout="10000"/>

  <ip:tcp-connection-factory id="server" type="server" host="localhost" port="1234"/>

  <ip:tcp-outbound-channel-adapter id="tcpOutbound" channel="tcpSend"
                                   connection-factory="client"/>

  <ip:tcp-inbound-channel-adapter id="tcpInbound" channel="tcpReceive"
                                  connection-factory="server"/>

  <int:service-activator id="tcpHandler" input-channel="tcpReceive"
                        ref="tcpListener"/>

</beans>

This basic example takes messages sent to the message channel tcpSend and sends them out using the tcp-outbound-channel-adapter. The channel adapter configuration is quite simple, requiring only a message channel and a connection factory. Note that the inbound and outbound channel adapters can share the same connection factory; however, the inbound adapter owns the server connection factory, and the outbound adapter owns the client connection factory. Only one adapter of each type can reference a single connection factory.

There are numerous properties for the connection factories, and only the pertinent ones will be mentioned here. The client or server is set through the type property. The hostname is set using the host property, and the port is set using the port property. If the single-use property is set to true, a new connection will be created each time a message is sent. The so-timeout property determines the socket timeout in milliseconds.

Following the previous example, a new TCP connection is made to localhost port 1234 of the tcp-inbound-channel-adapter. The message payload is sent to the tcpReceive channel and handled by the TcpListener service activator class shown in Listing 11–18. The service activator class logs the message payload.

Listing 11–18. TcpListener Service Activator Class

package com.apress.prospringintegration.ip;

import org.springframework.integration.Message;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;

@Component
public class TcpListener {
    @ServiceActivator
    public void handleTcp(Message<?> message) {
        System.out.println("*** TCP Message: " + message);
        System.out.println("*** TCP Message Payload: "
                + new String((byte[]) message.getPayload()));
    }
}

TCP is a streaming protocol, so the message payload must be serialized and the stream broken into discrete messages. At the protocol level, each discrete message is called a frame. Spring Integration provides several (de)serializers for this purpose. All serializers accept either a byte array or a string as input. All deserializers produce a byte array.

  • The default (de)serializer org.springframework.integration.ip.tcp.serializer.ByteArrayCrlfSerializer converts the byte array into a stream of bytes followed by a carriage return and line feed. This serializer is used when none is explicitly specified, and will support a telnet client.
  • The second (de)serializer is org.springframework.integration.ip.tcp.serializer.ByteArrayStxEtxSerializer, which converts the byte array into a stream of bytes preceded by a STX (Start of TeXt) and followed by an ETX (End of TeXt).
  • The third (de)serializer is org.springframework.integration.ip.tcp.serializer.ByteArrayLengthHeaderSerializer, which converts the byte array into a stream of bytes preceded by a 4-byte binary length.
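The three framing schemes in the list above can be sketched with plain JDK streams. The class TcpFramingSketch is hypothetical and is not the Spring Integration serializer code; in particular, big-endian byte order for the length header is an assumption. The sketch assumes Java 8 or later.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical illustration of CRLF, STX/ETX, and length-header framing.
final class TcpFramingSketch {

    static final byte STX = 0x02; // Start of TeXt
    static final byte ETX = 0x03; // End of TeXt

    // payload followed by carriage return and line feed
    static byte[] frameCrlf(byte[] payload) {
        return surround(new byte[0], payload, new byte[] {'\r', '\n'});
    }

    // STX, payload, ETX
    static byte[] frameStxEtx(byte[] payload) {
        return surround(new byte[] {STX}, payload, new byte[] {ETX});
    }

    // 4-byte big-endian length, then payload
    static byte[] frameLengthHeader(byte[] payload) {
        byte[] header = ByteBuffer.allocate(4).putInt(payload.length).array();
        return surround(header, payload, new byte[0]);
    }

    // Reads one CRLF-terminated message and returns it without the terminator.
    static byte[] readCrlf(InputStream in) {
        try {
            ByteArrayOutputStream message = new ByteArrayOutputStream();
            int previous = -1;
            int current;
            while ((current = in.read()) != -1) {
                if (previous == '\r' && current == '\n') {
                    byte[] withCr = message.toByteArray();
                    return Arrays.copyOf(withCr, withCr.length - 1); // drop the CR
                }
                message.write(current);
                previous = current;
            }
            throw new IllegalStateException("stream ended before CRLF");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads one message that is preceded by a 4-byte big-endian length.
    static byte[] readLengthHeader(InputStream in) {
        try {
            DataInputStream data = new DataInputStream(in);
            byte[] payload = new byte[data.readInt()];
            data.readFully(payload);
            return payload;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static byte[] surround(byte[] prefix, byte[] payload, byte[] suffix) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(prefix, 0, prefix.length);
        out.write(payload, 0, payload.length);
        out.write(suffix, 0, suffix.length);
        return out.toByteArray();
    }
}
```

Reading a stream that contains two CRLF-terminated messages back to back with readCrlf returns them one at a time, which is the point of framing: the receiver can tell where one message ends and the next begins.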

There are also inbound and outbound TCP gateways that use the same TCP connection factories described previously. They allow for request/reply scenarios using a TCP connection. An example of using the TCP gateways is shown in Listing 11–19.

Listing 11–19. TCP Gateway tcp-gateway.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:ip="http://www.springframework.org/schema/integration/ip"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    http://www.springframework.org/schema/integration/ip
    http://www.springframework.org/schema/integration/ip/spring-integration-ip-2.0.xsd">

  <context:component-scan base-package="com.apress.prospringintegration.ip"/>

  <int:gateway id="tcpGateway" default-request-channel="outboundRequest"
              service-interface="com.apress.prospringintegration.ip.TcpGateway"/>

  <ip:tcp-connection-factory id="client" type="client" host="localhost"
                             port="1234" single-use="true"
                             so-timeout="10000"/>

  <ip:tcp-connection-factory id="server" type="server" host="localhost" port="1234"/>

  <ip:tcp-inbound-gateway id="inGateway" request-channel="inboundRequest"
                          connection-factory="server"/>

  <ip:tcp-outbound-gateway id="outGateway" request-channel="outboundRequest"
                           reply-channel="outboundReply"
                           connection-factory="client"/>

  <int:transformer id="bytes2String" input-channel="outboundReply"
                  expression="new String(payload)"/>

  <int:service-activator id="tcpHandler" input-channel="inboundRequest" ref="tcpEcho"/>

</beans>

A gateway is used as the entry point for this example, based on the interface TcpGateway, as shown in Listing 11–20. The gateway method sends a string message and returns a string response. This gateway forwards the string payload to the tcp-outbound-gateway through the outboundRequest message channel. The outbound gateway, using the client connection factory, connects to localhost port 1234 of the tcp-inbound-gateway. The inbound gateway forwards the message payload to the TcpEcho service activator via the inboundRequest message channel. The TcpEcho class shown in Listing 11–21 simply echoes the payload prefixed with reply:. Finally, the reply message is passed back through the outboundReply message channel, using a transformer to convert the byte array used by the TCP protocol to a string, and then sent back to the initial gateway.

Listing 11–20. Gateway Interface TcpGateway

package com.apress.prospringintegration.ip;

public interface TcpGateway {
    public String sendTcp(String message);
}

Listing 11–21. TcpEcho Service Activator

package com.apress.prospringintegration.ip;

import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;

@Component
public class TcpEcho {
    @ServiceActivator
    public String echo(byte[] bytes) {
        return "reply: " + new String(bytes);
    }
}
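
For orientation, the round trip performed by the two gateways can be approximated with plain java.net sockets. The following is only a sketch and not how Spring Integration implements its gateways: it assumes newline-delimited UTF-8 text, and the class name PlainTcpEcho, its helper methods, and the use of an operating system–assigned port are all illustrative choices.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the gateway pair: one request line in, one reply line out.
final class PlainTcpEcho {

    // Server side: accept one connection, read one line, answer with "reply: " + line.
    static Thread startServer(ServerSocket serverSocket) {
        Thread thread = new Thread(() -> {
            try (Socket socket = serverSocket.accept();
                 BufferedReader in = reader(socket);
                 PrintWriter out = writer(socket)) {
                out.println("reply: " + in.readLine());
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        thread.start();
        return thread;
    }

    // Client side: connect, send one line, block until the reply line arrives.
    static String send(int port, String message) throws IOException {
        try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), port);
             BufferedReader in = reader(socket);
             PrintWriter out = writer(socket)) {
            socket.setSoTimeout(10000); // same idea as so-timeout="10000" in the listing
            out.println(message);
            return in.readLine();
        }
    }

    private static BufferedReader reader(Socket socket) throws IOException {
        return new BufferedReader(
                new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
    }

    private static PrintWriter writer(Socket socket) throws IOException {
        // autoFlush = true so that println() pushes the line onto the wire immediately
        return new PrintWriter(
                new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8), true);
    }

    public static void main(String[] args) throws IOException {
        // Port 0 asks the operating system for any free port on the loopback interface.
        try (ServerSocket serverSocket =
                     new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            startServer(serverSocket);
            System.out.println(send(serverSocket.getLocalPort(), "hello"));
        }
    }
}
```

The gateways in the listing hide exactly this plumbing: the caller of TcpGateway sees only a method that takes a string and returns a string.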

Stream Processing

Spring Integration provides two adapters for reading from streams: org.springframework.integration.stream.ByteStreamReadingMessageSource for a byte stream and org.springframework.integration.stream.CharacterStreamReadingMessageSource for a character stream. The byte stream version requires an InputStream and the character version requires a Reader; both are specified through the constructor. The byte stream version has the optional property bytesPerMessage, which specifies how many bytes it will attempt to read in for each message; the default is 1024. The required Maven dependencies are shown in Listing 11–22 and sample configurations are shown in Listing 11–23.

Listing 11–22. Maven Dependencies for the Stream Adapters

     <dependency>
       <groupId>org.springframework.integration</groupId>
       <artifactId>spring-integration-stream</artifactId>
       <version>2.0.1.RELEASE</version>
     </dependency>

Listing 11–23. Reading from a Stream

<bean class="org.springframework.integration.stream.ByteStreamReadingMessageSource">
  <constructor-arg ref="inputStream"/>
  <property name="bytesPerMessage" value="4096"/>
</bean>

<bean class="org.springframework.integration.stream.CharacterStreamReadingMessageSource">
  <constructor-arg ref="reader"/>
</bean>
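
The effect of bytesPerMessage can be pictured as a bounded read: each poll asks the stream for at most that many bytes and turns whatever arrives into one payload. The sketch below uses only java.io; the class ChunkedStreamReader and its readChunk method are hypothetical names, and trimming a short read to its actual length is an assumption about the desired behavior rather than a statement about the adapter's internals.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper illustrating the idea behind bytesPerMessage.
final class ChunkedStreamReader {

    // Reads at most bytesPerMessage bytes in one call.
    // Returns null when the stream has nothing to offer (for example, at end of stream).
    static byte[] readChunk(InputStream stream, int bytesPerMessage) throws IOException {
        byte[] buffer = new byte[bytesPerMessage];
        int read = stream.read(buffer, 0, buffer.length);
        if (read <= 0) {
            return null; // no payload for this poll
        }
        // A read may return fewer bytes than requested, so trim the buffer if needed.
        return read == buffer.length ? buffer : Arrays.copyOf(buffer, read);
    }

    public static void main(String[] args) throws IOException {
        InputStream stream =
                new ByteArrayInputStream("0123456789".getBytes(StandardCharsets.US_ASCII));
        byte[] chunk;
        while ((chunk = readChunk(stream, 4)) != null) {
            System.out.println(chunk.length + " bytes: "
                    + new String(chunk, StandardCharsets.US_ASCII));
        }
    }
}
```

With ten bytes of input and a limit of four, the loop produces chunks of four, four, and two bytes, which is why a consumer should never assume that every payload has the full configured size.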

Two adapters are provided for writing to a stream, org.springframework.integration.stream.ByteStreamWritingMessageHandler for writing to a byte stream and org.springframework.integration.stream.CharacterStreamWritingMessageHandler for writing to a character stream. Similar to the stream readers, the byte stream version requires an OutputStream and the character version requires a Writer, again specified through the constructor. Both adapters have an optional second constructor argument for the buffer size. Sample configurations are shown in Listing 11–24.

Listing 11–24. Writing to a Stream in a Java Configuration

@Bean
public org.springframework.integration.stream.ByteStreamWritingMessageHandler
        byteStreamWritingMessageHandler() {
    java.io.OutputStream os = ... ; // acquire this as appropriate to your application
    return new ByteStreamWritingMessageHandler(os, 1024);
}

@Bean
public CharacterStreamWritingMessageHandler characterStreamWritingMessageHandler() {
    java.io.Writer w = ... ; // acquire this as appropriate to your application
    return new CharacterStreamWritingMessageHandler(w);
}

Stdin and Stdout

Spring Integration currently provides namespace support only for stdin and stdout streaming. The stdin adapter reads from the system's STDIN input stream (typically, the content available when somebody types at a command line; in Java, this is accessed through the System.in object). The stdout adapter writes to the system's STDOUT output stream (typically, the stream that controls what shows up on the system's command line; in Java, this is accessed through the System.out object). A simple example that echoes any character sent to the console is shown in Listing 11–25.

Listing 11–25. Simple Stdin and Stdout Namespace Configuration input-stream.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:stream="http://www.springframework.org/schema/integration/stream"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    http://www.springframework.org/schema/integration/stream
    http://www.springframework.org/schema/integration/stream/spring-integration-stream-2.0.xsd">

  <int:poller default="true" fixed-rate="50"/>


  <int:channel id="input"/>

  <stream:stdin-channel-adapter id="stdin" channel="input"/>

  <stream:stdout-channel-adapter id="stdout" channel="input" append-newline="true"/>

</beans>
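
Stripped of the messaging infrastructure, the flow in this configuration is a line-by-line copy from a Reader to a Writer. The sketch below shows that idea with plain java.io; the class ConsoleEcho is a hypothetical name, and the line-at-a-time behavior and the appended newline (mirroring append-newline="true") are assumptions made for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Reader;
import java.io.StringReader;
import java.io.Writer;

// Hypothetical plain-Java equivalent of wiring a character source to a character sink.
final class ConsoleEcho {

    // Copies each line from source to target, appending a newline after every line.
    // Returns the number of lines that were echoed.
    static int echo(Reader source, Writer target) throws IOException {
        BufferedReader lines = new BufferedReader(source);
        int count = 0;
        String line;
        while ((line = lines.readLine()) != null) {
            target.write(line);
            target.write("\n"); // analogous to append-newline="true"
            target.flush();     // make the echoed line visible immediately
            count++;
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        // A fixed input keeps the demo from blocking on the console;
        // pass new InputStreamReader(System.in) instead to echo what is typed.
        Reader source = new StringReader("first\nsecond\n");
        echo(source, new OutputStreamWriter(System.out));
    }
}
```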

Following a Log File

To demonstrate Spring Integration's stream adapters, an example of following a log file is shown in Listing 11–26. A FileInputStream for the log file is passed to a ByteStreamReadingMessageSource, and an inbound channel adapter is configured to reference that message source. Each time the poller fires, the input stream is read and whatever has been read is sent to the input message channel. The service activator class LogHandler, shown in Listing 11–27, takes the incoming message payload and prints it to the console. This simple technique gives a Spring Integration flow access to the contents of an application log file.

Listing 11–26. Spring Configuration for Following a Log File stream-log.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:si="http://www.springframework.org/schema/integration"
       xmlns:stream="http://www.springframework.org/schema/integration/stream"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd
                http://www.springframework.org/schema/integration
                http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
                http://www.springframework.org/schema/integration/stream
                http://www.springframework.org/schema/integration/stream/spring-integration-stream-2.0.xsd">

  <context:component-scan base-package="com.apress.prospringintegration.stream"/>

  <bean id="loggingSource"
        class="org.springframework.integration.stream.ByteStreamReadingMessageSource">
    <constructor-arg ref="inputStream"/>
  </bean>

  <bean id="inputStream" class="java.io.FileInputStream">
    <constructor-arg type="java.lang.String" value="example.log"/>
  </bean>

  <si:inbound-channel-adapter id="inboundChannel" channel="input" ref="loggingSource">
    <si:poller fixed-rate="1000" max-messages-per-poll="100"/>
  </si:inbound-channel-adapter>

  <si:service-activator id="logProcess" ref="logHandler" input-channel="input"/>
</beans>

Listing 11–27. Log Handler Service Activator

package com.apress.prospringintegration.stream;

import org.springframework.integration.Message;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;

@Component
public class LogHandler {
    @ServiceActivator
    public void handleLog(Message<?> message) {
        System.out.println("Log Payload: "
                + new String((byte[])message.getPayload()));
    }
}
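
Following a file this way relies on a property of file streams that is easy to overlook: on common platforms, a FileInputStream that has reached the end of the file will return data again on a later read once the file has grown. The sketch below demonstrates this with a temporary file. The class LogFollower and its poll method are hypothetical names, and decoding the bytes as UTF-8 is an assumption.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper showing why repeatedly polling one FileInputStream "follows" a file.
final class LogFollower {

    // One poll: returns the bytes available right now as text,
    // or an empty string if the stream is currently at the end of the file.
    static String poll(InputStream stream, int maxBytes) throws IOException {
        byte[] buffer = new byte[maxBytes];
        int read = stream.read(buffer);
        return read <= 0 ? "" : new String(buffer, 0, read, StandardCharsets.UTF_8);
    }

    private static void append(Path file, String text) throws IOException {
        Files.write(file, text.getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND);
    }

    public static void main(String[] args) throws IOException {
        Path log = Files.createTempFile("example", ".log");
        try (InputStream stream = new FileInputStream(log.toFile())) {
            append(log, "first line\n");
            System.out.print(poll(stream, 1024)); // picks up the first line
            System.out.print(poll(stream, 1024)); // nothing new yet, prints nothing
            append(log, "second line\n");
            System.out.print(poll(stream, 1024)); // picks up only the appended line
        } finally {
            Files.deleteIfExists(log);
        }
    }
}
```

Because the stream keeps its position between polls, each poll sees only what was appended since the previous one. A payload may still end in the middle of a line, so a handler that needs whole lines has to buffer the remainder itself.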

FTP/FTPS and SFTP

Spring Integration supports file transfer operations using File Transfer Protocol (FTP), FTP Secure (FTPS), and Secure File Transfer Protocol (SFTP). Spring Integration allows sending and receiving files to and from a server using all of these protocols.

FTP

Spring Integration supports sending and receiving files over FTP by providing inbound and outbound channel adapters. As usual, the Spring Integration FTP adapter requires the additional Maven dependency:

     <dependency>
       <groupId>org.springframework.integration</groupId>
       <artifactId>spring-integration-ftp</artifactId>
       <version>2.0.1.RELEASE</version>
     </dependency>

In addition, you can provide namespace support for these adapters by adding the following header to your Spring configuration file:

<beans xmlns:int-ftp="http://www.springframework.org/schema/integration/ftp"
       xsi:schemaLocation="http://www.springframework.org/schema/integration/ftp
    http://www.springframework.org/schema/integration/ftp/spring-integration-ftp-2.0.xsd">

The first step in using the FTP adapters is creating an FTP session factory based on the class org.springframework.integration.ftp.session.DefaultFtpSessionFactory. The session factory is easily created using a Java configuration class, as shown in Listing 11–28.

Listing 11–28. Java Configuration for Creating an FTP Session

package com.apress.prospringintegration.ftp;

import org.apache.commons.net.ftp.FTPClient;
import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.ftp.session.DefaultFtpSessionFactory;

@Configuration
public class FtpConfiguration {

    @Value("${host}")
    private String host;

    @Value("${username}")
    private String username;

    @Value("${password}")
    private String password;

    @Bean
    public DefaultFtpSessionFactory ftpClientFactory() {
        DefaultFtpSessionFactory ftpSessionFactory =
                new DefaultFtpSessionFactory();
        ftpSessionFactory.setHost(host);
        ftpSessionFactory.setUsername(username);
        ftpSessionFactory.setPassword(password);
        ftpSessionFactory.setClientMode(
                FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
        return ftpSessionFactory;
    }
}

The required parameters are the host, username, and password. These parameters are set via a properties file, shown in Listing 11–29.

Listing 11–29. FTP Configuration Properties File ftp.properties

host=localhost
username=[username]
password=[password]

Note that you can support FTPS simply by configuring a different FTP session factory. FTPS is like FTP, except that it uses SSL to keep the communication private. To use it, you need to have the certificate for the remote host in your Java key store, and it is up to the implementer to install the FTP server's certificate. There are many ways to do this, but most of them are tedious. You might have an easier time using a key management tool, such as Portecle, from SourceForge (see http://sourceforge.net/projects/portecle/files/portecle/1.7/portecle-1.7.zip/download). Portecle is an open source Java UI for key management, and is very simple to use. Run java -jar portecle.jar, and when the UI shows up, go to Tools > Import Trusted Certificate. Select your certificate file (for example, a .cert file) and choose the appropriate format. Your FTPS-based communication (including the Spring Integration adapter) should then work.

Substitute the configuration in Listing 11–30 for the DefaultFtpSessionFactory bean configured in Listing 11–28. The rest of the examples can use either FTP session factory implementation; they do not care whether you are using FTP or FTPS.

Listing 11–30. FTPS Java Configuration

package com.apress.prospringintegration.ftp;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.ftp.session.DefaultFtpsSessionFactory;

@Configuration
public class FtpConfiguration {

    @Value("${host}")
    private String host;

    @Value("${username}")
    private String username;

    @Value("${password}")
    private String password;

    // FTPS is FTP over SSL, so the FTPS session factory from the FTP module is used here
    @Bean
    public DefaultFtpsSessionFactory ftpClientFactory() {
        DefaultFtpsSessionFactory ftpSessionFactory =
                new DefaultFtpsSessionFactory();
        ftpSessionFactory.setHost(host);
        ftpSessionFactory.setUsername(username);
        ftpSessionFactory.setPassword(password);
        return ftpSessionFactory;
    }
}

FTP Inbound Channel Adapter

The FTP inbound channel adapter is a listener adapter that connects to an FTP server based on the session factory, and checks for new files created, at which point it initiates a file transfer. The adapter then publishes a message containing a File instance of the file just transferred. An example of the Spring configuration file using the inbound FTP adapter is shown in Listing 11–31.

Listing 11–31. Spring Configuration for the Inbound FTP Adapter ftp-inbound-context.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-ftp="http://www.springframework.org/schema/integration/ftp"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/integration/ftp
    http://www.springframework.org/schema/integration/ftp/spring-integration-ftp-2.0.xsd">

  <context:property-placeholder location="/spring/ftp/ftp.properties"/>
  <context:component-scan base-package="com.apress.prospringintegration.ftp"/>

  <int-ftp:inbound-channel-adapter id="ftpInbound"
                                   channel="ftpChannel"
                                   session-factory="ftpClientFactory"
                                   filename-regex=".*\.txt$"
                                   auto-create-local-directory="true"
                                   delete-remote-files="false"
                                   remote-directory="."
                                   local-directory="file:output">
    <int:poller fixed-rate="1000"/>
  </int-ftp:inbound-channel-adapter>

  <int:channel id="ftpChannel">
    <int:queue/>
  </int:channel>

</beans>

The component-scan element is used to support the session factory Java configuration, and the property-placeholder element is used to read the properties file. The FTP inbound channel adapter is a polling consumer requiring a poller to initiate the FTP connection and check for any new files that may have been created. Once a file has been transferred, the adapter will publish a message with a java.io.File based on the newly transferred file as its payload, and send the message to the message channel ftpChannel. This adapter allows file filtering based on the file name, pattern, or regex, similar to the file adapter discussed previously in this chapter. For this example, a regex expression specified through the filename-regex attribute is used to pick up any file with the extension txt.
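
Since the pattern syntax is that of java.util.regex, a quick way to check which file names a candidate pattern accepts is to try it directly. The sketch below assumes the filter requires the whole file name to match; the class FilenameRegexDemo and its accepts method are hypothetical names. Note that an unescaped dot matches any character, so the pattern .*.txt$ also accepts a name such as report_txt, whereas .*\.txt$ accepts only names that end in a literal .txt.

```java
import java.util.regex.Pattern;

// Hypothetical helper for trying out file name patterns before putting them in a configuration.
final class FilenameRegexDemo {

    // True if the entire file name matches the regular expression.
    static boolean accepts(String regex, String fileName) {
        return Pattern.compile(regex).matcher(fileName).matches();
    }

    public static void main(String[] args) {
        String loose = ".*.txt$";    // the dot before "txt" matches any character
        String strict = ".*\\.txt$"; // the dot before "txt" must be a literal dot
        String[] names = {"report.txt", "report.csv", "report_txt"};
        for (String name : names) {
            System.out.println(name
                    + " loose=" + accepts(loose, name)
                    + " strict=" + accepts(strict, name));
        }
    }
}
```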

Additional attributes include delete-remote-files, which when set to true will delete the file at the remote server after the file has been transferred; remote-directory, which allows selecting the remote directory from which to transfer the file; and local-directory, which allows specifying the directory to transfer the file to. A simple main class to run this example is shown in Listing 11–32. This main class creates the Spring context and listens for an incoming message when a file is transferred.

Listing 11–32. FTP Inbound Channel Adapter Example main Class

package com.apress.prospringintegration.ftp;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.Message;
import org.springframework.integration.core.PollableChannel;

public class FtpInbound {

    public static void main(String[] args) {
        ApplicationContext context =
                new ClassPathXmlApplicationContext("/spring/ftp/ftp-inbound-context.xml");
        PollableChannel ftpChannel = context.getBean("ftpChannel", PollableChannel.class);


        Message<?> message =  ftpChannel.receive();
        System.out.println("message: " + message);
    }
}

FTP Outbound Channel Adapter

The FTP outbound channel adapter supports connecting to a remote FTP server and transferring a file based on the incoming payload. The adapter supports the following payloads:

  • java.io.File: The actual file object
  • byte[]: A byte array that represents the file content
  • java.lang.String: Text that represents the file content
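
One way to picture how three different payload types can feed the same transfer is to reduce each of them to an InputStream. The sketch below is an illustration only, not the adapter's actual code: the class PayloadStreams and its toInputStream method are hypothetical names, and encoding String payloads as UTF-8 is an assumption.

```java
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that turns any of the supported payload types into a stream of bytes.
final class PayloadStreams {

    static InputStream toInputStream(Object payload) throws IOException {
        if (payload instanceof File) {
            return new FileInputStream((File) payload);      // stream the file from disk
        }
        if (payload instanceof byte[]) {
            return new ByteArrayInputStream((byte[]) payload); // bytes are the content as-is
        }
        if (payload instanceof String) {
            // Assumption: text payloads are encoded as UTF-8
            byte[] bytes = ((String) payload).getBytes(StandardCharsets.UTF_8);
            return new ByteArrayInputStream(bytes);
        }
        throw new IllegalArgumentException("Unsupported payload type: "
                + (payload == null ? "null" : payload.getClass().getName()));
    }

    public static void main(String[] args) throws IOException {
        try (InputStream fromText = toInputStream("hello");
             InputStream fromBytes = toInputStream(new byte[] {1, 2, 3})) {
            System.out.println("text payload bytes: " + fromText.available());
            System.out.println("byte[] payload bytes: " + fromBytes.available());
        }
    }
}
```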

This adapter uses the same FTP session factory as the inbound adapter discussed previously. An example Spring configuration file for an FTP outbound channel adapter is shown in Listing 11–33.

Listing 11–33. Spring Configuration for the FTP Outbound Channel Adapter ftp-outbound-context.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-ftp="http://www.springframework.org/schema/integration/ftp"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/integration/ftp
    http://www.springframework.org/schema/integration/ftp/spring-integration-ftp-2.0.xsd">

  <context:property-placeholder location="/spring/ftp/ftp.properties"/>
  <context:component-scan base-package="com.apress.prospringintegration.ftp"/>

  <int:channel id="ftpChannel"/>

  <int-ftp:outbound-channel-adapter id="ftpOutbound"
                                    channel="ftpChannel"
                                    remote-directory="."
                                    session-factory="ftpClientFactory"/>

</beans>

The FTP outbound channel adapter will take an incoming message on the channel ftpChannel, initiate a connection to the FTP server based on the session-factory attribute, and transfer the file based on the message payload. The file will be transferred to the directory specified by the remote-directory attribute. An example main class that uses the FTP outbound channel adapter is shown in Listing 11–34.

Listing 11–34. FTP Outbound Channel Adapter Example main Class

package com.apress.prospringintegration.ftp;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.support.MessageBuilder;

import java.io.File;

public class FtpOutbound {

    public static void main(String[] args) {
        ApplicationContext context =
                new ClassPathXmlApplicationContext("/spring/ftp/ftp-outbound-context.xml");
        MessageChannel ftpChannel = context.getBean("ftpChannel", MessageChannel.class);
        File file = new File("readme.txt");
        Message<File> message = MessageBuilder.withPayload(file).build();
        ftpChannel.send(message);
    }
}

This example class creates the Spring context and sends a message to the ftpChannel message channel. The payload is the File instance representing the file readme.txt. This will cause the FTP adapter to connect to the remote FTP server and transfer the readme.txt file.

SFTP

Spring Integration also provides support for SFTP when files must be transferred over a secure connection. SFTP offers a file system–like view over an SSH connection, and most of the time having SSH access to a server is enough to be able to use SFTP. The SFTP protocol requires a secure channel such as SSH, as well as visibility of the client's identity throughout the SFTP session. Similar to the FTP adapters, Spring Integration supports sending and receiving files over SFTP by providing inbound and outbound channel adapters. The following Maven dependency is required for the SFTP adapter:

     <dependency>
       <groupId>org.springframework.integration</groupId>
       <artifactId>spring-integration-sftp</artifactId>
       <version>2.0.1.RELEASE</version>
     </dependency>

You can provide namespace support for these adapters by adding the following header to your Spring configuration file:

<beans xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp"
       xsi:schemaLocation="http://www.springframework.org/schema/integration/sftp
    http://www.springframework.org/schema/integration/sftp/spring-integration-sftp-2.0.xsd">

Like the FTP adapters, the first step in using the SFTP adapters is to create an SFTP session factory based on the class org.springframework.integration.sftp.session.DefaultSftpSessionFactory. The session factory is again created using a Java configuration class, as shown in Listing 11–35.

Listing 11–35. Java Configuration for Creating the SFTP Session Factory

package com.apress.prospringintegration.ftp;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;

@Configuration
public class SftpConfiguration {

    @Value("${host}")
    private String host;

    @Value("${username}")
    private String username;

    @Value("${password}")
    private String password;

    @Bean
    public DefaultSftpSessionFactory sftpSessionFactory() {
        DefaultSftpSessionFactory sessionFactory = new DefaultSftpSessionFactory();
        sessionFactory.setHost(host);
        sessionFactory.setUser(username);
        sessionFactory.setPassword(password);

        return sessionFactory;
    }
}

This session factory also supports using SSH keys (and keys that themselves are locked with a password), which can be set through the DefaultSftpSessionFactory session factory instance. The essential host, username, and password parameters used in this example are set through the same properties file as the FTP adapter.

SFTP Inbound Channel Adapter

The SFTP inbound channel adapter is a listener adapter that connects to an SFTP server based on the session factory, and checks for new files created, at which point it initiates a file transfer. The adapter then publishes a message containing a File instance of the file just transferred. The Spring configuration file for an example using the inbound SFTP adapter is shown in Listing 11–36.

Listing 11–36. Spring Configuration for Inbound SFTP Adapter sftp-inbound-context.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/integration/sftp
    http://www.springframework.org/schema/integration/sftp/spring-integration-sftp-2.0.xsd">

  <context:property-placeholder location="/spring/ftp/ftp.properties"/>
  <context:component-scan base-package="com.apress.prospringintegration.ftp"/>

  <int-sftp:inbound-channel-adapter id="ftpInbound"
                                   channel="ftpChannel"
                                   session-factory="sftpSessionFactory"
                                   filename-regex=".*\.txt$"
                                   auto-create-local-directory="true"
                                   delete-remote-files="false"
                                   remote-directory="."
                                   local-directory="file:output">
    <int:poller fixed-rate="1000"/>
  </int-sftp:inbound-channel-adapter>

  <int:channel id="ftpChannel">
    <int:queue/>
  </int:channel>

</beans>

Again, the context component-scan element is used to scan the configured package and find the session factory Java configuration. The property-placeholder element is used to read the properties file, and resolve and replace placeholder expressions in the configuration. The SFTP inbound channel adapter is also a polling consumer requiring a poller to initiate the SFTP connection and check for any new files that may have been created. Once the file has been transferred, the adapter will publish a message with a java.io.File based on the newly transferred file as its payload and send the message to the message channel ftpChannel. This adapter allows file filtering based on the file name, pattern, or regex, similar to the file adapter discussed previously in this chapter. For this example, a regex expression specified through the filename-regex attribute is used to pick up any file with the extension txt.

Additional attributes include delete-remote-files, which when set to true will delete the file at the remote server after the file has been transferred; remote-directory, which allows selecting the remote directory from which to transfer the file; and local-directory, which allows specifying the directory to transfer the file to. A simple main class to run this example is shown in Listing 11–37. This main class creates the Spring context and listens for an incoming message when a file is transferred.

Listing 11–37. SFTP Inbound Channel Adapter Example main Class

package com.apress.prospringintegration.ftp;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.Message;
import org.springframework.integration.core.PollableChannel;


public class SftpInbound {
    public static void main(String[] args) {
        ApplicationContext context =
                new ClassPathXmlApplicationContext("/spring/ftp/sftp-inbound-context.xml");
        PollableChannel ftpChannel = context.getBean("ftpChannel", PollableChannel.class);

        Message<?> message = ftpChannel.receive();
        System.out.println("message: " + message);
    }
}

SFTP Outbound Channel Adapter

The SFTP outbound channel adapter supports connecting to a remote SFTP server and transferring a file based on the incoming payload. Similar to the FTP adapter, the SFTP adapter supports the following payloads:

  • java.io.File: The actual file object
  • byte[]: A byte array that represents the file content
  • java.lang.String: Text that represents the file content

This adapter uses the same SFTP session factory as the inbound adapter discussed previously. An example Spring configuration file for an SFTP outbound channel adapter is shown in Listing 11–38.

Listing 11–38. Spring Configuration for the SFTP Outbound Channel Adapter sftp-outbound-context.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/integration/sftp
    http://www.springframework.org/schema/integration/sftp/spring-integration-sftp-2.0.xsd">

  <context:property-placeholder location="/spring/ftp/ftp.properties"/>
  <context:component-scan base-package="com.apress.prospringintegration.ftp"/>

  <int:channel id="ftpChannel"/>

  <int-sftp:outbound-channel-adapter id="ftpOutbound"
                                    channel="ftpChannel"
                                    remote-directory="."
                                    session-factory="sftpSessionFactory"/>

</beans>

The SFTP outbound channel adapter will take an incoming message on the ftpChannel channel, initiate a connection to the SFTP server based on the session-factory attribute, and transfer the file based on the message payload. The file will be transferred to the directory specified by the remote-directory attribute. An example main class that uses the SFTP outbound channel adapter is shown in Listing 11–39.

Listing 11–39. SFTP Outbound Channel Adapter Example main Class

package com.apress.prospringintegration.ftp;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.support.MessageBuilder;

import java.io.File;

public class SftpOutbound {

    public static void main(String[] args) {
        ApplicationContext context =
                new ClassPathXmlApplicationContext("/spring/ftp/sftp-outbound-context.xml");
        MessageChannel ftpChannel = context.getBean("ftpChannel", MessageChannel.class);
        File file = new File("readme.txt");
        Message<File> message = MessageBuilder.withPayload(file).build();
        ftpChannel.send(message);
    }
}

This example class creates the Spring context and sends a message to the ftpChannel message channel. The payload is the java.io.File instance representing the file readme.txt. This will cause the SFTP adapter to connect to the remote SFTP server and transfer the readme.txt file.

Spring Integration's Remote File System Abstractions

The examples introduced thus far supporting inbound and outbound File, FTP/FTPS, and SFTP adapters are similar, and indeed, the only thing that needed to change besides the XML namespace used was the SessionFactory implementation. This is intentional—all the adapters share a number of common abstractions. The SessionFactory instances are in reality all implementations of the org.springframework.integration.file.remote.session.SessionFactory interface, which is responsible for producing a Session instance specific to each of the adapters. The Session interface is a generic view on top of the file system APIs that the adapters work with. The interface is reproduced here:

package org.springframework.integration.file.remote.session;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public interface Session {

    boolean remove(String path) throws IOException;


    <F> F[] list(String path) throws IOException;

    void read(String source, OutputStream outputStream) throws IOException;

    void write(InputStream inputStream, String destination) throws IOException;

    void rename(String pathFrom, String pathTo) throws IOException;

    void close();

    boolean isOpen();

}
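
To make the role of this contract concrete, the following is a minimal sketch of an in-memory implementation. It redeclares a simplified copy of the interface locally so that it compiles with the JDK alone; the names SessionSketch and InMemorySession are hypothetical, and list() returns plain path names rather than the adapter-specific F[] of the real interface.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class SessionSketch {

    // Simplified local copy of the Session contract (an assumption made for this
    // sketch): list() returns path names instead of the real <F> F[].
    interface Session {
        boolean remove(String path) throws IOException;
        String[] list(String path) throws IOException;
        void read(String source, OutputStream outputStream) throws IOException;
        void write(InputStream inputStream, String destination) throws IOException;
        void rename(String pathFrom, String pathTo) throws IOException;
        void close();
        boolean isOpen();
    }

    // Hypothetical in-memory "remote file system": path -> file contents.
    static final class InMemorySession implements Session {
        private final Map<String, byte[]> files = new TreeMap<>();
        private boolean open = true;

        @Override public boolean remove(String path) {
            return files.remove(path) != null;
        }

        @Override public String[] list(String path) {
            // Every stored path that starts with the given directory prefix.
            List<String> names = new ArrayList<>();
            for (String name : files.keySet()) {
                if (name.startsWith(path)) {
                    names.add(name);
                }
            }
            return names.toArray(new String[0]);
        }

        @Override public void read(String source, OutputStream out) throws IOException {
            byte[] data = files.get(source);
            if (data == null) {
                throw new IOException("No such file: " + source);
            }
            out.write(data);
        }

        @Override public void write(InputStream in, String destination) throws IOException {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                buffer.write(chunk, 0, n);
            }
            files.put(destination, buffer.toByteArray());
        }

        @Override public void rename(String from, String to) throws IOException {
            byte[] data = files.remove(from);
            if (data == null) {
                throw new IOException("No such file: " + from);
            }
            files.put(to, data);
        }

        @Override public void close() { open = false; }

        @Override public boolean isOpen() { return open; }
    }

    // Writes, renames, lists, and reads back one file through the generic contract.
    static String roundTrip() {
        Session session = new InMemorySession();
        try {
            byte[] content = "hello".getBytes(StandardCharsets.UTF_8);
            session.write(new ByteArrayInputStream(content), "in/readme.txt.tmp");
            session.rename("in/readme.txt.tmp", "in/readme.txt");
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            session.read("in/readme.txt", out);
            int count = session.list("in/").length;
            return count + ":" + new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            session.close();
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip());
    }
}
```

Code written against such a contract never touches an FTP or SFTP client API directly, which is what allows the adapters to share their base classes.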

The Session (which surfaces an adapter-specific view of the file system) and the type of file being transmitted are the only things that really change in the various file transfer adapters. Each adapter's underlying API handles common operations, such as listing files, getting file names and byte sizes, and moving files, differently. The Session therefore extracts this common functionality into a hierarchy that can be reused. It follows that the adapters themselves can all share the same base code, as they all support writing to a remote file system and receiving remote files into a local directory.

The inbound adapters descend from org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource<F>, which in turn delegates to implementations of org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer<F> to do the file system–specific work. These base classes act as strategies: if you want to write an inbound adapter that behaves in exactly the same way as these adapters, you need only implement the relatively trivial abstract methods on these classes. The inbound adapters poll the remote file system, download the new files to a local directory, and then deliver those new files as Spring Integration messages with a payload of type java.io.File. All the adapters support the option to delete the received file from the remote file system once it has been delivered. All the remote file system inbound adapters also support specifying a remote directory path to monitor, and can automatically create the directories that are used on startup.

It is important to note that the remote file system adapters have two main components: the file system–specific synchronizer, which polls the remote file system for new files and downloads them, and the org.springframework.integration.file.FileReadingMessageSource (this is the class behind the inbound file adapter discussed previously), which actually delivers the java.io.File objects from the local download folder.

This design has several redeeming qualities. First, the adapters synchronize files to a local directory and then deliver those files to the consumers. If the Spring Integration process is killed during delivery, but there are still files in the local download directory, then these files will still be delivered correctly when the adapter is restarted, because it is the local directory that is in fact the source of the delivered messages, not the remote file system. Additionally, consumers of these files can transform their contents and perform operations on them without fear of network I/O costs when using the normal java.io.File references.
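
The two-phase design can be illustrated with a small JDK-only sketch. It is not the adapters' implementation: the class name, the Map standing in for the remote file system, and the temporary directory are all assumptions made for illustration. The point is only that delivery reads from the local directory, so it still works after the remote side becomes unavailable.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SynchronizeThenDeliverSketch {

    // Phase 1: copy every "remote" entry that is not yet present locally.
    static void synchronize(Map<String, byte[]> remote, Path localDir) throws IOException {
        for (Map.Entry<String, byte[]> entry : remote.entrySet()) {
            Path target = localDir.resolve(entry.getKey());
            if (!Files.exists(target)) {
                Files.write(target, entry.getValue());
            }
        }
    }

    // Phase 2: deliver from the local directory only; the remote side is not consulted.
    static List<String> deliver(Path localDir) throws IOException {
        List<String> names = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(localDir)) {
            for (Path path : stream) {
                names.add(path.getFileName().toString());
            }
        }
        Collections.sort(names);
        return names;
    }

    static List<String> demo() {
        try {
            Path localDir = Files.createTempDirectory("sync-sketch");
            Map<String, byte[]> remote = new HashMap<>();
            remote.put("a.txt", "first".getBytes(StandardCharsets.UTF_8));
            remote.put("b.txt", "second".getBytes(StandardCharsets.UTF_8));

            synchronize(remote, localDir);
            remote.clear(); // simulate losing the remote side before delivery

            List<String> delivered = deliver(localDir);

            // Clean up the temporary files created by this sketch.
            for (String name : delivered) {
                Files.delete(localDir.resolve(name));
            }
            Files.delete(localDir);
            return delivered;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```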

The remote file system outbound adapters also share a common hierarchy of collaborating objects. In practice, all remote file system outbound adapters are instances of org.springframework.integration.file.remote.handler.FileTransferringMessageHandler, and thus share common operations and features. All adapters support the configuration of an output character set, the local temporary directory to use for files being transferred, and an org.springframework.integration.file.FileNameGenerator implementation to customize how remotely written file names are generated from a Spring Integration message.

Spring Integration's File System Abstractions

There are abstractions common to both the remote file system adapters and the local file system adapter. Common to all the inbound adapters is support for limiting which files are detected during scans of the directory. There are three common but mutually exclusive attributes available in all the adapters' namespace-based configurations: filename-pattern, filename-regex, and filter. The first two attributes, filename-pattern and filename-regex, are convenience attributes. The filename-pattern attribute lets the user specify an Ant-style path or a matching expression to filter which files to use. The filename-regex attribute instead takes a regular expression, which, while more powerful, can also be a bit trickier to use. Both of these attributes ultimately result in the configuration of an implementation of org.springframework.integration.file.filters.FileListFilter. This interface is reproduced here:

package org.springframework.integration.file.filters;

import java.util.List;

public interface FileListFilter<F> {

    List<F> filterFiles(F[] files);
}

Most of the implementations of this interface are algorithmic, and do not depend on any specific file system APIs to do their jobs. Where required, there are file system–specific subclasses. One file system–specific feature commonly required is the derivation of a file's file name: the APIs are different for java.io.File, org.apache.commons.net.ftp.FTPFile (used in the FTP and FTPS adapters), and com.jcraft.jsch.LsEntry (used in SFTP adapters).

When you configure the filename-pattern attribute, an instance of a subclass of AbstractSimplePatternFileListFilter<F> (which in turn is an implementation of org.springframework.integration.file.filters.FileListFilter<F>) is configured as a filter on the adapter. When you configure a filename-regex attribute, an instance of a subclass of AbstractRegexPatternFileListFilter<F> (which in turn is an implementation of FileListFilter<F>) is configured as a filter on the adapter. If you require neither a regular expression–based nor an Ant-style expression-based filter, then you can configure the filter attribute. The filter attribute takes a reference to an instance of FileListFilter<F>. There are numerous implementations provided out of the box, and of course you are free to provide your own implementation. To compose multiple filters, use the CompositeFileListFilter.
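
As a rough illustration of providing your own filter and composing filters, the sketch below redeclares the FileListFilter contract locally so that it runs with the JDK alone. SuffixFilter, NoDotFileFilter, and AllOfFilter are hypothetical names; AllOfFilter only mimics the idea of a composite and is not the CompositeFileListFilter class itself.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

final class FileListFilterSketch {

    // Local copy of the contract shown above (redeclared for this sketch only).
    interface FileListFilter<F> {
        List<F> filterFiles(F[] files);
    }

    // Hypothetical filter: accepts files whose name ends with the given suffix.
    static final class SuffixFilter implements FileListFilter<File> {
        private final String suffix;

        SuffixFilter(String suffix) {
            this.suffix = suffix;
        }

        @Override public List<File> filterFiles(File[] files) {
            List<File> accepted = new ArrayList<>();
            for (File file : files) {
                if (file.getName().endsWith(suffix)) {
                    accepted.add(file);
                }
            }
            return accepted;
        }
    }

    // Hypothetical filter: rejects files whose name starts with a dot.
    static final class NoDotFileFilter implements FileListFilter<File> {
        @Override public List<File> filterFiles(File[] files) {
            List<File> accepted = new ArrayList<>();
            for (File file : files) {
                if (!file.getName().startsWith(".")) {
                    accepted.add(file);
                }
            }
            return accepted;
        }
    }

    // Hypothetical composite: a file passes only if every delegate accepts it.
    static final class AllOfFilter implements FileListFilter<File> {
        private final List<FileListFilter<File>> delegates;

        AllOfFilter(List<FileListFilter<File>> delegates) {
            this.delegates = delegates;
        }

        @Override public List<File> filterFiles(File[] files) {
            List<File> remaining = new ArrayList<>();
            for (File file : files) {
                remaining.add(file);
            }
            for (FileListFilter<File> delegate : delegates) {
                remaining = delegate.filterFiles(remaining.toArray(new File[0]));
            }
            return remaining;
        }
    }

    // Applies both filters to three candidate files and returns the accepted names.
    static List<String> acceptedNames() {
        List<FileListFilter<File>> delegates = new ArrayList<>();
        delegates.add(new SuffixFilter(".txt"));
        delegates.add(new NoDotFileFilter());

        File[] candidates = {
            new File("a.txt"), new File(".hidden.txt"), new File("b.csv")
        };
        List<String> names = new ArrayList<>();
        for (File file : new AllOfFilter(delegates).filterFiles(candidates)) {
            names.add(file.getName());
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(acceptedNames());
    }
}
```

Because the filters only look at file names, the same logic could be applied to FTPFile or LsEntry instances by swapping the name lookup, which is the role the file system–specific subclasses play.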

All the outbound adapters support messages with payloads of type byte[], java.io.File, and java.lang.String, and can correctly write them to a target directory (local or remote).

JDBC

The JDBC adapters provide a good solution to a very common requirement in integration. Most enterprise applications require some sort of interaction with a database. The JDBC channel adapters support sending and receiving messages via database queries. To support the JDBC example, the embedded database HyperSQL (http://hsqldb.org) will be used, since Spring JDBC supports it by default. The Spring jdbc namespace initializes and creates a simple table using HyperSQL. The Spring configuration file is shown in Listing 11–40.

Listing 11–40. Spring Configuration That Initializes the HyperSQL Database

<?xml version="1.0" encoding="UTF-8"?>
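<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/jdbc
    http://www.springframework.org/schema/jdbc/spring-jdbc-3.0.xsd">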

  <jdbc:embedded-database id="dataSource">
    <jdbc:script location="/spring/jdbc/hsqldb.sql"/>
  </jdbc:embedded-database>

</beans>

The jdbc:script element is used to specify the SQL script hsqldb.sql, shown in Listing 11–41, which will be run after the database has been initialized. This script creates a sample table used in all the JDBC adapter examples.

Listing 11–41. SQL Script hsqldb.sql

CREATE TABLE t (id INTEGER PRIMARY KEY,
   firstname VARCHAR(20),
   lastname VARCHAR(20),
   status INTEGER);
INSERT INTO t (id, firstname, lastname, status) VALUES (1, 'Felix', 'the Cat', 0);
INSERT INTO t (id, firstname, lastname, status) VALUES (2, 'Pink', 'Panther', 0);

JDBC Inbound Channel Adapter

The JDBC inbound channel adapter's basic function is to execute a SQL query and return the result set as the message payload. The message payload will contain the entire result set as a List. Using the default row-mapping strategy, each row is returned as a Map in which the column names are the keys and the column values are the values. You can customize the mapping strategy by implementing the org.springframework.jdbc.core.RowMapper<T> interface and referencing this class through the row-mapper attribute. A downstream splitter component can be used if individual messages per row are desired. The JDBC inbound channel adapter requires a reference to either a JdbcTemplate or a DataSource instance.

An update statement can be added to the JDBC inbound channel adapter to mark rows as read, which prevents them from showing up in the next poll. The update statement has access to a parameterized list of values from the original select statement. The list of values follows a default naming convention: each column in the input result set is translated into a list in the parameter map for the update, keyed by the column name. A parameter is specified by prefixing its name with a colon (:). The parameter generation strategy can be overridden by creating a custom class that implements the org.springframework.integration.jdbc.SqlParameterSourceFactory interface and referencing it through the adapter attribute sql-parameter-source-factory. The Spring configuration file for an example using the JDBC inbound channel adapter is shown in Listing 11–42.

Listing 11–42. Spring Configuration for the JDBC Inbound Channel Adapter jdbc-inbound-context.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
       xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    http://www.springframework.org/schema/integration/jdbc
    http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc-2.0.xsd
    http://www.springframework.org/schema/jdbc
    http://www.springframework.org/schema/jdbc/spring-jdbc-3.0.xsd">

  <context:component-scan base-package="com.apress.prospringintegration.jdbc"/>

  <int:channel id="target"/>

  <int-jdbc:inbound-channel-adapter channel="target"
                                    data-source="dataSource"
                                    query="select * from t where status = 0"
                                    update="update t set status = 1 where id in (:id)">
    <int:poller fixed-rate="1000">
      <int:transactional/>
    </int:poller>
  </int-jdbc:inbound-channel-adapter>

  <jdbc:embedded-database id="dataSource">
    <jdbc:script location="/spring/jdbc/hsqldb.sql"/>
  </jdbc:embedded-database>

  <int:service-activator input-channel="target" ref="jdbcMessageHandler"/>

</beans>

The example uses the embedded HyperSQL database and table discussed previously. Using the namespace support, the JDBC inbound channel adapter is configured to select all rows from table t where the status is 0, and then update the status to 1 so that the next poll will not pull the data again. The result set is then sent as a message payload to the message channel target.
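
The naming convention behind the :id parameter can be pictured with a short JDK-only sketch. It only illustrates the idea that each column of the selected rows becomes one list of values; it is not the adapter's implementation. The class and method names are hypothetical, and lowercasing the column names is an assumption made here so that ID lines up with :id.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

final class UpdateParameterSketch {

    // Collects each column's values across all selected rows into one list,
    // keyed by the lowercased column name (so "id" maps to the ids of every row).
    static Map<String, List<Object>> columnLists(List<Map<String, Object>> rows) {
        Map<String, List<Object>> parameters = new LinkedHashMap<>();
        for (Map<String, Object> row : rows) {
            for (Map.Entry<String, Object> column : row.entrySet()) {
                String name = column.getKey().toLowerCase(Locale.ROOT);
                List<Object> values = parameters.get(name);
                if (values == null) {
                    values = new ArrayList<>();
                    parameters.put(name, values);
                }
                values.add(column.getValue());
            }
        }
        return parameters;
    }

    // Builds one row shaped like the result of "select * from t".
    static Map<String, Object> row(int id, String first, String last, int status) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("ID", id);
        row.put("FIRSTNAME", first);
        row.put("LASTNAME", last);
        row.put("STATUS", status);
        return row;
    }

    // The two sample rows from hsqldb.sql, turned into per-column lists.
    static Map<String, List<Object>> demo() {
        List<Map<String, Object>> rows = new ArrayList<>();
        rows.add(row(1, "Felix", "the Cat", 0));
        rows.add(row(2, "Pink", "Panther", 0));
        return columnLists(rows);
    }

    public static void main(String[] args) {
        // The list that "where id in (:id)" would conceptually receive.
        System.out.println(demo().get("id"));
    }
}
```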

The inbound channel adapter is set to poll at a fixed rate of 1,000 ms (once every second). Note the addition of the transactional element to the poller. This will cause the update and select queries to occur in the same transaction. The default transaction manager is configured using a Java configuration file, as shown in Listing 11–43. It is a common use case for the downstream channel to be a direct channel so that the endpoints are in the same thread and thus in the same transaction.

Listing 11–43. Java Configuration for the Database Transaction Manager

package com.apress.prospringintegration.jdbc;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;


import javax.sql.DataSource;

@Configuration
public class JdbcConfiguration {

    @Value("#{dataSource}")
    private DataSource dataSource;

    @Bean
    public DataSourceTransactionManager transactionManager() {
        DataSourceTransactionManager transactionManager =
                new DataSourceTransactionManager();
        transactionManager.setDataSource(dataSource);
        return transactionManager;
    }
}

As shown in Listing 11–44, a service activator class is added that will log all the data in the result set of the query. This service activator is subscribed to the target message channel.

Listing 11–44. JDBC Example Message Handler

package com.apress.prospringintegration.jdbc;

import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

@Component
public class JdbcMessageHandler {

    @ServiceActivator
    public void handleJdbcMessage(List<Map<String, Object>> message) {
        for(Map<String, Object> resultMap: message) {
            System.out.println("Row");
            for(String column: resultMap.keySet()) {
                System.out.println("column: " + column + " value: " +
                        resultMap.get(column));
            }
        }
    }
}

To run the JDBC inbound channel adapter example, a main class is created, as shown in Listing 11–45. This class creates the Spring context, which starts the poller and kicks off the Spring Integration flow so that the adapter can be seen in action.

Listing 11–45. JDBC Inbound Channel Adapter Example main Class

package com.apress.prospringintegration.jdbc;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class JdbcInbound {

    public static void main(String[] args) throws Exception {
        ApplicationContext context =
                new ClassPathXmlApplicationContext("/spring/jdbc/jdbc-inbound-context.xml");
    }
}

The results of running the JDBC inbound channel adapter example are shown in Listing 11–46. Note that the query occurs before the update, so the status is equal to zero for all rows.

Listing 11–46. Results of Running the JDBC Inbound Channel Adapter Example

Row
column: ID value: 1
column: FIRSTNAME value: Felix
column: LASTNAME value: the Cat
column: STATUS value: 0
Row
column: ID value: 2
column: FIRSTNAME value: Pink
column: LASTNAME value: Panther
column: STATUS value: 0

JDBC Outbound Channel Adapter

The JDBC outbound channel adapter supports handling an incoming message and using it to execute an SQL query—usually an update or insert statement. The message payload and header are accessible by default as input parameters. Using a Map as the incoming message payload, the values can be obtained using the expression :payload[<key>], where <key> is the Map key. The headers are also available using a similar expression, :headers[<key>]. As with the inbound adapter, the parameter generation can be overridden by using a custom SqlParameterSourceFactory. This adapter also requires a reference to either a JdbcTemplate or DataSource. If the input channel is a direct channel, then the outbound adapter will be in the same transaction as the sender.
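
To show what the :payload[<key>] and :headers[<key>] expressions refer to, here is a simplified JDK-only sketch that resolves them against two maps and produces a positional statement plus its argument list. This is a stand-in for illustration, not how the adapter evaluates parameters internally; the class name, the regular expression, and the helper methods are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class NamedParameterSketch {

    // Matches :payload[key] and :headers[key]; group 1 is the source, group 2 the key.
    private static final Pattern PLACEHOLDER =
            Pattern.compile(":(payload|headers)\\[(\\w+)\\]");

    // Replaces every named placeholder with a positional "?" marker.
    static String toPositionalSql(String sql) {
        return PLACEHOLDER.matcher(sql).replaceAll("?");
    }

    // Looks up each placeholder, in order of appearance, in the payload or header map.
    static List<Object> arguments(String sql, Map<String, Object> payload,
                                  Map<String, Object> headers) {
        List<Object> args = new ArrayList<>();
        Matcher matcher = PLACEHOLDER.matcher(sql);
        while (matcher.find()) {
            Map<String, Object> source =
                    "payload".equals(matcher.group(1)) ? payload : headers;
            args.add(source.get(matcher.group(2)));
        }
        return args;
    }

    static final String INSERT =
            "insert into t (id, firstname, lastname, status) "
            + "values(:payload[id], :payload[firstname], "
            + ":payload[lastname], :payload[status])";

    // A Map payload like the one a sender would put into the message.
    static Map<String, Object> samplePayload() {
        Map<String, Object> payload = new HashMap<>();
        payload.put("id", 3);
        payload.put("firstname", "Mr");
        payload.put("lastname", "Bill");
        payload.put("status", 0);
        return payload;
    }

    static String demoSql() {
        return toPositionalSql(INSERT);
    }

    static List<Object> demoArguments() {
        return arguments(INSERT, samplePayload(), new HashMap<String, Object>());
    }

    public static void main(String[] args) {
        System.out.println(demoSql());
        System.out.println(demoArguments());
    }
}
```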

The Spring configuration file for an example using the JDBC outbound channel adapter is shown in Listing 11–47. This example uses the previous inbound adapter to monitor any inserts to the database table. The JDBC outbound channel adapter is configured to listen to the message channel input. Based on the Map payload message, this adapter will perform an insert into table t.

Listing 11–47. Spring Configuration for the JDBC Outbound Channel Adapter jdbc-outbound-context.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
       xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    http://www.springframework.org/schema/integration/jdbc
    http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc-2.0.xsd
    http://www.springframework.org/schema/jdbc
    http://www.springframework.org/schema/jdbc/spring-jdbc-3.0.xsd">

  <context:component-scan base-package="com.apress.prospringintegration.jdbc"/>

  <int:channel id="input"/>

  <int:channel id="target"/>

  <int-jdbc:outbound-channel-adapter channel="input"
                                     query="insert into t (id, firstname, lastname, status)
                                            values(:payload[id], :payload[firstname],
                                            :payload[lastname], :payload[status])"
                                     data-source="dataSource"/>

  <int-jdbc:inbound-channel-adapter channel="target"
                                    data-source="dataSource"
                                    query="select * from t where status = 0"
                                    update="update t set status = 1 where id in (:id)">
    <int:poller fixed-rate="1000">
      <int:transactional/>
    </int:poller>
  </int-jdbc:inbound-channel-adapter>

  <jdbc:embedded-database id="dataSource">
    <jdbc:script location="/spring/jdbc/hsqldb.sql"/>
  </jdbc:embedded-database>

  <int:service-activator input-channel="target" ref="jdbcMessageHandler"/>

</beans>

The main class for the JDBC outbound adapter is shown in Listing 11–48. This class creates a message with a Map payload containing the column values to insert. This will cause the outbound adapter to perform an insert into table t. The JDBC inbound adapter will then log the newly inserted row.

Listing 11–48. JDBC Outbound Channel Adapter Example main Class

package com.apress.prospringintegration.jdbc;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.support.MessageBuilder;

import java.util.HashMap;
import java.util.Map;


public class JdbcOutbound {
    public static void main(String[] args) throws Exception {
        ApplicationContext context =
                new ClassPathXmlApplicationContext(
                        "/spring/jdbc/jdbc-outbound-context.xml");

        MessageChannel input = context.getBean("input", MessageChannel.class);

        Map<String, Object> rowMessage = new HashMap<String, Object>();

        rowMessage.put("id", 3);
        rowMessage.put("firstname", "Mr");
        rowMessage.put("lastname", "Bill");
        rowMessage.put("status", 0);

        Message<Map<String, Object>> message =
                MessageBuilder.withPayload(rowMessage).build();
        input.send(message);

    }
}

The results of running the JDBC outbound channel adapter are shown in Listing 11–49. The first two rows are the preexisting data. The third row is from the insert done by the outbound adapter.

Listing 11–49. Results of Running the JDBC Outbound Channel Adapter Example

Row
column: ID value: 1
column: FIRSTNAME value: Felix
column: LASTNAME value: the Cat
column: STATUS value: 0
Row
column: ID value: 2
column: FIRSTNAME value: Pink
column: LASTNAME value: Panther
column: STATUS value: 0
Row
column: ID value: 3
column: FIRSTNAME value: Mr
column: LASTNAME value: Bill
column: STATUS value: 0

JDBC Outbound Gateway

The JDBC outbound gateway combines both an inbound and an outbound adapter. Similar to the JDBC outbound channel adapter, the gateway supports using the message payload and headers using the same conventions (i.e., :payload[<key>] and :headers[<key>]). As with the channel adapters, there is also an option to provide a custom SqlParameterSourceFactory for the request and reply. The gateway also requires a reference to a JdbcTemplate or DataSource.

There are three options for specifying what the reply message should contain:

  • By default, an insert statement will return a message to the output channel containing the number of rows affected as a Map: {UPDATED=1}.
  • Using the autogenerated keys option, the reply message will be populated with the generated key values. You can enable this by setting the attribute keys-generated to true.
  • The last option is to provide a query to execute and populate the reply message with the result set of the query.

The Spring configuration file for a JDBC outbound gateway example is shown in Listing 11–50. This example uses the third option, where a row is inserted and the reply message contains the result of a query for the newly added row. The id of the inserted row is passed to the query to select only that row.

Listing 11–50. Spring Configuration for the JDBC Outbound Gateway jdbc-gateway-context.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
       xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    http://www.springframework.org/schema/integration/jdbc
    http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc-2.0.xsd
    http://www.springframework.org/schema/jdbc
    http://www.springframework.org/schema/jdbc/spring-jdbc-3.0.xsd">

  <context:component-scan base-package="com.apress.prospringintegration.jdbc"/>

  <int:channel id="input"/>

  <int:channel id="output">
    <int:queue capacity="10"/>
  </int:channel>

  <int-jdbc:outbound-gateway request-channel="input"
                             reply-channel="output"
                             update="insert into t (id, firstname, lastname, status)
                                    values(:payload[id], :payload[firstname],
                                    :payload[lastname], :payload[status])"
                             query="select * from t where id = :payload[id]"
                             data-source="dataSource"/>

  <jdbc:embedded-database id="dataSource">
    <jdbc:script location="/spring/jdbc/hsqldb.sql"/>
  </jdbc:embedded-database>

</beans>

The main class for running the JDBC outbound gateway example is shown in Listing 11–51. The main class is similar to the outbound channel adapter, with the exception of additional code to receive the reply message.

Listing 11–51. JDBC Outbound Gateway Example main Class

package com.apress.prospringintegration.jdbc;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.core.PollableChannel;
import org.springframework.integration.support.MessageBuilder;

import java.util.HashMap;
import java.util.Map;

public class JdbcGateway {
    public static void main(String[] args) throws Exception {
        ApplicationContext context =
                new ClassPathXmlApplicationContext("/spring/jdbc/jdbc-gateway-context.xml");

        MessageChannel input = context.getBean("input", MessageChannel.class);
        PollableChannel output = context.getBean("output", PollableChannel.class);

        Map<String, Object> rowMessage = new HashMap<String, Object>();

        rowMessage.put("id", 3);
        rowMessage.put("firstname", "Mr");
        rowMessage.put("lastname", "Bill");
        rowMessage.put("status", 0);

        Message<Map<String, Object>> message =
                MessageBuilder.withPayload(rowMessage).build();
        input.send(message);

        Message<?> reply = output.receive();

        System.out.println("Reply message: " + reply);

        Map<String, Object> rowMap = (Map<String, Object>) reply.getPayload();

        for (String column : rowMap.keySet()) {
            System.out.println("column: " + column + " value: " + rowMap.get(column));
        }

    }
}

The results of running the JDBC outbound gateway example are shown in Listing 11–52. The reply message is printed first, followed by the column values of the newly inserted row taken from the reply payload.

Listing 11–52. Results of Running the JDBC Outbound Gateway Example

Reply message: [Payload={ID=3, FIRSTNAME=Mr, LASTNAME=Bill, STATUS=0}][Headers={timestamp=1296538850203, id=9ff84cd9-38ec-4533-8f43-3faedb213d44}]
column: ID value: 3
column: FIRSTNAME value: Mr
column: LASTNAME value: Bill
column: STATUS value: 0

Summary

There are many means of communication and storage, including storing data as files, communicating with other computers over the network using TCP and UDP, interacting with users using input and output streaming, transferring files using FTP/FTPS and SFTP, and storing and retrieving information held within a database. Spring Integration supports these different talk-to-the-metal endpoints with message sources and handlers for files, socket-level communication using TCP and UDP, input and output streaming, file transfer adapters, and JDBC adapters. These adapters are essential to integrating with the rest of the world.
