Runtime

The final step of the stream execution is a runtime phase. During that phase, we have an actual signals' exchange between a Publisher and a Subscriber. As we may remember from the Reactive Streams specification, the first two signals that Publisher and Subscriber exchange is the onSubscribe signal and the request signal. The onSubscribe method is called by the top source, which in our case is ArrayPublisher. This passes its Subscription to the given Subscriber. The pseudocode that describes the process of passing Subscription through ever Subscribers looks like the following:

MapSubscriber(FilterSubscriber(Subscriber)).onSubscribe(
new ArraySubscription()
) {
FilterSubscriber(Subscriber).onSubscribe(
new MapSubscription(ArraySubscription(...))
) {
Subscriber.onSubscribe(
FilterSubscription(MapSubscription(ArraySubscription(...)))
) {
// request data here
}
}
}

Once the Subscription has passed through all the Subscribers chain and each Subscriber in the chain wrapped given Subscription into specific representation. So finally we get the pyramid of Subscription wrappers as it shown in the following code:

FilterSubscription(
MapSubscription(
ArraySubscription()
)
)

Finally, the last Subscriber receives the chain of Subscriptions and in order to start receiving elements should call the Subscription#request method which initiates the sending of elements. The following pseudocode demonstrates what the process of requesting elements looks like:

FilterSubscription(MapSubscription(ArraySubscription(...)))
.request(10) {
MapSubscription(ArraySubscription(...))
.request(10) {
ArraySubscription(...)
.request(10) {
// start sending data
}
}
}

Once all of the Subscribers pass the requested demand and ArraySubscription receives it, the ArrayFlux can start sending elements to the MapSubscriber(FilterSubscriber(Subscriber)) chain. The following is pseudocode that describes the process of sending elements through all the Subscribers:

...
ArraySubscription.request(10) {
MapSubscriber(FilterSubscriber(Subscriber)).onNext(1) {
// apply mapper here
FilterSubscriber(Subscriber).onNext("1") {
// filter
// element does not match
// request and additional element then
MapSubscription(ArraySubscription(...)).request(1) {...}
}
}
MapSubscriber(FilterSubscriber(Subscriber)).onNext(20) {
// apply mapper here
FilterSubscriber(Subscriber).onNext("20") {
// filter
// element matches
// send it downstream Subscriber
Subscriber.onNext("20") {...}
}
}
}

As we can see from the preceding code, during runtime, the element from the source goes through the chain of Subscribers, executing different functionality during each of the stages.

The importance of understanding this phase is that during runtime we may apply optimization that may reduce amount of signals exchange. For example, as we are going to see in the next sections, we may reduce the number of Subscription#request calls and improve, therefore, performance of the stream.

As we may remember from Chapter 3Reactive Streams - the New Streams' Standard, the invocation of Subscription#request method causes a write to the volatile field that holds demand. Such a write is an expensive operation from computation perspective, so it is better to avoid it if possible.

To summarize our understanding of Stream's life cycle and the execution at each of the phases we may consider the following diagram:

Diagram 4.8. Reactive flow life-cycle

To summarize this, we have covered the central point in the execution life cycle of the Flux and Mono reactive types. In the following sections, we are going to use stream life cycle phases in order to clarify how Reactor provides very efficient implementations for each Reactive Stream.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.143.235.23