WebFlux in action

In order to learn how we can use WebFlux in real scenarios, we are going to build a simple web application that connects to the remote Gitter Streams API using WebClient, transforms data using the Project Reactor API, and then broadcasts the transformed messages to the world using SSE. The following diagram shows a schematic representation of the system:

Diagram 6.26. The schematic design of the streaming application

The preceding diagram can be described as follows:

  1. This is the point of integration with the Gitter API. As we can see from the preceding diagram, the communication between our server and Gitter is a streaming one. Hence, reactive programming fits naturally there.
  2. This is the point in the system where we need to process incoming messages and transform them into a different view.
  3. This is the point where we are caching received messages and broadcasting them to each connected client.
  4. This is a representation of the connected browsers.

As we can see, the system has four central components in it. In order to build this system, we are going to create the following classes and interfaces:

  • ChatService: This is the interface responsible for wiring communication with a remote server. It provides the ability to listen to messages from that server.
  • GitterService: This is the implementation of the ChatService interface that connects to the Gitter streaming API in order to listen to new messages.
  • InfoResource: This is the handler class that handles user requests and responds with a stream of messages.

The first step toward implementation of the system is to analyze the ChatService interface. The following sample shows the required method:

interface ChatService<T> {

   Flux<T> getMessagesStream();

   Mono<List<T>> getMessagesAfter(String messageId);
}

The preceding example interface covers the minimal required functionality related to message reading and listening. Here, the getMessagesStream method returns an infinite stream of new messages in the chat, whereas getMessagesAfter allows us to retrieve the list of messages posted after the message with a particular ID.
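Before wiring up the real Gitter client, it may help to see the contract satisfied by a trivial in-memory implementation. The following is a toy sketch invented for illustration (the name InMemoryChatService and the sample data are not from the chapter; the real implementation, GitterService, talks to the Gitter API instead), assuming reactor-core is on the classpath:

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// the contract from the chapter, repeated here so the sketch is self-contained
interface ChatService<T> {
    Flux<T> getMessagesStream();
    Mono<List<T>> getMessagesAfter(String messageId);
}

// hypothetical in-memory stand-in, invented for illustration only
public class InMemoryChatService implements ChatService<String> {

    @Override
    public Flux<String> getMessagesStream() {
        // a real implementation would return an infinite stream of live messages
        return Flux.just("live-1", "live-2");
    }

    @Override
    public Mono<List<String>> getMessagesAfter(String messageId) {
        // this toy version ignores the id and returns a fixed history
        return Mono.just(List.of("hello", "world"));
    }

    public static void main(String[] args) {
        ChatService<String> service = new InMemoryChatService();
        System.out.println(service.getMessagesAfter(null).block()); // [hello, world]
    }
}
```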

In both cases, Gitter provides access to its messages over HTTP. That means that we can use a plain WebClient. The following is an example of how we can implement getMessagesAfter and access the remote server:

Mono<List<MessageResponse>> getMessagesAfter(
      String messageId
) {
   ...
   return webClient                                                 // (1)
      .get()                                                        // (2)
      .uri(...)                                                     // (3)
      .retrieve()                                                   // (4)
      .bodyToMono(                                                  // (5)
         new ParameterizedTypeReference<List<MessageResponse>>() {}
      )
      .timeout(Duration.ofSeconds(1))                               // (6)
      .retryBackoff(Long.MAX_VALUE, Duration.ofMillis(500));        // (7)
}

The preceding code sample shows how we can organize plain request-response interaction with the Gitter service. Here, at point (1), we use a WebClient instance to execute a GET HTTP method call (2) against the remote Gitter server (3). We then retrieve the response at point (4) and, using the WebClient DSL, convert the body to a Mono of a List of MessageResponse at point (5). Finally, to make communication with the external service resilient, we apply a timeout to the call at point (6) and, in the case of an error, retry the call at point (7).
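As a side note, retryBackoff is the Reactor 3.2-era operator; in recent Reactor releases the same behavior is expressed as retryWhen(Retry.backoff(...)). The timeout-plus-backoff pattern from points (6) and (7) can be observed in isolation with plain reactor-core. The following is a toy simulation, not Gitter code: flakyCall and its failure counts are invented for illustration.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public class ResilienceDemo {

    // hypothetical flaky remote call: fails the first two times, then succeeds
    static Mono<String> flakyCall(AtomicInteger attempts) {
        return Mono.defer(() -> attempts.incrementAndGet() <= 2
            ? Mono.error(new RuntimeException("connection reset"))
            : Mono.just("ok"));
    }

    static String callWithResilience() {
        AtomicInteger attempts = new AtomicInteger();
        return flakyCall(attempts)
            .timeout(Duration.ofSeconds(1))                      // fail fast if the call hangs
            .retryWhen(Retry.backoff(3, Duration.ofMillis(10)))  // retry with growing delays
            .block();
    }

    public static void main(String[] args) {
        System.out.println(callWithResilience()); // ok
    }
}
```

The first two failures are absorbed by the retry policy, so the subscriber only ever sees the successful result.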

Communicating with the streaming Gitter API is just as simple. The following shows how we can connect to the JSON streaming (application/stream+json) endpoint of the Gitter server:

public Flux<MessageResponse> getMessagesStream() {
   return webClient
      .get()                                                        // (1)
      .uri(...)
      .retrieve()
      .bodyToFlux(MessageResponse.class)                            // (2)
      .retryBackoff(Long.MAX_VALUE, Duration.ofMillis(500));
}

As we can see in the preceding code, we use the same API as previously, as shown at point (1). The only changes that we have made are in the URI, which is hidden, and the fact that we are mapping to Flux, instead of Mono, as shown at point (2). Under the hood, WebClient uses the Decoder available in the container. If we have an infinite stream, this allows us to convert the elements on the fly, without waiting for the end of the stream.

Finally, in order to combine both streams into one and cache them, we may implement the following code, which provides an implementation for the InfoResource handler:

@RestController                                                     // (1)
@RequestMapping("/api/v1/info")
public class InfoResource {

   final ReplayProcessor<MessageVM> messagesStream                  // (2)
      = ReplayProcessor.create(50);

   public InfoResource(                                             // (3)
      ChatService<MessageResponse> chatService
   ) {
      Flux.mergeSequential(                                         // (3.1)
            chatService.getMessagesAfter(null)                      // (3.2)
               .flatMapIterable(Function.identity()),
            chatService.getMessagesStream()                         // (3.3)
         )
         .map(...)                                                  // (3.4)
         .subscribe(messagesStream);                                // (3.5)
   }

   @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)        // (4)
   public Flux<MessageVM> stream() {
      return messagesStream;                                        // (4.1)
   }
}

The preceding code can be explained as follows:

  1. This is the declaration of the class annotated with @RestController.
  2. This is the ReplayProcessor field declaration. As we may remember from Chapter 4, Project Reactor - the Foundation for Reactive Apps, ReplayProcessor allows us to cache a predefined number of elements and replay the latest elements to each new subscriber.
  3. Here, we have the declaration of the constructor of the InfoResource class. Within the constructor, we build a processing flow that merges the stream of the latest messages from Gitter (shown at points 3.1 and 3.2). In the case of a null ID, Gitter returns the 30 latest messages. The processing flow also listens to the stream of new messages in near real-time, as shown at point (3.3). Then, all messages are mapped to the view model, as shown at point (3.4), and the stream is immediately subscribed to by the ReplayProcessor, as shown at point (3.5). This means that once the InfoResource bean has been constructed, we connect to the Gitter service, cache the latest messages, and start listening for updates. Note that mergeSequential subscribes to both streams simultaneously, but relays messages from the second stream only once the first stream has completed. Since the first stream is finite, we first receive the latest messages and then start sending the messages queued up from the getMessagesStream Flux.
  4. This is a handler method declaration, which is invoked on each new connection to the specified endpoint. Here, we may just return the ReplayProcessor instance, as shown at point (4.1), so that it shares the latest cached messages and sends new messages once they are available.
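The merge-then-cache behavior described at points 3.1 to 3.5 can be sketched in isolation with plain reactor-core. The sketch below uses Sinks.many().replay().limit(...), the modern replacement for the now-deprecated ReplayProcessor, and the data values are invented for illustration:

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class ReplayCacheDemo {

    // returns what a late subscriber sees after merging history + live
    // streams and caching the last 3 elements
    static List<String> cachedTail() {
        Flux<String> history = Flux.just("h1", "h2", "h3"); // finite: the latest messages
        Flux<String> live = Flux.just("l1", "l2");          // stand-in for the live stream

        // replay sink with a bounded history, analogous to ReplayProcessor.create(3)
        Sinks.Many<String> cache = Sinks.many().replay().limit(3);

        Flux.mergeSequential(history, live)                 // history first, then live
            .subscribe(cache::tryEmitNext, cache::tryEmitError, cache::tryEmitComplete);

        // a subscriber arriving after all emissions still receives the
        // last 3 cached elements
        return cache.asFlux().collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(cachedTail()); // [h3, l1, l2]
    }
}
```

Even though both sources are subscribed to eagerly, mergeSequential preserves the order history-then-live, and the replay sink hands the cached tail to every new subscriber, which is exactly what lets each newly connected browser catch up on recent messages.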

As we can see in the preceding example, providing complex functionality such as merging streams in the proper order, or caching the latest 50 messages and dynamically broadcasting them to all subscribers, does not require a lot of effort or written code. Reactor and WebFlux cover the hardest parts and allow us just to write business logic. This enables efficient non-blocking interaction with I/O. Therefore, we may achieve a high-throughput and low-latency system by using this powerful toolkit.

 
