Assembly-time

The first part of the stream life cycle is assembly-time. As we may have noticed in the previous sections, Reactor provides a fluent API for building complex element-processing flows. At first glance, the API looks like a builder that composes the selected operators into a flow. As we may remember, a classic builder is mutable and relies on a terminal operation such as build to produce another object. In contrast, the Reactor API is immutable: each applied operator produces a new object. In reactive libraries, this process of building the execution flow is called assembling. To understand assembling better, the following pseudocode shows what assembling a flow might look like without Reactor's fluent API:

Flux<Integer> sourceFlux = new FluxArray(1, 20, 300, 4000);
Flux<String> mapFlux = new FluxMap(sourceFlux, String::valueOf);
Flux<String> filterFlux = new FluxFilter(mapFlux, s -> s.length() > 1);
...

The preceding code makes it clear that, under the hood, Fluxes are composed one inside another. After the assembling process, we get a chain of Publishers in which each new Publisher wraps the previous one. The following pseudocode demonstrates this:

FluxFilter(
   FluxMap(
      FluxArray(1, 20, 300, 4000)
   )
)

The preceding code shows what the resulting Flux looks like after applying a sequence of operators such as just -> map -> filter.
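The wrapping can be imitated without Reactor at all. The following is a minimal sketch, not Reactor's implementation: MiniFlux, ArraySource, MapOp, and FilterOp are hypothetical stand-ins for the internal classes named in the pseudocode. It only illustrates that each operator call allocates a new wrapper around the previous object and leaves that object untouched.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-ins for Reactor's internal classes; not the real API.
class AssemblySketch {

    // Describes the assembled chain from the outermost wrapper inwards.
    static String describe() {
        MiniFlux<Integer> source = MiniFlux.just(1, 20, 300, 4000);
        MiniFlux<String> chain = source.map(String::valueOf)
                                       .filter(s -> s.length() > 1);
        return chain.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}

abstract class MiniFlux<T> {
    @SafeVarargs
    static <T> MiniFlux<T> just(T... values) {
        return new ArraySource<>(List.of(values));
    }

    // Each operator allocates a new wrapper; "this" is never modified.
    <R> MiniFlux<R> map(Function<? super T, ? extends R> mapper) {
        return new MapOp<>(this, mapper);
    }

    MiniFlux<T> filter(Predicate<? super T> test) {
        return new FilterOp<>(this, test);
    }

    // Pushes every element of this flow into the given consumer.
    abstract void forEach(Consumer<? super T> consumer);

    List<T> collect() {
        List<T> out = new ArrayList<>();
        forEach(out::add);
        return out;
    }
}

final class ArraySource<T> extends MiniFlux<T> {
    final List<T> values;
    ArraySource(List<T> values) { this.values = values; }
    @Override void forEach(Consumer<? super T> consumer) { values.forEach(consumer); }
    @Override public String toString() { return "ArraySource" + values; }
}

final class MapOp<T, R> extends MiniFlux<R> {
    final MiniFlux<T> source;
    final Function<? super T, ? extends R> mapper;
    MapOp(MiniFlux<T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }
    @Override void forEach(Consumer<? super R> consumer) {
        source.forEach(v -> consumer.accept(mapper.apply(v)));
    }
    @Override public String toString() { return "MapOp(" + source + ")"; }
}

final class FilterOp<T> extends MiniFlux<T> {
    final MiniFlux<T> source;
    final Predicate<? super T> test;
    FilterOp(MiniFlux<T> source, Predicate<? super T> test) {
        this.source = source;
        this.test = test;
    }
    @Override void forEach(Consumer<? super T> consumer) {
        source.forEach(v -> { if (test.test(v)) consumer.accept(v); });
    }
    @Override public String toString() { return "FilterOp(" + source + ")"; }
}
```

Printing the chain yields FilterOp(MapOp(ArraySource[1, 20, 300, 4000])), which mirrors the nesting shown in the pseudocode. Nothing is processed while the chain is being built; elements only move once collect is called on the outermost wrapper.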

This phase plays an important role in the stream life cycle because, during assembly, Reactor can replace one operator with another by checking the type of the stream. For example, a sequence of concatWith -> concatWith -> concatWith operators can easily be compacted into a single concatenation. The following code shows how this is done in Reactor:

public final Flux<T> concatWith(Publisher<? extends T> other) {
   if (this instanceof FluxConcatArray) {
      @SuppressWarnings({ "unchecked" })
      FluxConcatArray<T> fluxConcatArray = (FluxConcatArray<T>) this;

      return fluxConcatArray.concatAdditionalSourceLast(other);
   }
   return concat(this, other);
}

As we can see from the preceding code, if the current Flux is an instance of FluxConcatArray, then instead of creating FluxConcatArray(FluxConcatArray(FluxA, FluxB), FluxC), we create a single FluxConcatArray(FluxA, FluxB, FluxC), which improves the overall performance of the stream.
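The same flattening idea can be reproduced in isolation. The sketch below is an illustration under assumptions rather than Reactor code: Seq, Leaf, and Concat are hypothetical types, and withAdditionalSourceLast is a made-up counterpart of the concatAdditionalSourceLast call shown above. It demonstrates that repeated concatWith calls end up in one flat concatenation instead of a nested one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of assembly-time operator fusion; not Reactor's classes.
class ConcatSketch {

    // Chains three concatWith calls, as in the concatWith -> concatWith -> concatWith example.
    static Seq chain() {
        Seq a = new Leaf("A");
        Seq b = new Leaf("B");
        Seq c = new Leaf("C");
        Seq d = new Leaf("D");
        return a.concatWith(b).concatWith(c).concatWith(d);
    }

    public static void main(String[] args) {
        System.out.println(chain());
    }
}

abstract class Seq {
    Seq concatWith(Seq other) {
        if (this instanceof Concat) {
            // Already a concatenation: extend its source list instead of nesting.
            return ((Concat) this).withAdditionalSourceLast(other);
        }
        return new Concat(List.of(this, other));
    }
}

final class Leaf extends Seq {
    final String name;
    Leaf(String name) { this.name = name; }
    @Override public String toString() { return name; }
}

final class Concat extends Seq {
    final List<Seq> sources;

    Concat(List<Seq> sources) {
        // Defensive, unmodifiable copy keeps every assembled object immutable.
        this.sources = List.copyOf(sources);
    }

    // Returns a new, flat concatenation with one more source at the end.
    Concat withAdditionalSourceLast(Seq other) {
        List<Seq> extended = new ArrayList<>(sources);
        extended.add(other);
        return new Concat(extended);
    }

    @Override public String toString() { return "Concat" + sources; }
}
```

The result is a single Concat holding four direct sources. Without the instanceof check, the same calls would produce three nested Concat objects, each adding one more layer for every element to pass through.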

In addition, at assembly-time we can attach Hooks to the stream being assembled to enable additional logging, tracing, metrics collection, or other additions that are useful for debugging or monitoring streams.
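In Reactor, such entry points live in the reactor.core.publisher.Hooks class (Hooks.onOperatorDebug() is one example). To show the mechanism without depending on the library, here is a rough sketch in which Node, onEachOperator, and resetHook are hypothetical names: a globally registered function is applied to every operator at the moment it is assembled, which is enough to record the shape of the flow before any data is processed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical model of an assembly-time hook; it does not call Reactor.
class HookSketch {

    // Assembles just -> map -> filter while a logging hook is installed.
    static List<String> assembleWithLoggingHook() {
        List<String> log = new ArrayList<>();
        Node.onEachOperator(node -> {
            log.add(node.name); // observe the operator as it is assembled
            return node;        // a hook could also return a decorated node
        });
        try {
            Node.source("just").then("map").then("filter");
        } finally {
            Node.resetHook();   // hooks are global state, so always clean up
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(assembleWithLoggingHook());
    }
}

final class Node {
    // The hook applied to every newly assembled operator; identity by default.
    private static UnaryOperator<Node> hook = UnaryOperator.identity();

    final String name;
    final Node upstream;

    private Node(String name, Node upstream) {
        this.name = name;
        this.upstream = upstream;
    }

    static void onEachOperator(UnaryOperator<Node> newHook) { hook = newHook; }

    static void resetHook() { hook = UnaryOperator.identity(); }

    // Every factory method passes the new object through the hook.
    static Node source(String name) { return hook.apply(new Node(name, null)); }

    Node then(String name) { return hook.apply(new Node(name, this)); }
}
```

The hook sees just, map, and filter in the order in which they are assembled, even though no element has been emitted. Because the hook returns the node it is given, the same mechanism could wrap each operator in a tracing or metrics decorator.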

To summarize, during the assembly-time phase of the Reactive Streams life cycle we can manipulate how the stream is constructed and apply techniques for optimization, monitoring, and easier debugging, all of which are an inevitable part of building Reactive Systems.
