In the previous chapter, I showed you how Spring Cloud Stream Application Starters work as stand-alone apps that can deliver an enterprise-ready solution with ease. I showed you the Spring Cloud Stream application model and how its core is based on Spring Integration and Spring Boot for easy configuration. You saw how to use Spring Cloud Function and Spring Integration to create streams very easily. I also showed you a feature (the best, in my opinion) called a binder, which allows your solution to use any physical destination or any middleware messaging broker. This chapter covers how to create a custom binder.
You will use the NATS server (https://nats.io), a broker for building distributed applications that supports real-time streaming and big data use cases (see Figure 7-1). Are you wondering why I chose NATS over other technologies? I used to work on Cloud Foundry (www.cloudfoundry.org) projects, and one of the main components used to keep some of the VMs alive is NATS, a fast and reliable messaging broker that is easy to use. As an experiment, I decided to create a prototype binder implementation. It took me a few hours. Yes, it is very easy to create a custom binder.
Binder
A binder uses the Service Provider Interface (SPI) pattern, which allows you to extend or add extra functionality to your system by enabling features or replacing components. This pattern has been around since the first iterations of the Java programming language and adds plugin functionality.
Spring Cloud Stream exposes several interfaces and abstract and utility classes; it also provides discovery strategies that allow you to plug in external middleware. These interfaces and classes help you create binders very easily. A typical scenario features a producer and a consumer using a binder to produce and consume messages. The binder takes care of the connection, retries, sessions, or anything else that allows the producer and consumer to use the broker without knowing how it is done. It hides boilerplate coding and avoids the need to learn a specific API.
org.springframework.cloud.stream.binder.Binder Interface
bindConsumer. This method’s first parameter is the destination name, which internally creates the necessary channels and whatever object the broker needs, such as a queue, topic, and so forth. The next parameter is the group in which the consumers accept messages (worker style or publish/subscribe patterns). The third parameter is the destination/channel instance where the consumer listens/subscribes to new incoming messages. The fourth parameter holds the broker-specific and business properties that belong to the message.
bindProducer. This method’s first parameter is the destination name, which creates the necessary channels and whatever object the broker needs, like a topic, exchange, and so forth. The next parameter is the destination/channel instance where the producer sends the messages. The last parameter holds the broker-specific and business properties.
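The shape of these two methods can be illustrated with a minimal, plain-Java sketch. Everything here is a hypothetical stand-in: SketchChannel, SketchBinding, SketchBinder, and InMemoryBinder are not Spring Cloud Stream types, a Map replaces the typed consumer and producer properties, and the "broker" is just an in-memory map. It only shows how a binder connects an application channel to a named destination.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical stand-in for a message channel: handlers receive whatever is sent. */
class SketchChannel {
    private final List<Consumer<String>> handlers = new CopyOnWriteArrayList<>();
    void subscribe(Consumer<String> handler) { handlers.add(handler); }
    void send(String payload) { handlers.forEach(h -> h.accept(payload)); }
}

/** Handle returned by a bind call; unbind() undoes the wiring. */
interface SketchBinding {
    String name();
    void unbind();
}

/** Mirrors the two bind methods described above (simplified; not the real interface). */
interface SketchBinder<T> {
    SketchBinding bindConsumer(String name, String group, T inboundTarget, Map<String, Object> properties);
    SketchBinding bindProducer(String name, T outboundTarget, Map<String, Object> properties);
}

/** Toy binder: the "broker" is a map from destination name to the inbound channels bound to it. */
class InMemoryBinder implements SketchBinder<SketchChannel> {
    private final Map<String, List<SketchChannel>> broker = new ConcurrentHashMap<>();

    @Override
    public SketchBinding bindConsumer(String name, String group, SketchChannel inboundTarget,
                                      Map<String, Object> properties) {
        // The group is accepted but ignored in this sketch.
        List<SketchChannel> targets = broker.computeIfAbsent(name, k -> new CopyOnWriteArrayList<>());
        targets.add(inboundTarget);
        return binding(name, () -> targets.remove(inboundTarget));
    }

    @Override
    public SketchBinding bindProducer(String name, SketchChannel outboundTarget,
                                      Map<String, Object> properties) {
        // Everything sent to the outbound channel is forwarded to the named destination.
        boolean[] active = {true};
        outboundTarget.subscribe(payload -> {
            if (active[0]) {
                broker.getOrDefault(name, List.of()).forEach(ch -> ch.send(payload));
            }
        });
        return binding(name, () -> active[0] = false);
    }

    private static SketchBinding binding(String name, Runnable onUnbind) {
        return new SketchBinding() {
            @Override public String name() { return name; }
            @Override public void unbind() { onUnbind.run(); }
        };
    }
}

class BinderSketchDemo {
    public static void main(String[] args) {
        InMemoryBinder binder = new InMemoryBinder();
        SketchChannel output = new SketchChannel();
        SketchChannel input = new SketchChannel();
        input.subscribe(p -> System.out.println("received: " + p));
        binder.bindConsumer("movie", "demo-group", input, Map.of());
        binder.bindProducer("movie", output, Map.of());
        output.send("The Matrix"); // delivered to the input channel through the destination
    }
}
```

The application code only touches the two channels; the destination name is the single thing the producer and consumer sides have in common.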
Implementing a Binder
A class must implement the Binder interface.
A @Configuration marked class defines a binder bean and the steps for creating the middleware broker infrastructure; it may be a connection, or session, some credentials, and so forth.
It is necessary to create a META-INF/spring.binders file that is found in the classpath and contains one or more binder definitions.
As you can see, it is very simple to implement a binder, so let’s start creating a custom binder using the NATS broker.
NATS Binder
Creating a custom binder helps developers speed up development; as a binder developer, you need to know how this broker works.
Before you start implementing the binder, I think it is necessary to create a library that allows you to produce and consume messages so you can reuse it later. At the end of the chapter, you will have a nats-binder project with three modules: nats-messaging (NATS client), nats-messaging-binder (NATS binder implementation), and nats-messaging-test (NATS binder test).
This command downloads a 10 MB image.
Project: nats-binder
nats-binder/pom.xml
Analyze pom.xml and the dependencies.
Next, create the modules, and because you are using Spring Initializr, you can uncompress the ZIP file in the nats-binder folder.
NATS Client: nats-messaging
Group: com.apress.nats
Artifact: nats-messaging
Package: com.apress.nats
Dependencies: Lombok
Next, let’s add the dependencies needed to use the NATS server. One of the benefits of using open source technology is that it is open to the community. This case requires the NATS Java client (https://nats.io/download/nats-io/nats.java/ and https://github.com/nats-io/nats.java).
nats-binder/nats-messaging/pom.xml
Take a look at pom.xml and note that the parent is declaring the nats-binder main project. Remember, the nats-messaging library is a module. Review it, and let’s continue.
src/main/java/com/apress/nats/NatsProperties.java
Listing 7-4 shows the NatsProperties class; as you can see, it is very simple and has default values. Remember that you can override these properties in the application.properties/yml file, command line or environment variables, and so forth.
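A properties holder of this kind can be sketched in plain Java as follows. This is not the book's listing: the class name NatsPropertiesSketch and the toUrl() helper are hypothetical, and the localhost default is an assumption (4222 is the usual NATS client port). In a Spring Boot module, such a class would typically be bound to a prefix with @ConfigurationProperties.

```java
/**
 * Plain-Java sketch of a properties holder with default values.
 * Hypothetical names; in a Spring Boot module the fields would be bound
 * from configuration instead of being set by hand.
 */
class NatsPropertiesSketch {
    private String host = "localhost"; // assumed default
    private int port = 4222;           // usual NATS client port

    String getHost() { return host; }
    void setHost(String host) { this.host = host; }
    int getPort() { return port; }
    void setPort(int port) { this.port = port; }

    /** Hypothetical helper: builds a URL of the form nats://host:port. */
    String toUrl() { return "nats://" + host + ":" + port; }
}

class NatsPropertiesSketchDemo {
    public static void main(String[] args) {
        NatsPropertiesSketch props = new NatsPropertiesSketch();
        System.out.println(props.toUrl());   // defaults
        props.setHost("nats.example.test");  // overriding a default
        System.out.println(props.toUrl());
    }
}
```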
src/main/java/com/apress/nats/NatsConnection.java
Listing 7-5 shows the NatsConnection class. This class holds the NATS Connection instance. Here you are calling NatsProperties to use the default values or those provided by the developer through the spring.nats.* properties. As you can see, the connect method of the Nats class is static. You can call it by passing the scheme (nats://), the host, and the port, which is a very simple way to connect to the NATS server.
src/main/java/com/apress/nats/NatsTemplate.java
Listing 7-6 shows the NatsTemplate class. This class removes all the boilerplate and provides all the operations that deal with the NATS server. This is an implementation of the template design pattern; if you are using the Spring Framework, you can find several of these, including JmsTemplate, RabbitTemplate, KafkaTemplate, and JdbcTemplate.
You are declaring only two overloaded methods, both of which receive the subject (similar to a topic) and the message. You are using the org.springframework.messaging.Message interface. Also note that you need the NatsConnection instance. To publish a message, you use the connection (with the getConnection() method call) and invoke the publish method. In the send(String subject, Message<?> message) method, you use Spring’s serialization utilities to serialize your message into a byte array. The NATS protocol requires that messages be of the byte[] type.
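The serialization step can be illustrated with the JDK alone. The following is a minimal sketch, assuming standard Java serialization is acceptable for the payload; PayloadCodec is a hypothetical helper, not a class from the nats-messaging module, and it stands in for the Spring utility mentioned above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical helper that turns a serializable object into byte[] and back. */
class PayloadCodec {

    /** Serializes the value with standard Java serialization. */
    static byte[] serialize(Serializable value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush(); // push buffered data into the byte array stream
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Restores the object written by serialize(). */
    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}

class PayloadCodecDemo {
    public static void main(String[] args) {
        byte[] wire = PayloadCodec.serialize("Star Wars");
        System.out.println(wire.length + " bytes on the wire");
        System.out.println(PayloadCodec.deserialize(wire));
    }
}
```

Java serialization keeps the sketch short, but both sides must share the same classes; a format such as JSON is a common alternative when consumers are not Java applications.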
src/main/java/com/apress/nats/NatsMessageListener.java
Listing 7-7 shows the NatsMessageListener interface, which has the onMessage method with a byte[] type as a parameter.
Next, let’s create at least one implementation to delegate the listener. This class subscribes to the subject (the same as a topic) in the NATS server.
src/main/java/com/apress/nats/NatsMessageListenerAdapter.java
Listing 7-8 shows the NatsMessageListenerAdapter class that implements NatsMessageListener. Analyze this class before continuing. In the Java NATS client, there are two ways to get messages: synchronous and asynchronous. You are implementing the asynchronous way. To use it, you need to create a Dispatcher instance (based on the connection) and subscribe to the subject (the same as a topic). When you need to remove the subscription, you only need to call the unsubscribe method from the Dispatcher instance.
The code uses the @Log4j2 annotation from Lombok to inject the logger. Normally, you would not handle logging like this; you would use AOP to treat logging as a cross-cutting concern.
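The subscribe, deliver, and unsubscribe cycle of the asynchronous style can be sketched without a broker. SketchDispatcher below is a hypothetical in-memory stand-in and is not the NATS client API (as far as I know, the real client attaches the handler when the dispatcher is created from the connection); ByteListener and ListenerAdapterSketch are likewise illustrative names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Listener contract with the same shape as the one described in the text: raw bytes in. */
interface ByteListener {
    void onMessage(byte[] message);
}

/** Hypothetical in-memory dispatcher; it only mimics subscribe/unsubscribe by subject. */
class SketchDispatcher {
    private final Map<String, ByteListener> subscriptions = new ConcurrentHashMap<>();

    void subscribe(String subject, ByteListener listener) { subscriptions.put(subject, listener); }

    void unsubscribe(String subject) { subscriptions.remove(subject); }

    /** Simulates the broker pushing a message for a subject. */
    void deliver(String subject, byte[] data) {
        ByteListener listener = subscriptions.get(subject);
        if (listener != null) {
            listener.onMessage(data);
        }
    }
}

/** Adapter that owns the subscription lifecycle and delegates every message. */
class ListenerAdapterSketch implements ByteListener {
    private final SketchDispatcher dispatcher;
    private final String subject;
    private final ByteListener delegate;

    ListenerAdapterSketch(SketchDispatcher dispatcher, String subject, ByteListener delegate) {
        this.dispatcher = dispatcher;
        this.subject = subject;
        this.delegate = delegate;
    }

    void start() { dispatcher.subscribe(subject, this); }

    void stop() { dispatcher.unsubscribe(subject); }

    @Override
    public void onMessage(byte[] message) { delegate.onMessage(message); }
}

class ListenerAdapterDemo {
    public static void main(String[] args) {
        SketchDispatcher dispatcher = new SketchDispatcher();
        ListenerAdapterSketch adapter = new ListenerAdapterSketch(dispatcher, "movie",
                bytes -> System.out.println("got: " + new String(bytes, StandardCharsets.UTF_8)));
        adapter.start();
        dispatcher.deliver("movie", "Inception".getBytes(StandardCharsets.UTF_8));
        adapter.stop();
        dispatcher.deliver("movie", "ignored".getBytes(StandardCharsets.UTF_8)); // no subscriber now
    }
}
```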
src/main/java/com/apress/nats/NatsConfiguration.java
Listing 7-9 shows the NatsConfiguration class, which creates the NatsConnection and NatsTemplate Spring beans. Note that you are using the @ConditionalOnMissingBean annotation, which is useful when another class that uses this library creates its own beans with different implementations or values, so you avoid having several beans of the same type.
src/main/java/com/apress/nats/NatsProducerConsumer.java
Stop your app.
Before continuing, comment out all the code of the NatsProducerConsumer.java class.
NATS Binder Implementation: nats-messaging-binder
Group: com.apress.nats
Artifact: nats-messaging-binder
Package: com.apress.nats
Dependencies: Lombok
nats-binder/nats-messaging-binder/pom.xml
Listing 7-11 shows the pom.xml file for the nats-messaging-binder module. Note that you are declaring the nats-messaging module as a dependency.
Next, let’s follow the steps to create a new binder.
Implement the Binder Interface
If you review the Binder interface, you see that you need several classes before you can implement it (see Listing 7-1). One of the parameters that you need to pass is the inbound or outbound binding target for the consumer and producer methods, respectively. You can create all the logic for that and follow the practices for creating the different types of channels, messaging support, message converters, and so forth, but that takes too long. What if you rely on some of the abstract implementations that already take care of the underlying infrastructure for channels and so forth?
org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.java
src/main/java/com/apress/nats/NatsMessageBinderProvisioningProvider.java
src/main/java/com/apress/nats/NatsMessageBinderDestination.java
Listing 7-14 shows the NatsMessageBinderDestination that implements both the ProducerDestination and the ConsumerDestination interfaces. The ProducerDestination interface declares getName() and getNameForPartition, and the ConsumerDestination interface declares getName(). This creates the destination and all the wiring for the underlying channel and integration infrastructure. Note that you are not implementing the partition feature for now.
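A destination that serves both roles can be sketched in a few lines. The two interfaces below are local, hypothetical stand-ins that only copy the method names mentioned above; they are not the Spring Cloud Stream provisioning interfaces, and DestinationSketch is not the book's class.

```java
/** Local stand-in with the producer-side methods named in the text. */
interface ProducerDestinationSketch {
    String getName();
    String getNameForPartition(int partition);
}

/** Local stand-in with the consumer-side method named in the text. */
interface ConsumerDestinationSketch {
    String getName();
}

/** One class can back both roles because they share getName(). */
final class DestinationSketch implements ProducerDestinationSketch, ConsumerDestinationSketch {
    private final String name;

    DestinationSketch(String name) {
        this.name = name.trim();
    }

    @Override
    public String getName() {
        return name;
    }

    @Override
    public String getNameForPartition(int partition) {
        // Partitioning is not implemented in this sketch, so every partition maps to the same name.
        return name;
    }
}

class DestinationSketchDemo {
    public static void main(String[] args) {
        DestinationSketch destination = new DestinationSketch(" movie ");
        System.out.println(destination.getName());
        System.out.println(destination.getNameForPartition(3));
    }
}
```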
src/main/java/com/apress/nats/NatsMessageBinderProducer.java
Listing 7-15 shows the NatsMessageBinderProducer class, which extends the MessageProducerSupport class. The only methods that you override are doStart() and doStop(). Let the base class logic handle the rest. In this class, you need to set up the listener that is connected to the NATS server. Look at the constructor, where you need a NatsConnection instance. You start listening when the underlying bootstrap calls the doStart() method. When you receive the message, use the sendMessage method, passing the message deserialized into a Message<?> type, which is a wrapper containing the headers and the payload.
src/main/java/com/apress/nats/NatsMessageBinder.java
Listing 7-16 shows the NatsMessageBinder class, our main binder implementation. Take a look at the constructor, where it is necessary to call the base class (AbstractMessageChannelBinder) passing the headers of the message (e.g., custom headers or the broker-related headers), the ProvisioningProvider (NatsMessageBinderProvisioningProvider class), and the NatsTemplate that sends the messages.
We override createProducerMessageHandler, which returns a MessageHandler that has the message to send to the NATS server. That’s why the NatsTemplate instance is used with the destination name and the message. We also override createConsumerEndpoint, which returns an instance of the NatsMessageBinderProducer class. Remember that this class starts the listener for incoming messages.
Create the @Configuration Beans
src/main/java/com/apress/nats/NatsMessageBinderConfiguration.java
Listing 7-17 shows the NatsMessageBinderConfiguration class. This class imports the NatsConfiguration class that contains NatsTemplate and NatsConnection (see Listing 7-9). Here we are defining the natsMessageBinderProvisioningProvider and natsMessageBinder beans that Spring needs to wire up everything for the binder to work. In the natsMessageBinder method, we return a new instance of the NatsMessageBinder class, which takes several parameters. For now, you are passing null for the headers. You can deal with them later.
Create the META-INF/spring.binders
src/main/resources/META-INF/spring.binders
Listing 7-18 shows the spring.binders file, which is required for the autoconfiguration to work. This means that if you add this module as a dependency, Spring Cloud Stream uses spring.binders to find the listed @Configuration classes and executes the autoconfiguration logic to set up the binder or any other configuration.
Note that you are naming this binder nats, which is important when using multiple binders in one stream, which is discussed in later sections.
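The file uses the Java properties format: the key is the binder name and the value lists one or more configuration classes. As a rough illustration of that mapping (not the framework's own loading code), the sketch below parses such content with java.util.Properties. SpringBindersSketch is a hypothetical helper; the entry mirrors the binder name and configuration class used in this chapter.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

/** Hypothetical helper that reads binder-name -> configuration-class entries. */
class SpringBindersSketch {

    static Map<String, List<String>> parse(String content) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(content));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, List<String>> binders = new LinkedHashMap<>();
        for (String binderName : props.stringPropertyNames()) {
            // A value may list several classes separated by commas.
            List<String> classes = Arrays.stream(props.getProperty(binderName).split(","))
                    .map(String::trim)
                    .filter(s -> !s.isEmpty())
                    .collect(Collectors.toList());
            binders.put(binderName, classes);
        }
        return binders;
    }

    public static void main(String[] args) {
        // Same text as a file containing:
        //   nats:\
        //     com.apress.nats.NatsMessageBinderConfiguration
        String content = "nats:\\\n  com.apress.nats.NatsMessageBinderConfiguration\n";
        System.out.println(parse(content));
        // prints {nats=[com.apress.nats.NatsMessageBinderConfiguration]}
    }
}
```

The key (nats here) is the name you later reference when selecting a binder for a specific binding.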
NATS Binder Test
Now that you have the nats-messaging and the nats-messaging-binder modules, it’s time to test them. Of course, there are specialized test classes for that, but I want to show you how easy it is to use this binder and save the unit/integration testing for later.
Group: com.apress.nats
Artifact: nats-messaging-test
Package: com.apress.nats
nats-messaging-test/pom.xml
Listing 7-19 shows pom.xml. Note that you are only using nats-messaging-binder and no other dependency because nats-messaging-binder provides everything that you need, including the nats-messaging module.
src/main/java/com/apress/nats/NatsStream.java
Listing 7-20 shows the NatsStream class. Take a look at the @EnableBinding declaration. We are using the 2.x programming style with the Source and Sink interfaces. To produce messages, you use the Spring Integration @InboundChannelAdapter annotation. The underlying implementation of this adapter contains polling logic that calls the method every 10 seconds, based on the fixedRate parameter of the @Poller annotation. Note that this adapter uses the output channel to send a message; in this case, a GenericMessage<> type (with a String payload). If you are curious, this is the same logic used by the Time Source Application Starter, which uses @InboundChannelAdapter to send a message every T seconds.
The @StreamListener annotation marks a method that receives all the incoming messages from the input channel.
Next, let’s help the binder name the channels/destinations and connect them. If this isn’t done, input and output channels/destinations are created but won’t be connected.
src/main/resources/application.properties
Listing 7-21 shows application.properties. Note that we are using the 2.x programming model, and the naming convention is based on input/output channels/destinations. So, the input and output channels must have the same name to produce (source) and consume (sink); in this case, you are calling the channels/destinations movie. Note that you use the debug logging level to get insight into what is happening.
Also look at the beginning of the logs, where you have the debug information from the other classes. You should see the Creating Message Listener and Subscribed to: movie messages.
Congratulations! You have created a NATS server binder. Now you can use it everywhere you need to use NATS without worrying about any API.
Don’t forget to stop your NATS server.
You can find all the source code in the ch07/nats-binder folder in this book’s companion code.
Multiple Binders
It was fun to create a NATS binder, right? Now, let’s look at a particular requirement: needing more than one binder. So far, you have used either Rabbit or Kafka, but never both together, nor multiple Rabbit brokers, nor Rabbit for a source and Kafka for a processor.
movie-file-source-nats
Group: com.apress.cloud.stream
Artifact: movie-file-source-nats
Package: com.apress.cloud.stream.movie
Dependencies: Lombok
src/main/java/com/apress/cloud/stream/movie/Movie.java
src/main/java/com/apress/cloud/stream/movie/MovieStreamProperties.java
src/main/java/com/apress/cloud/stream/movie/MovieStream.java
src/main/resources/application.properties
Note that you are renaming the output destination to movie.
That’s it for this stream. I know that the code seems repetitive, but it helps you understand the concept better. How can you do this using reactive programming? You need to make a small change. First, you can return Publisher<Message<Movie>> in fileFlow, and instead of calling the get() method (to get the IntegrationFlow instance), use toReactivePublisher(). Second, you need to create a supplier. Remember that you need to subscribe to the publisher, so you declare a Supplier method for that. And third, you need to use the spring.cloud.stream.bindings.[supplier-method-name]-out-0.destination property.
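The binding name in that property follows the functional naming convention: the function name, then -in- or -out-, then an index. The small helper below only spells out that string convention; FunctionBindingNames and the movieSupplier function name are hypothetical and exist purely for illustration.

```java
/** Hypothetical helper that spells out the functional binding naming convention. */
class FunctionBindingNames {

    /** Output binding name for a function, for example movieSupplier-out-0. */
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    /** Input binding name for a function, for example movieFilter-in-0. */
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    /** Full property key that sets the destination of a binding. */
    static String destinationProperty(String bindingName) {
        return "spring.cloud.stream.bindings." + bindingName + ".destination";
    }

    public static void main(String[] args) {
        // "movieSupplier" is an assumed method name, not one taken from the chapter's code.
        System.out.println(destinationProperty(output("movieSupplier", 0)) + "=movie");
    }
}
```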
You can find all the source code in the ch07/multiple folder. The reactive version is commented out there.
movie-filter-processor-nats-rabbit
Group: com.apress.cloud.stream
Artifact: movie-filter-processor-nats-rabbit
Package: com.apress.cloud.stream.movie
Dependencies: Cloud Stream, Lombok
Press the Generate button to download a ZIP file. Uncompress it and import it into your favorite IDE. Note the package name.
src/main/java/com/apress/cloud/stream/movie/MovieStream.java
Here you are still using the version 2.x model, in which you need to declare @EnableBinding with its type; in this case, Processor, with its input and output channels. Also note that you are using the Spring Integration @Filter annotation on a method that returns a boolean depending on the evaluated expression. In this case, you are evaluating whether the genre is DRAMA; if so, the message passes. Also note that the @Filter annotation requires two parameters: inputChannel and outputChannel; in this case, they are Processor members.
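Stripped of the messaging annotations, the filter is just a boolean test applied to each movie. The following plain-Java sketch shows that logic with a Predicate; MovieSketch and its two fields are assumptions made for the example and may not match the Movie class used in the project.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Hypothetical movie type; the real Movie class may have different fields. */
final class MovieSketch {
    final String title;
    final String genre;

    MovieSketch(String title, String genre) {
        this.title = title;
        this.genre = genre;
    }
}

class GenreFilterSketch {
    /** Passes a movie only when its genre is exactly DRAMA (null-safe). */
    static final Predicate<MovieSketch> IS_DRAMA = movie -> "DRAMA".equals(movie.genre);

    /** Applies the predicate the way a filter endpoint would: keep matches, drop the rest. */
    static List<MovieSketch> filter(List<MovieSketch> incoming) {
        return incoming.stream().filter(IS_DRAMA).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<MovieSketch> incoming = List.of(
                new MovieSketch("The Godfather", "DRAMA"),
                new MovieSketch("Airplane!", "COMEDY"));
        filter(incoming).forEach(movie -> System.out.println(movie.title));
    }
}
```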
You can find the reactive version in the source code for this project.
src/main/resources/application.yaml
In this case, you are using YAML because it is more legible than the properties format. Note that spring.cloud.stream.bindings.input.binder is set to nats (this name comes from META-INF/spring.binders in the nats-messaging-binder module), and spring.cloud.stream.bindings.input.destination is set to movie. The output binding sets binder to rabbit and destination to log.
If you enable the reactive part from the MovieStream class, you need to use the naming convention for bindings. You can find this code commented out in the application.yaml file.
movie-log-sink-rabbit
Group: com.apress.cloud.stream
Artifact: movie-log-sink-rabbit
Package: com.apress.cloud.stream.movie
Dependencies: Cloud Stream, Lombok
You can press the Generate button to download a ZIP file. Uncompress it and import it into your favorite IDE. Note the package name.
src/main/java/com/apress/cloud/stream/movie/MovieStream.java
src/main/resources/META-INF/spring/movie-log.xml
Listing 7-29 shows the legacy XML Spring Integration. I think that any legacy Spring system that still uses the XML approach can be modernized to Spring Boot very easily because you can reuse your XML and take advantage of the performance (this topic is for another time).
As you can see, you are using the json-to-object-transformer component (because the data from RabbitMQ is of the application/json type) just to convert the payload to an object and log its toString() output to the console. Note that the transformer sets the input-channel attribute to input (the name of the binding) and the output-channel attribute to logger, which is the logging-channel-adapter component’s ID.
Running Them All Together
docker-compose.yml
Congrats! You used multiple brokers!
As you can see, it is very easy to add multiple binders of the same type.
Extra Configuration
Even though I discussed configuration in the previous chapter, there are other properties worth mentioning. Check out the org.springframework.cloud.stream.config.BindingServiceProperties class and its Javadoc for more information on these properties.
Summary
In this chapter, I showed you how to create a custom binder by following three steps: implement the Binder interface, add the configuration that creates the binder, and finally, add the binder configuration to the spring.binders file. Spring Boot autoconfiguration uses this file when it finds it in the classpath.
You created a NATS binder and used it alongside other binders in a multiple-binder setup. This chapter used different ways to create streams: version 2.x, where you need to declare the @EnableBinding annotation with its type (source, processor, or sink); version 3.x, where you can use functional and reactive programming; and code based on older Spring Integration annotations and legacy XML.
In the next chapter, I discuss Spring Cloud Data Flow and how Spring Cloud Stream and the binder technology fit into our solutions.