Composing and transforming Reactive Streams

When we build complicated reactive workflows, it is often necessary to use the same sequence of operators in a couple of different places. With the transform operator, we can extract such common pieces into separate objects and reuse them whenever required. Previously, we've transformed events within a stream. With the transform operator, we can augment the stream structure itself. Let's assume the following example:

Function<Flux<String>, Flux<String>> logUserInfo =                 // (1)
stream -> stream //
.index() // (1.1)
.doOnNext(tp -> // (1.2)
log.info("[{}] User: {}", tp.getT1(), tp.getT2())) //
.map(Tuple2::getT2); // (1.3)

Flux.range(1000, 3) // (2)
.map(i -> "user-" + i) //
.transform(logUserInfo) // (3)
.subscribe(e -> log.info("onNext: {}", e));

Let's look at the preceding code:

  1. We define the logUserInfo function with the Function<Flux<String>, Flux<String>> signature. It transforms a Reactive Stream of String values into another Reactive Stream, which also generates String values. In this example, for each onNext signal, our function logs details about a user (1.2), additionally enumerating incoming events with the index operator (1.1). The outgoing stream does not contain any information about enumeration, because we remove it with the map(Tuple2::getT2) call (1.3).
  2. Here, we generate some user IDs.
  3. We embed the transformation defined by the logUserInfo function by applying the transform operator.

Let's execute the preceding code. The log output is the following:

[0] User: user-1000
onNext: user-1000
[1] User: user-1001
onNext: user-1001
[2] User: user-1002
onNext: user-1002

In the logs, we see that each element is logged both by the logUserInfo function and by the final subscription. However, the logUserInfo function also tracks indexes of the events.

The transform operator updates the stream behavior only once, at the assembly phase of a stream life-cycle. At the same time, Reactor has the compose operator, which does the same stream transformation each time a subscriber arrives. Let's illustrate its behavior with the following code:

Function<Flux<String>, Flux<String>> logUserInfo = (stream) -> {     // (1)
if (random.nextBoolean()) {
return stream
.doOnNext(e -> log.info("[path A] User: {}", e));
} else {
return stream
.doOnNext(e -> log.info("[path B] User: {}", e));
}
};

Flux<String> publisher = Flux.just("1", "2") // (2)
.compose(logUserInfo); // (3)

publisher.subscribe(); // (4)
publisher.subscribe();

In the preceding code, we do the following:

  1. Similar to the previous example, we define a transformation function. In this case, the function randomly chooses the path of a stream transformation each time. Two proposed paths differ only by the log message prefix.
  2. Here, we create a publisher that generates some data.
  3. With the compose operator, we embed the logUserInfo function into the execution workflow.
  4. Also, we subscribe a couple of times with the hope of observing different behaviors for different subscriptions.

Let's execute the preceding code, it should produce the following output:

[path B] User: 1
[path B] User: 2
[path A] User: 1
[path A] User: 2

The log messages prove that the first subscription triggered path B, while the second triggered path A. Of course, the compose operator allows the implementation of much more complicated business logic than a random selection of log message prefixes. Both the transform and compose operators are powerful tools that enable code reuse in reactive applications.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.225.255.178