Sometimes we need to perform an action for each element, or for a particular signal, in the middle of a processing pipeline. For such cases, Project Reactor provides the following methods:
- doOnNext(Consumer<T>) allows us to execute an action for each element of a Flux or Mono
- doOnComplete(Runnable) and doOnError(Consumer<Throwable>) are invoked on the corresponding events
- doOnSubscribe(Consumer<Subscription>), doOnRequest(LongConsumer), and doOnCancel(Runnable) allow us to react to subscription life-cycle events
- doOnTerminate(Runnable) is called when a stream terminates, whether it completes successfully or fails with an error
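The hooks listed above can be combined in a single pipeline. The following is a minimal sketch, assuming reactor-core is on the classpath; the class name SideEffectHooks, the method run(), and the event labels are hypothetical names chosen for illustration. It records each hook invocation in a list so that the firing order can be inspected.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class: records which side-effect hooks fire, and in what order.
public class SideEffectHooks {

    static List<String> run() {
        List<String> events = new ArrayList<>();
        Flux.just("a", "b")
            .doOnSubscribe(s -> events.add("subscribe"))     // subscription established
            .doOnRequest(n -> events.add("request"))         // downstream demand
            .doOnNext(v -> events.add("next " + v))          // each element
            .doOnComplete(() -> events.add("complete"))      // successful completion
            .doOnTerminate(() -> events.add("terminate"))    // completion or error
            .doOnError(e -> events.add("error"))             // not expected to fire here
            .doOnCancel(() -> events.add("cancel"))          // not expected to fire here
            .subscribe();
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

For this short, successful stream, the recorded events should be subscribe, request, next a, next b, complete, and terminate. The error and cancel hooks stay silent because the stream neither fails nor is cancelled.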
Also, Flux and Mono provide the doOnEach(Consumer<Signal<T>>) method, which handles the signals of the Reactive Streams domain: onSubscribe, onNext, onError, and onComplete.
Let's consider the following code:
Flux.just(1, 2, 3)
    .concatWith(Flux.error(new RuntimeException("Conn error")))
    .doOnEach(s -> log.info("signal: {}", s))
    .subscribe();
The preceding code uses the concatWith operator, which is a convenient wrapper over the concat operator. It generates the following output:
signal: doOnEach_onNext(1)
signal: doOnEach_onNext(2)
signal: doOnEach_onNext(3)
signal: onError(java.lang.RuntimeException: Conn error)
In this example, we received not only all the onNext signals but also the onError signal.
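Instead of logging the whole Signal, a doOnEach callback can also inspect it and react differently to each signal type. The following is a minimal sketch, assuming reactor-core is on the classpath; the class name SignalInspection, the method run(), and the recorded labels are hypothetical. It relies on the Signal methods isOnNext(), isOnError(), isOnComplete(), get(), and getThrowable().

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class: branches on the signal type inside doOnEach.
public class SignalInspection {

    static List<String> run() {
        List<String> seen = new ArrayList<>();
        Flux.just(1, 2, 3)
            .concatWith(Flux.error(new RuntimeException("Conn error")))
            .doOnEach(signal -> {
                if (signal.isOnNext()) {
                    // get() returns the element carried by an onNext signal
                    seen.add("value " + signal.get());
                } else if (signal.isOnError()) {
                    // getThrowable() returns the error carried by an onError signal
                    seen.add("failure " + signal.getThrowable().getMessage());
                } else if (signal.isOnComplete()) {
                    seen.add("done");
                }
            })
            // An explicit (empty) error consumer keeps the error from being reported as unhandled
            .subscribe(v -> { }, e -> { });
        return seen;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because the stream ends with an error, the list should contain the three values followed by a single failure entry, and the "done" branch is never reached.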