Combining repository operations

Now, let's implement a slightly more complicated business use case. We may want to update the publication year of a book when we only have the book's title. So, first, we have to find the desired book instance, then update the year of publishing, and save the book to the database. To make our use case even more complicated, let's assume that both the title and year values are retrieved asynchronously with some delay and are delivered by the Mono type. Also, we want to know whether our update request was successful or not. For now, we do not require the update to be atomic, and we assume that there is never more than one book with the same title. With those requirements, we may design the following business method API:

public Mono<Book> updatedBookYearByTitle(                            // (1)
                    Mono<String> title,                              // (2)
                    Mono<Integer> newPublishingYear)                 // (3)

The preceding code does the following:

  1. The updatedBookYearByTitle method returns an updated book entity (or nothing, if no book is found).
  2. The title value is referenced via the Mono<String> type.
  3. The new publishing year value is referenced via the Mono<Integer> type.

We may now create a testing scenario to check how our implementation of updatedBookYearByTitle works:

Instant start = now();                                             // (1)
Mono<String> title = Mono.delay(Duration.ofSeconds(1))             // (2)
   .thenReturn("Artemis")                                          //
   .doOnSubscribe(s -> log.info("Subscribed for title"))           //
   .doOnNext(t ->                                                  //
      log.info("Book title resolved: {}" , t));                    // (2.1)

Mono<Integer> publishingYear = Mono.delay(Duration.ofSeconds(2))   // (3)
   .thenReturn(2017)                                               //
   .doOnSubscribe(s -> log.info("Subscribed for publishing year")) //
   .doOnNext(t ->                                                  //
      log.info("New publishing year resolved: {}" , t));           // (3.1)

updatedBookYearByTitle(title, publishingYear)                      // (4)
   .doOnNext(b ->
      log.info("Publishing year updated for book: {}", b))         // (4.1)
   .hasElement()                                                   // (4.2)
   .doOnSuccess(status ->
      log.info("Updated finished {}, took: {}",                    // (5)
           status ? "successfully" : "unsuccessfully",
           between(start, now())))                                 // (5.1)
   .subscribe();                                                   // (6)

The preceding code does the following:

  1. Tracks running time, storing the start time of the test.
  2. Resolves the title with a simulated delay of one second, logging it as soon as the value is ready (2.1).
  3. Resolves the new publishing year value with a simulated delay of two seconds, logging it as soon as the value is ready (3.1).
  4. Calls our business method, logging the update notification when it arrives, if any (4.1). To check for the presence of onNext events (meaning the book was actually updated), the Mono.hasElement method, which returns Mono<Boolean>, is called (4.2).
  5. When the stream completes, the code logs whether the update was successful and reports the total execution time (5.1).
  6. As always, someone has to subscribe in order to start the reactive workflow.

From the previous code, we may conclude that the workflow cannot run faster than two seconds, as this is the time required to resolve the publishing year. However, it may run for longer. Let's do the first iteration of the implementation:

private Mono<Book> updatedBookYearByTitle(            /* First Iteration */
   Mono<String> title,
   Mono<Integer> newPublishingYear
) {
   return rxBookRepository.findOneByTitle(title)                     // (1)
      .flatMap(book -> newPublishingYear                             // (2)
         .flatMap(year -> {                                          // (3)
            book.setPublishingYear(year);                            // (4)
            return rxBookRepository.save(book);                      // (5)
         }));
}

With this approach, right at the start of the method, we call the repository with the provided reactive reference to the title (1). As soon as the Book entity is found, we subscribe to the new publishing year value (2). Then, as soon as the new publishing year value arrives (3), we update the Book entity (4) and call the save method of the repository (5). This code produces the following output:

Subscribed for title
Book title resolved: Artemis
Subscribed for publishing year
New publishing year resolved: 2017
Publishing year updated for book: Book(publishingYear=2017...
Updated finished successfully, took: PT3.027S

So, the book was updated, but as we can see from the logs, we subscribed to the new publishing year only after receiving the title, so in total the method spent more than three seconds calculating the result. We can do better. We have to subscribe to both streams at the beginning of the workflow in order to start concurrent retrieval processes. The following code depicts how to do this using the zip method:

private Mono<Book> updatedBookYearByTitle(           /* Second Iteration */
   Mono<String> title,
   Mono<Integer> newPublishingYear
) {
   return Mono.zip(title, newPublishingYear)                       // (1)
      .flatMap((Tuple2<String, Integer> data) -> {                 // (2)
         String titleVal = data.getT1();                           // (2.1)
         Integer yearVal = data.getT2();                           // (2.2)
         return rxBookRepository
            .findOneByTitle(Mono.just(titleVal))                   // (3)
            .flatMap(book -> {
               book.setPublishingYear(yearVal);                    // (3.1)
               return rxBookRepository.save(book);                 // (3.2)
            });
      });
}

Here, we zip two values and subscribe to them at the same time (1). As soon as both values are ready, our stream receives a Tuple2<String, Integer> container with the values of interest (2). Now we have to unpack values (2.1) and (2.2) using the data.getT1() and data.getT2() calls. At point (3), we query the Book entity, and as soon as it arrives, we update the publishing year and save the entity to the database. After the second iteration, our application shows the following output:

Subscribed for title
Subscribed for publishing year
Book title resolved: Artemis
New publishing year resolved: 2017
Publishing year updated for book: Book(publishingYear=2017...
Updated finished successfully, took: PT2.032S

Now we can see that we subscribe to both streams up front, and once both values have arrived, we update the book entity. With the second approach, the operation takes roughly two seconds instead of three. It is faster, but it requires working with the Tuple2 type, which costs extra lines of code for unpacking the values. To improve readability and to remove the getT1() and getT2() calls, we may add the Reactor Addons module, which provides some syntactic sugar for such cases.

With the following new dependency, we can improve the previous code sample:

compile('io.projectreactor.addons:reactor-extra')

This is how we can improve it:

private Mono<Book> updatedBookYearByTitle(            /* Third Iteration */
   Mono<String> title,
   Mono<Integer> newPublishingYear
) {
   return Mono.zip(title, newPublishingYear)
      .flatMap(
         TupleUtils.function((titleValue, yearValue) ->              // (1)
            rxBookRepository
               .findOneByTitle(Mono.just(titleValue))                // (2)
               .flatMap(book -> {
                  book.setPublishingYear(yearValue);
                  return rxBookRepository.save(book);
               })));
}

Here, at point (1), we replace the manual deconstruction of the Tuple2 object with the function method from the TupleUtils class and work with already deconstructed values. Because the function method is static, it can be statically imported, and the resulting code is fluent and concise:

return Mono.zip(title, newPublishingYear)
    .flatMap(function((titleValue, yearValue) -> { ... }));
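To make the idea behind such an adapter concrete, here is a minimal plain-JDK sketch. It is not the reactor-extra source: the TupleAdapterSketch class and its Pair type are hypothetical stand-ins for TupleUtils and Tuple2, introduced only to show how a two-argument lambda can be wrapped into a one-argument function that unpacks a tuple.

```java
import java.util.function.BiFunction;
import java.util.function.Function;

/** Hypothetical stand-in for a tuple-unpacking adapter; not the reactor-extra implementation. */
public class TupleAdapterSketch {

    /** Minimal two-element tuple, standing in for Reactor's Tuple2 (an assumption for this sketch). */
    static final class Pair<A, B> {
        final A t1;
        final B t2;

        Pair(A t1, B t2) {
            this.t1 = t1;
            this.t2 = t2;
        }
    }

    /** Wraps a two-argument function into a function that takes a Pair and unpacks it. */
    static <A, B, R> Function<Pair<A, B>, R> function(BiFunction<A, B, R> fn) {
        return pair -> fn.apply(pair.t1, pair.t2);
    }

    public static void main(String[] args) {
        // The lambda works with the unpacked values; no t1/t2 access is needed at the call site.
        Function<Pair<String, Integer>, String> describe =
            function((title, year) -> title + " (" + year + ")");

        System.out.println(describe.apply(new Pair<>("Artemis", 2017)));
    }
}
```

The adapter only moves the unpacking out of the lambda body, so the behavior of the pipeline stays the same while the call site becomes shorter.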

Furthermore, at point (2), we take titleValue and wrap it again into the Mono object. We could use the original title object that already has the correct type, but in that case, we would subscribe twice for the title stream and would receive the following output. Note that we trigger the title resolving code twice:

Subscribed for title
Subscribed for publishing year
Book title resolved: Artemis
New publishing year resolved: 2017
Subscribed for title
Book title resolved: Artemis
Publishing year updated for book: Book(publishingYear=2017...
Updated finished successfully, took: PT3.029S
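The effect of reusing a lazy source can be imitated with plain JDK types. The following sketch is only a rough analogy, not Reactor code: a Supplier plays the role of the cold title Mono, each get() call plays the role of a subscription, and the ResubscriptionSketch class with its findBook and countResolutions helpers is hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical analogy for subscribing twice to a cold source; not Reactor code. */
public class ResubscriptionSketch {

    /** Stand-in for a repository lookup that resolves its title argument lazily. */
    static String findBook(Supplier<String> titleSource) {
        return "Book(title=" + titleSource.get() + ")";
    }

    /** Runs the workflow once and returns how many times the title was resolved. */
    static int countResolutions(boolean reuseSource) {
        AtomicInteger resolutions = new AtomicInteger();

        // Lazy source: every get() re-runs the resolution logic.
        Supplier<String> title = () -> {
            resolutions.incrementAndGet();
            return "Artemis";
        };

        // First resolution, comparable to zip subscribing to the title.
        String resolved = title.get();

        // Either hand over the lazy source again, or wrap the value that is already resolved
        // (the latter is comparable to Mono.just(titleValue)).
        Supplier<String> lookupArgument = reuseSource ? title : () -> resolved;
        findBook(lookupArgument);

        return resolutions.get();
    }

    public static void main(String[] args) {
        System.out.println("Reusing the source: " + countResolutions(true) + " resolutions");
        System.out.println("Reusing the value:  " + countResolutions(false) + " resolutions");
    }
}
```

Passing the lazy source a second time resolves the title twice, while wrapping the already resolved value resolves it only once.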

One more point is that in the third iteration we issue the database request to load the book only after receiving both the title and the new publishing year values. However, we could start loading the book entity as soon as the title value is present, while the publishing year request is still in flight. The fourth iteration shows how to build this reactive workflow:

private Mono<Book> updatedBookYearByTitle(            /* Fourth Iteration */
   Mono<String> title,
   Mono<Integer> newPublishingYear
) {
   return Mono.zip(                                                // (1)
      newPublishingYear,                                           // (1.1)
      rxBookRepository.findOneByTitle(title)                       // (1.2)
   ).flatMap(function((yearValue, bookValue) -> {                  // (2)
      bookValue.setPublishingYear(yearValue);                      //
      return rxBookRepository.save(bookValue);                     // (2.1)
   }));
}

Here, with the zip operator (1) we subscribe to the new publishing year values (1.1) and the book entity (1.2) at the same time. When both values arrive (2), we update the publishing year of the entity and request the book saving procedure (2.1). Moreover, as in all previous iterations of this business use case, even though the workflow takes at least two seconds to complete, no threads are ever blocked. Consequently, this code uses computation resources very efficiently.
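The latency differences between the iterations can be checked with some simple arithmetic. The sketch below is a plain-JDK model, not Reactor code: the LatencyModel class, its method names, and the lookup and save durations are assumptions chosen for illustration, while the one-second title delay and two-second year delay come from the test scenario.

```java
import java.time.Duration;

/** Hypothetical latency model for the iterations of the use case; values are illustrative. */
public class LatencyModel {

    /** First iteration: title, lookup, year, and save happen strictly one after another. */
    static Duration sequential(Duration title, Duration year, Duration lookup, Duration save) {
        return title.plus(lookup).plus(year).plus(save);
    }

    /** Second and third iterations: title and year resolve concurrently, then lookup, then save. */
    static Duration zipThenLookup(Duration title, Duration year, Duration lookup, Duration save) {
        return max(title, year).plus(lookup).plus(save);
    }

    /** Fourth iteration: the lookup starts once the title arrives, while the year is in flight. */
    static Duration lookupWhileYearInFlight(
            Duration title, Duration year, Duration lookup, Duration save) {
        return max(title.plus(lookup), year).plus(save);
    }

    static Duration max(Duration a, Duration b) {
        return a.compareTo(b) >= 0 ? a : b;
    }

    public static void main(String[] args) {
        Duration title = Duration.ofSeconds(1);   // from the test scenario
        Duration year = Duration.ofSeconds(2);    // from the test scenario
        Duration lookup = Duration.ofMillis(300); // assumed database lookup time
        Duration save = Duration.ofMillis(100);   // assumed database save time

        System.out.println("Sequential:          " + sequential(title, year, lookup, save));
        System.out.println("Zip, then lookup:    " + zipThenLookup(title, year, lookup, save));
        System.out.println("Lookup during year:  "
            + lookupWhileYearInFlight(title, year, lookup, save));
    }
}
```

With these assumed values, the model gives 3.4 seconds for the sequential flow, 2.4 seconds when the lookup starts after zip, and 2.1 seconds when the lookup overlaps with the year request. The slower the lookup is, the more the last variant gains, as long as the title plus the lookup finish before the year arrives.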

The point of this exercise is to demonstrate that with Reactive Streams' capabilities and the versatility of the Project Reactor API, it is easy to build different asynchronous workflows involving even the data persistence layer. With just a few reactive operators, we may completely change the way data flows through the system. However, not all such alternatives of reactive flows are equal. Some may run faster while others may run slower, and in many cases, the most obvious solution is not the most appropriate one. So, when writing reactive pipelines, please consider alternative combinations of reactive operators and do not choose the first that comes to mind but the option that is best for the business request.
