Spring Cloud Streams as a bridge to Spring Ecosystem

All of the previously mentioned solutions are competitive with one other and have their advantages, such as wins in low-latency or better guarantees of message delivery or persistence.

Nonetheless, this book is about the reactive possibilities in the Spring Ecosystem, so it would be valuable to get an understanding of what Spring offers for a painless integration with message brokers.

One of the powerful ways of building robust message-driven systems using Spring Cloud is through Spring Cloud Streams. Spring Cloud Streams provides a simplified programming model for async cross-service messaging. In turn, the Spring Cloud Streams module is built on top of the Spring Integration and Spring Message modules, which are the fundamental abstractions over proper integration with external services and straightforward asynchronous messaging. Moreover, Spring Cloud Streams give the ability to build an elastic application without having to deal with over-complicated configurations and without needing an in-depth knowledge of a particular message broker.

Unfortunately, only a few message brokers have the appropriate support in Spring Framework. At the moment of writing, Spring Cloud Streams only provides integration with RabbitMQ and Apache Kafka.

To get an understanding of the fundamentals of building reactive systems with Spring Cloud Streams, we are going to upgrade the reactive chat application that we built in Chapter 7Reactive Database Access, to a new reactive Spring Cloud Stack.

First of all, we are going to start by recapping on the design of our application, which consists of three conceptual parts. The first part is a connector service called ChatService. In our case, this is the implementation of communication with the Gitter service, which is a server-sent events stream. In turn, that stream of messages is shared between ChatControllerwhich is responsible for transferring those messages directly to final users, and StatisticServicewhich is responsible for storing messages in the database and recalculating statistics based on changes. Previously, all three parts consisted of one monolith application. Consequently, each component in the system was connected through the use of a Spring Framework Dependency Injection. Moreover, asynchronous, non-blocking messaging was supported with Reactor 3 reactive types. The first thing that we need to understand is whether Spring Cloud Streams allows decomposing a monolith application into microservices while allowing to use reactive types for communication between components.

Fortunately, starting with Spring Cloud 2, there is direct support for communication via Reactor types. Previously, location transparency might relate to the loose-coupling of components within the monolith application. Using the Inversion of Control (IoC), each component can access the component interface without any knowledge of the implementation. In the Cloud Ecosystem, along with knowing access interface, we should know the domain name (component name) or, in our case, the name of the dedicated queue. As a replacement for communication over interfaces, Spring Cloud Streams provide two conceptual annotations for wiring communication between services.

To learn more about Location Transparency, please see the following link: http://wiki.c2.com/?LocationTransparency.

The first conceptual annotation is the @Output annotation. This annotation defines the queue name to which the message should be delivered. The second conceptual annotation is the @Input annotation which defines the queue from which messages should listen. This method of interaction between services might replace our interfaces, so instead of invocating the method, we may rely on sending messages to the particular queue. Let's consider the changes that we have to apply to our application in order to allow sending messages to the message broker:

@SpringBootApplication                                             // (1)
@EnableBinding(Source.class) // (1.1)
@EnableConfigurationProperties(...) //
/* @Service */ // (1.2)
public class GitterService //
implements ChatService<MessageResponse> { //

... // (2)

@StreamEmitter // (3)
@Output(Source.OUTPUT) // (3.1)
public Flux<MessageResponse> getMessagesStream() { ... } //

@StreamEmitter // (4)
@Output(Source.OUTPUT) //
public Flux<MessageResponse> getLatestMessages() { ... } //

public static void main(String[] args) { // (5)
SpringApplication.run(GitterService.class, args); //
} //
} //
Note that, along with the actual implementation, the preceding code shows the code differences. Here, /* Commented Text */ refers to removed code-lines, and Bold Underlined Text means a new one. A non-styled code means that nothing has changed there.

In the preceding code, the numbered points mean the following:

  1. This is the SpringBootApplication declaration. At point (1.1)we define Spring Cloud Streams as an @EnableBinding annotation that enables underlying integration with the streaming infrastructure (for instance, integration with Apache Kafka). In turn, we removed the @Service annotation (1.2), since we migrated from the monolith application to a distributed one. Now we can run that component as a small independent application and enable better scaling in that way.
  2. This is the list of fields and constructors, which are left unchanged.
  3. This refers to the messages' Flux declaration. That method returns an infinitive stream of messages from Gitter. Here, the key-role plays @StreamEmitter because it establishes that the given source is a reactive source. Consequently, to define a destination channel, @Output is used here, accepting the name of the channel. Note that the destination channel's name must be in the list of the bound channel at line (1.1).
  1. Here, getLatestMessages returns a finite stream of the latest Gitter messages and sends them to the destination channel.
  2. This refers to the main method declaration, which is used to bootstrap a Spring Boot application.

As may be noticed from the example, there have been no significant changes made from the business logic perspective. In turn, a great deal of infrastructural logic was added here just by applying a few Spring Cloud Streams annotations. First of all, by using @SpringBootApplication, we defined our small service as a separate Spring Boot application. By applying @Output(Source.OUTPUT), we defined the name of the destination queue in a message broker.

Finally, @EnableBinding, @StreamEmitter means that our application is bound to a message broker and the getMessagesStream() and getLatestMessages() methods are invoked at the application's start point.

Тo learn more about @StreamEmitter and its restrictions, please see the following link: https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_reactive_sources. Also, to get a basic understanding of Spring Cloud Stream's annotation model, please see this link: https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_programming_model.

In addition to Java annotations, we should provide the configuration of Spring Cloud Stream's bindings. This may be done by providing Spring Application properties such as the following (application.yaml, in that case):

spring.cloud.stream:                                               //
bindings: //
output: // (1)
destination: Messages // (2)
producer: // (3)
requiredGroups: statistic, ui // (4)

In the previous example code, at point (1), we specified the bindings' key, which is the name of the channel defined in Source.OUTPUT. By doing so, we may access org.springframework.cloud.stream.config.BindingProperties and configure the name of the destination in the message broker (2). Along with that, we may configure how our producer should behave (3). For example, we may configure a list of recipients that should receive messages with an at least once delivery guarantee (4).

By running the previous code as a separate application, we may see that the dedicated queue inside the message broker starts receiving messages. On the other hand, as we might remember from Chapter 7Reactive Database Access, our chat application has two central message consumers: a controller layer and a statistic service. As the second step in the modification of the system, we are going to update the statistic service. In our application, the statistic service is a bit more than a plain consumer; it is responsible for updating statistics based on database changes and then sending it to the controller layer. This means that that service is Processor, since it plays the role of Source and Sink at the same time. Hence, we have to provide an ability to consume messages from the message broker as well as sending it to the message broker. Consider the following code:

@SpringBootApplication                                             // (1)
@EnableBinding(Processor.class) //
/* @Service */ //
public class DefaultStatisticService //
implements StatisticService { //

... // (2)

@StreamListener // (3)
@Output(Processor.OUTPUT) //
public Flux<UsersStatisticVM> updateStatistic( //
@Input(Processor.INPUT) Flux<MessageResponse> messagesFlux // (3.1)
) { ... } //

... // (2)

public static void main(String[] args) { // (4)
SpringApplication.run(DefaultStatisticService.class, args); //
} //
}

Each section of the numbered code is explained as follows:

  1. This is the @SpringBootApplication declaration. Here, as in the previous example, we replaced @Service with the @EnableBinding annotation. In contrast to the configuration of the GitterService component, we use the Processor interface which declares that the StatisticService component consumes data from the message broker as well as sends them to the message broker.
  2. This is the part of the code that is left unchanged.
  1. This is the processor's method declaration. Here, the updateStatistic method accepts a Flux, which provides access to the incoming message from the message broker's channel. We have to explicitly define that the given method listens to the message broker by providing an @StreamListener annotation along with the @Input annotation declaration.
  2. This is the main method declaration, which is used to bootstrap the Spring Boot application.

As may be noticed, we use Spring Cloud Streams annotations just to mark that the input Flux and output Flux are streams from/to the defined queues. In that example, @StreamListener allows the name of the virtual queue (from/to which messages are consumed/sent) to correspond to the name defined in the @Input/@Output annotations while pre-configured interfaces are bound in the @EnableBinding annotation. As seen in the preceding example, along with producers configuration, we may configure the declared input and output in the same generic way using the same application's properties (YAML configuration, in this case):

spring.cloud.stream:                                               //
bindings: //
input: // (1)
destination: Messages //
group: statistic // (2)
output: //
producer: //
requiredGroups: ui //
destination: Statistic //

Spring Cloud Stream provides flexibility in the configuration of communication with the message broker. Here, at point (1) we define input which is, in fact, a consumer's configuration. Additionally (2), we have to define the name of the group which represents the name of the group of recipients in the message broker.

To learn more about the available configurations for Spring Cloud Stream, please visit the following link https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_configuration_options.

Finally, after the preparation of emitters we have to update our InfoResource component in the following way:

@RestController                                                    // (1)
@RequestMapping("/api/v1/info") //
@EnableBinding({MessagesSource.class, StatisticSource.class}) // (1.1)
@SpringBootApplication //
public class InfoResource { //

... // (2)

/* public InfoResource( // (3)
ChatService<MessageResponse> chatService, //
StatisticService statisticService //
) { */ // (3.1)
@StreamListener //
public void listen( //
@Input(MessagesSource.INPUT) Flux<MessageResponse> messages, //
@Input(StatisticSource.INPUT)
//
Flux<UsersStatisticVM> statistic
//
) {
//

/* Flux.mergeSequential( // (4)
chatService.getMessagesAfter("") //
.flatMapIterable(Function.identity()), //
chatService.getMessagesStream() //
) //
.publish(flux -> Flux.merge( ... */ //

messages.map(MessageMapper::toViewModelUnit) // (5)
.subscribeWith(messagesStream); //
statistic.subscribeWith(statisticStream); //

/* )) // (4)
.subscribe(); */ //
} //

... // (2)

public static void main(String[] args) { // (6)
SpringApplication.run(InfoResource.class, args); //
} //
} //

In the preceding code, the numbered points mean the following:

  1. This is the @SpringBootAppliction definition. As we may have noticed, @EnableBinding accepts two custom bindable interfaces here with the separate input channel's configurations for statistics and messages.
  2. This is the part of the code that is left unchanged.
  3. This is the .listen method declaration. As we can see, the constructor that accepted two interfaces now accepts Fluxes annotated by the @Input annotation.
  4. This is the modified logic. Here, we do not need a manual stream merging and sharing anymore since we have moved that responsibility to the message broker.
  5. This is the point in which we subscribe to the given streams of statistics and messages. At that point, all incoming messages are cached into ReplayProcessor. Note that the mentioned cache is local and to achieve better scalability, the distributed one may be employed instead.
  6. This is the main method declaration, which is used to bootstrap the Spring Boot application.
To learn more about possible integration with distributed caches such as Hazelcast, please see the available extension in the Reactor-Addons module: https://github.com/reactor/reactor-addons/tree/master/reactor-extra/src/main/java/reactor/cache.

Here, we are listening to two separate queues. Again, employing a message broker gives us transparent decoupling from both GitterService and StatisticService. In turn, when we deal with Spring Cloud Stream, we have to remember that the @StreamListener annotation is only applicable to methods. Consequently, we have to hack that element by applying @StreamListener on top of the void method, which is invoked when a connection with the message broker has been wired.

To get a better understanding of the internals of custom bindable interfaces, let's consider the following code:

interface MessagesSource {                                         //
String INPUT = "messages"; // (1)
//
@Input(INPUT) // (2)
SubscribableChannel input(); // (3)
} //

interface StatisticSource { //
String INPUT = "statistic"; // (1)
//
@Input(INPUT) // (2)
SubscribableChannel input(); // (3)
} //

Each section of the numbered code is explained here:

  1. This is the String constant that represents the name of the bound channel.
  2. This is the @Input annotation that declares that the annotated method provides MessageChannel, through which incoming messages enter the application.
  3. This is the method that indicates the type of MessageChannel. In the case of the bindable interface for the message consumer, we have to provide SubscribableChannel, which extends MessageChannel with two additional methods for asynchronous message listening.

Identically to the previous cases, we have to provide similar properties in the local application.yaml:

spring.cloud.stream:                                               
bindings:
statistic:
destination: Statistic
group: ui
messages:
destination: Messages
group: ui

By including all of the puzzles in the diagram, we get the following architecture of the system:

Diagram 8.9. Example of a distributed chat application

In the preceding diagram, the numbered points mean the following:

  1. This is the representation of GitterService. As it may be noticed, GitterService is tightly coupled with Gitter API and Message Broker (2) (Apache Kafka in this case). However, there is no direct dependency on the external services.
  2. This is the message broker representation. Here, we have two virtual queues inside. Note that the mentioned representation does not expose specific configurations such as replication and partitioning.
  3. At this point, the message broker demultiplexes messages to the UI Service (InfoResource) and StatisticService.
  4. This is the StatisticService representation. As it might be noticed, the service listens to incoming messages for the message broker, stores them in MongoDB, does some statistic aggregations, and produces an update result.
  5. Finally, both queues are consumed by the UI Service, which demultiplexes all messages in turn to all subscribed clients.
  6. This is the web-browser representation. Here, the clients of the UI Service are web-browsers, which receive all updates through the HTTP connection.

As we may have noticed from the preceding diagram, our application is fully decoupled at the component level. For example, GitterService, StatisticService, and the UI service run as a separate application (which may be run on separate machines) and send messages to the message broker. In addition, the Spring Cloud Streams module supports Project Reactor and its programming model which means it follows the Reactive Streams specification and enables backpressure control, hence, offers higher application's resilience. With such a setup, each service can scale independently. Therefore, we may achieve a reactive system, which is the primary goal of moving to Spring Cloud Streams.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.15.189.199