Peeking at elements while processing a sequence

Sometimes we need to perform an action for each element, or for a particular signal, in the middle of a processing pipeline. For this purpose, Project Reactor provides the following methods:

  • doOnNext(Consumer<T>) allows us to execute an action on each element of a Flux or Mono
  • doOnComplete(Runnable) and doOnError(Consumer<Throwable>) are invoked on the corresponding events
  • doOnSubscribe(Consumer<Subscription>), doOnRequest(LongConsumer), and doOnCancel(Runnable) allow us to react to subscription life-cycle events
  • doOnTerminate(Runnable) is called when a stream terminates, whether it completes successfully or with an error
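
To make the order of these callbacks concrete, the following is a minimal sketch rather than a definitive recipe. The class name PeekDemo, the run() helper, and the event labels are made up for illustration; only the doOn... operators come from Reactor. It records each hook as it fires on a two-element stream:

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class PeekDemo {

    // Hypothetical helper: returns the hook names in the order they fired.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Flux.just("a", "b")
            .doOnSubscribe(s -> events.add("subscribe"))   // Subscription handed downstream
            .doOnRequest(n -> events.add("request"))       // demand travelling upstream
            .doOnNext(e -> events.add("next:" + e))        // once per element
            .doOnComplete(() -> events.add("complete"))    // successful completion only
            .doOnTerminate(() -> events.add("terminate"))  // completion or error
            .subscribe();
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because Flux.just emits synchronously, a plain list is enough here; a stream that switches threads would need a thread-safe collection instead.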

Also, Flux and Mono provide the doOnEach(Consumer<Signal>) method, which receives each event wrapped in a Signal, the type that models the signals of the Reactive Streams domain: onSubscribe, onNext, onError, and onComplete.

Let's consider the following code:

Flux.just(1, 2, 3)
    .concatWith(Flux.error(new RuntimeException("Conn error")))
    .doOnEach(s -> log.info("signal: {}", s))
    .subscribe();

The preceding code uses the concatWith operator, which is a convenient wrapper over the concat operator. It generates the following output:

signal: doOnEach_onNext(1)
signal: doOnEach_onNext(2)
signal: doOnEach_onNext(3)
signal: onError(java.lang.RuntimeException: Conn error)

In this example, we received not only all of the onNext signals but also the onError signal.
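
Instead of logging the Signal as a whole, the consumer can also inspect it. The sketch below reuses the same pipeline and branches on the signal type. The class name SignalDemo, the run() helper, and the recorded labels are illustrative assumptions, and an explicit error consumer is passed to subscribe so that the terminal error is handled by the subscriber:

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class SignalDemo {

    // Hypothetical helper: collects a short description of every signal seen by doOnEach.
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        Flux.just(1, 2, 3)
            .concatWith(Flux.error(new RuntimeException("Conn error")))
            .doOnEach(signal -> {
                if (signal.isOnNext()) {
                    seen.add("next:" + signal.get());            // element carried by the signal
                } else if (signal.isOnError()) {
                    seen.add("error:" + signal.getThrowable().getMessage());
                } else if (signal.isOnComplete()) {
                    seen.add("complete");                        // not reached here: the stream ends with an error
                }
            })
            .subscribe(value -> { }, error -> { });              // no-op value and error consumers
        return seen;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```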
