The subscribeOn operator

Another important tool for multithreading in Reactor is the subscribeOn operator. In contrast to publishOn, subscribeOn allows you to change the worker on which part of the subscription chain runs. This operator is useful when the source of the stream is created by executing a function. Such executions usually take place at subscription time, so the function that provides the source data is invoked when the .subscribe method is called. For example, the following code sample shows how we can supply some information using Mono.fromCallable:

ObjectMapper objectMapper = ...
String json = "{ \"color\" : \"Black\", \"type\" : \"BMW\" }";
Mono.fromCallable(() ->
       objectMapper.readValue(json, Car.class)
    )
    ...

Here, Mono.fromCallable allows the creation of a Mono from Callable<T> and provides the result of its evaluation to each Subscriber. The Callable instance executes when we call the .subscribe method, so Mono.fromCallable does the following under the hood:

public void subscribe(Subscriber actual) {
    ...
    Subscription subscription = ...
    try {
        T t = callable.call();
        if (t == null) {
            subscription.onComplete();
        }
        else {
            subscription.onNext(t);
            subscription.onComplete();
        }
    }
    catch (Throwable e) {
        actual.onError(
            Operators.onOperatorError(e, actual.currentContext()));
    }
}
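
To make the subscription-time behavior concrete, here is a minimal sketch that uses only the JDK's java.util.concurrent.Flow interfaces, so it runs without Reactor on the classpath. The class CallablePublisherSketch and its fromCallable method are hypothetical stand-ins, not Reactor's implementation, and for brevity the sketch ignores demand and cancellation, which a real Publisher must honor.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified stand-in for Mono.fromCallable built on the JDK Flow API.
public class CallablePublisherSketch {

    static <T> Flow.Publisher<T> fromCallable(Callable<T> callable) {
        return subscriber -> {
            // Assumption: a no-op Subscription; demand and cancellation are ignored here.
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { }
                @Override public void cancel() { }
            });
            try {
                // Runs on whichever thread called subscribe.
                T value = callable.call();
                if (value != null) {
                    subscriber.onNext(value);
                }
                subscriber.onComplete();
            } catch (Throwable e) {
                subscriber.onError(e);
            }
        };
    }

    // Subscribes on the current thread and returns the name of the thread
    // that actually executed the callable.
    static String threadThatRanCallable() {
        AtomicReference<String> seen = new AtomicReference<>();
        Flow.Publisher<String> source =
            fromCallable(() -> Thread.currentThread().getName());
        source.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { }
            @Override public void onNext(String item) { seen.set(item); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("caller:   " + Thread.currentThread().getName());
        System.out.println("callable: " + threadThatRanCallable());
    }
}
```

Both lines printed by main name the same thread, because nothing in this sketch moves the call to subscribe elsewhere.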

As we can see from the preceding code, execution of the callable happens in the subscribe method. This means that we cannot use publishOn to change the worker on which the Callable is executed, because publishOn affects only the signals that flow downstream of it. Fortunately, subscribeOn allows us to specify the worker on which the subscription will take place. The following example shows how we can do that:

Scheduler scheduler = ...;
Mono.fromCallable(...)
    .subscribeOn(scheduler)
    .subscribe();

The preceding example shows how we can execute the given Mono.fromCallable on a separate worker. Under the hood, subscribeOn wraps the subscription to the parent Publisher in a Runnable, which is then scheduled on the specified Scheduler. If we compare the execution models of subscribeOn and publishOn, we see the following:

Diagram 4.12 The internals of the .publishOn operator 
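
One way to picture subscribeOn is as a wrapper that defers the call to the parent's subscribe method to a worker. The following is a minimal sketch under that assumption, using a JDK ExecutorService in place of a Reactor Scheduler. The names SubscribeOnSketch, subscribeOn, and sketch-worker are hypothetical, and the sketch leaves out the request scheduling and cancellation handling that the real operator performs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of the subscribeOn idea; not Reactor's implementation.
public class SubscribeOnSketch {

    // Wraps the subscription to the parent in a Runnable and runs it on the worker.
    static <T> Flow.Publisher<T> subscribeOn(Flow.Publisher<T> parent,
                                             ExecutorService worker) {
        return subscriber -> worker.execute(() -> parent.subscribe(subscriber));
    }

    // Returns the name of the thread on which the source did its work.
    static String subscriptionThreadName() {
        ExecutorService worker =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-worker"));
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        // A source that does its work at subscription time, like a callable-backed Mono.
        Flow.Publisher<String> source = s -> {
            s.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { }
                @Override public void cancel() { }
            });
            s.onNext(Thread.currentThread().getName());
            s.onComplete();
        };

        subscribeOn(source, worker).subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { }
            @Override public void onNext(String item) { seen.set(item); }
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onComplete() { done.countDown(); }
        });

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("source ran on: " + subscriptionThreadName());
    }
}
```

Because the whole subscribe call is handed to the worker, the source's subscription-time work runs there too, even though the caller subscribed from a different thread.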

As we can see from the preceding diagram, subscribeOn can partially specify the runtime worker along with the subscription-time worker. This happens because, along with scheduling the execution of the subscribe method, it schedules each call to the Subscription.request() method, so that call also happens on the worker specified by the Scheduler instance. According to the Reactive Streams specification, a Publisher may start sending data on the caller thread, so the subsequent Subscriber.onNext() will be called on the same thread as the initial Subscription.request() call. In contrast, publishOn can specify the execution behavior only for downstream and cannot affect the upstream execution.
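
The downstream-only effect of publishOn can be sketched in the same JDK-only style. In this hypothetical PublishOnSketch, each signal is simply re-dispatched to a single-threaded executor. Reactor's real operator uses an internal queue and a drain loop instead, so treat this only as an illustration of which side of the operator changes threads.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of the publishOn idea; not Reactor's implementation.
public class PublishOnSketch {

    // Subscribes to the parent on the caller thread, but forwards every
    // signal to the downstream subscriber on the worker.
    static <T> Flow.Publisher<T> publishOn(Flow.Publisher<T> parent, Executor worker) {
        return downstream -> parent.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { downstream.onSubscribe(s); }
            @Override public void onNext(T item) { worker.execute(() -> downstream.onNext(item)); }
            @Override public void onError(Throwable t) { worker.execute(() -> downstream.onError(t)); }
            @Override public void onComplete() { worker.execute(downstream::onComplete); }
        });
    }

    // Returns { thread that produced the value, thread that consumed it }.
    static String[] producerAndConsumerThreads() {
        ExecutorService worker =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-worker"));
        AtomicReference<String> producer = new AtomicReference<>();
        AtomicReference<String> consumer = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        // Upstream: does its work at subscription time on the subscribing thread.
        Flow.Publisher<String> source = s -> {
            s.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { }
                @Override public void cancel() { }
            });
            producer.set(Thread.currentThread().getName());
            s.onNext("value");
            s.onComplete();
        };

        publishOn(source, worker).subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { }
            @Override public void onNext(String item) {
                consumer.set(Thread.currentThread().getName());
            }
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onComplete() { done.countDown(); }
        });

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
        return new String[] { producer.get(), consumer.get() };
    }

    public static void main(String[] args) {
        String[] threads = producerAndConsumerThreads();
        System.out.println("produced on: " + threads[0]);
        System.out.println("consumed on: " + threads[1]);
    }
}
```

The producer still runs on the thread that subscribed, while the consumer sees the value on the worker, which is why publishOn alone cannot move subscription-time work such as a Callable.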
