Subscribing to Reactive Streams

As we may guess, Flux and Mono provide lambda-based overloads of the subscribe() method, which simplifies the subscription routine a lot:

subscribe();                                                         // (1)
subscribe(Consumer<T> dataConsumer); // (2)
subscribe(Consumer<T> dataConsumer, // (3) Consumer<Throwable> errorConsumer);
subscribe(Consumer<T> dataConsumer, // (4) Consumer<Throwable> errorConsumer, Runnable completeConsumer);
subscribe(Consumer<T> dataConsumer, // (5) Consumer<Throwable> errorConsumer, Runnable completeConsumer, Consumer<Subscription> subscriptionConsumer);

subscribe(Subscriber<T> subscriber); // (6)

Let's explore the options we have for creating subscribers. First of all, all overrides of the subscribe method return an instance of the Disposable interface. This can be used to cancel the underlying Subscription. In cases (1) to (4), the subscription requests the unbounded demand (Long.MAX_VALUE). Now, let's look at the differences:

  1. This is the simplest way of subscribing to a stream, as this method ignores all signals. Usually, other variants should be preferred. However, sometimes it may be useful to trigger stream processing that has side effects.
  2. The dataConsumer is invoked on each value (onNext signal). It does not handle onError and onComplete signals.
  3. The same as in option (2); however, this allows handling of the onError signal. The onComplete signal is ignored.
  4. The same as in option (3); however, this also allows handling of the onComplete signal.
  5. Allows all elements in the Reactive Stream to be consumed, including handle errors and a completion signal. Importantly, this override allows the subscription to be controlled by requesting an adequate amount of data of course, we may still request Long.MAX_VALUE).
  6. The most generic way of subscribing to the sequence. Here, we may provide our Subscriber implementation with the desired behavior. Even though this option is very versatile, it is rarely required. 

Let's create a simple Reactive Stream and subscribe to it:

Flux.just("A", "B", "C")
   .subscribe(
      data -> log.info("onNext: {}", data),
      err -> { /* ignored  */ },
      () -> log.info("onComplete"));

The preceding code produces the following console output:

onNext: A
onNext: B
onNext: C
onComplete

It is worth noting once again that simple subscription request unbound demand options (Long.MAX_VALUEmay sometimes force the producer to do a significant amount of work to fulfill the demand. So, if the producer is more suited to handling a bounded demand, it is recommended to control the demand with the subscription object or by applying request limiting operators, which are covered later in the chapter.

Let's subscribe to a Reactive Stream with a manual subscription control:

Flux.range(1, 100)                                                 // (1)
.subscribe( // (2)
data -> log.info("onNext: {}", data),
err -> { /* ignore */ },
() -> log.info("onComplete"),
subscription -> { // (3)
subscription.request(4); // (3.1)
subscription.cancel(); // (3.2)
}
);

The preceding code does the following:

  1. At first, we generate 100 values with the range operator.
  2. We subscribe to the stream in the same way as in the previous example.
  3. However, now we control the subscription. At first, we request 4 items (3.1) and then immediately cancel the subscription (3.2), so other elements should not be generated at all.

When running the preceding code, we receive the following output:

onNext: 1
onNext: 2
onNext: 3
onNext: 4

Note that, we do not receive an onComplete signal because the subscriber canceled the subscription before the stream finished. It is also important to remember that a Reactive Stream may be finished by a producer (with the onError or onComplete signals) or canceled by a subscriber via a Subscription instance.  Also, the Disposable instance may also be used for the purposes of cancellation. Usually, it is used not by a subscriber, but by the code one level of abstraction above. For example, let's cancel stream processing by calling Disposable:

Disposable disposable = Flux.interval(Duration.ofMillis(50))         // (1)
.subscribe( // (2)
data -> log.info("onNext: {}", data)
);
Thread.sleep(200); // (3)
disposable.dispose(); // (4)

The preceding code does the following:

  1. The interval factory method allows the generation of events with a defined period (every 50 milliseconds). The generated stream is endless.
  2. We subscribe by providing only the handler for onNext signals.
  3. We wait for some time to receive a couple of events (200/50 should allow passing about 4 events).
  4. Calls the dispose method which internally cancels the subscription.
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.189.171.153