The flatMap, concatMap, and flatMapSequential operators

Of course, Project Reactor could not omit the implementation of the flatMap operator as it is a crucial transformation in functional programming itself.

The flatMap operator logically consists of  two operations—map and flatten (in terms of Reactor, flatten is similar to the merge operator). The map part of the flatMap operator transforms each incoming element into a Reactive Stream (T -> Flux<R>), and the flatten part merges all generated reactive sequences into a new reactive sequence, through which it passes elements of type R

The following marble diagram may help us grasp the idea:

Diagram 4.7. Operator: flatMap

In the preceding diagram, for each circle(n), we generate square(n) and then triangle(n). All such subsequences are merged into one downstream. 

Project Reactor provides a handful of different variants of the flatMap operator. Besides overrides, the library also gives the flatMapSequential operator and the concatMap operator. These three operators differ in a few dimensions, namely:

  • Whether the operator is eagerly subscribing to its inner streams (the flatMap and flatMapSequential operators subscribe eagerly, the concatMap waits for each inner completion before generating the next sub-stream and subscribing to it)
  • Whether the operator preserves the order of generated elements (the concatMap naturally preserves the same order as the source elements, the flatMapSequential operator preserves the order by queuing elements received out of order, while the flatMap operator does not necessarily preserve original ordering)
  • Whether the operator allows the interleaving of elements from different sub-streams (the flatMap operator allows interleaving, while concatMap and flatMapSequential do not)

Let's implement a simple algorithm that requests each user's favorite books. A service that provides a user's favorite books may look like the following:

public Flux<String> requestBooks(String user) {
return Flux.range(1, random.nextInt(3) + 1) // (1)
.map(i -> "book-" + i) // (2)
.delayElements(Duration.ofMillis(3)); // (3)

}

The mock service does the following:

  1. The service generates a random amount of integer values
  2. Then it maps each number to the book title
  3. The service delays each book by an amount of time, which should simulate a communication delay with a database

Now we may combine executions of the requestBooks method for a couple of users:

Flux.just("user-1", "user-2", "user-3")
.flatMap(u -> requestBooks(u)
.map(b -> u + "/" + b))
.subscribe(r -> log.info("onNext: {}", r));

The preceding code generates the following output, which proves the interleaving of elements:

[thread: parallel-3] onNext: user-3/book-1
[thread: parallel-1] onNext: user-1/book-1
[thread: parallel-1] onNext: user-2/book-1
[thread: parallel-4] onNext: user-3/book-2
[thread: parallel-5] onNext: user-2/book-2
[thread: parallel-6] onNext: user-1/book-2
[thread: parallel-7] onNext: user-3/book-3
[thread: parallel-8] onNext: user-2/book-3

Besides, we can see that the outgoing elements of the flatMap operator arrive at the subscriber handlers in different threads. However, the Reactive Streams specification guarantees the happens-before semantics. So, even when elements may arrive in different threads, none of them ever arrive concurrently. This aspect of Project Reactor is covered in detail in the Thread scheduling section.

Also, the library makes it possible to delay onError signals with the flatMapDelayError, flatMapSequentialDelayError, and concatMapDelayError operators. Besides this, the concatMapIterable operator allows a similar operation when the transformation function generates an iterator for each element instead of a Reactive Stream. In this case, interleaving cannot happen.

The flatMap operator (and its variants) is very important both in functional programming and in reactive programming as it allows the implementation of complex workflows with one line of code. 

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.119.158.134