Reactive Spring Data in action

To finish this chapter and highlight the benefits of the reactive persistence, let's create a data-intensive reactive application that has to communicate with a database frequently. For example, let's revisit the example from Chapter 6WebFlux Async Non-Blocking Communication. There, we implemented an alternative read-only web frontend application for the Gitter service (https://gitter.im). The application connects to a predefined chat room and re-streams all the messages to all connected users through Server-Sent Events (SSE). Now, with new requirements, our application has to collect statistics about the most active and the most referenced users in the chat room. Our chat application may use MongoDB to store messages and user profiles. This information may also be used for statistic recalculation purposes. The following diagram depicts the application design:

Diagram 7.18 The chat application with reactive Spring Data for MongoDB access

In the preceding diagram, the numbered points are as follows:

  1. This is a Gitter server that may stream messages from the particular chat room through SSE. This is an external system from which our application receives all data.
  2. This is the UI Service, which is a part of our application. That component streams messages from Gitter and recent statistics to the clients, which are web applications running in browsers. The UI Service uses the WebFlux module for data streaming via SSE in a reactive fashion.
  3. This is the Chat Service, which uses a reactive WebClient to listen for incoming messages from the Gitter server. Received messages are broadcast to WebClient through the UI Service and also streamed to the Statistics Service.
  4. The Statistics Service continually tracks the most active and the most mentioned users. The statistics are constantly streamed to the web clients via the UI Service.
  5. The User Repository is a reactive repository that communicates with MongoDB to store and retrieve information about chat participants. It is built with the Spring Data MongoDB Reactive module.
  6. The Message Repository is a reactive repository that allows storing and searching for chat messages. It is also built with Spring Data MongoDB Reactive.
  7. We choose MongoDB to be our storage as it fits the application needs and also has a reactive driver and good reactive support in Spring Data.

So, the flow of data in the application is constant and does not require any blocking calls. Here, the Chat Service broadcasts messages through the UI Service, the Statistics Service receives chat messages, and after recalculating the statistic, it sends messages to the UI Service with statistics. The WebFlux module is responsible for all network communications, and Spring Data makes it possible to plug MongoDB interactions without breaking the reactive flows. Here, we omit most of the implementation details. However, let's look at the Statistics Service. This may look as follows:

public class StatisticService {
   private static final UserVM EMPTY_USER = new UserVM("", "");

   private final UserRepository userRepository;                   // (1)
   private final MessageRepository messageRepository;             //

   // Constructor...

   public Flux<UsersStatistic> updateStatistic(                  // (2)
      Flux<ChatMessage> messagesFlux                             // (2.1)
   ) {
      return messagesFlux
         .map(MessageMapper::toDomainUnit)                       // (2.2)
         .transform(messageRepository::saveAll)                  // (2.3)
         .retryBackoff(Long.MAX_VALUE, Duration.ofMillis(500))   // (2.4)
         .onBackpressureLatest()                                 // (2.5)
         .concatMap(e -> this.doGetUserStatistic(), 1)           // (2.6)
         .errorStrategyContinue((t, e) -> {});                   // (2.7)
   }

   private Mono<UsersStatistic> doGetUserStatistic() {           // (3)
      Mono<UserVM> topActiveUserMono = userRepository
         .findMostActive()                                       // (3.1)
         .map(UserMapper::toViewModelUnits)                      // (3.2)
         .defaultIfEmpty(EMPTY_USER);                            // (3.3)

      Mono<UserVM> topMentionedUserMono = userRepository
         .findMostPopular()                                      // (3.4)
         .map(UserMapper::toViewModelUnits)                      // (3.5)
         .defaultIfEmpty(EMPTY_USER);                            // (3.6)

      return Mono.zip(                                           // (3.7)
          topActiveUserMono,
          topMentionedUserMono,
          UsersStatistic::new
        ).timeout(Duration.ofSeconds(2));                        // (3.8)
    }
}

Let's analyze the implementation of the StatisticService class:

  1. The StatisticService class has references to the UserRepository and MessageRepository, which provide reactive communication with MongoDB collections.
  2. The updateStatistic method streams statistic events that are represented by the UsersStatistic view-model objects. At the same time, the method requires an incoming stream of chat messages represented by the messagesFlux method argument (2.1). The method subscribes to a Flux of ChatMessage objects, transforming them into the desired representation (2.2), and saving them to MongoDB with messageRepository (2.3). The retryBackoff operator helps to overcome potential MongoDB communication issues (2.4). Also, if a subscriber cannot handle all the events, we drop old messages (2.5). By applying the concatMap operator, we start the process of statistic revaluation by calling the doGetUserStatistic method inside (2.6).

We use concatMap for this, as it guarantees the correct order of statistic results. This is because the operator waits for the inner sub-stream to complete before generating the next sub-stream. Also, in the statistics recalculation, we ignore all errors by applying the errorStrategyContinue operator (2.7) because this part of the application is not critical and tolerates some temporary issues.

  1. The doGetUserStatistic helper method calculates the top users. To calculate the most active user, we call the findMostActive method on the userRepository (3.1), map the result to the correct type (3.2), and, in a case in which no user is found, we return the predefined EMPTY_USER (3.3). Similarly, to get the most popular user, we call the findMostPopular method on the repository (3.4), map the result (3.5), and set the default value if required (3.6). The Mono.zip operator helps to merge those two reactive requests and produce a new instance of the UsersStatistic class. The timeout operator sets the maximum time budget available for statistics recalculation.

With this elegant code, we have easily blended the incoming stream of messages originating from a WebClient object, the outgoing stream of SSE events handled by the WebFlux module. Naturally, we also included MongoDB query processing into our reactive pipeline through reactive Spring Data. Moreover, we have not blocked any threads anywhere in this pipeline. As a result, our application utilizes server resources very efficiently.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.218.31.165