Of course, Project Reactor contains all kinds of operators for filtering elements, such as:
- The filter operator passes only elements that satisfy the condition.
- The ignoreElements operator returns Mono<T> and filters out all elements. The resulting sequence ends only after the original ends.
- The library allows for the limiting of taken elements with the take(n) method, which ignores all elements except the first n.
- takeLast returns only the last element of the stream.
- takeUntil(Predicate) passes an element until some condition is satisfied.
- elementAt(n) allows the taking of only the nth element of the sequence.
- The single operator emits a single item from the source and signals the NoSuchElementException error for an empty source or IndexOutOfBoundsException for a source with more than one element.
- It is possible to take or skip an element not only by an amount but also by Duration with the skip(Duration) or take(Duration)operators.
- Also, we may skip or take an element until some message arrives from another stream—takeUntilOther(Publisher) or skipUntilOther(Publisher).
Let's consider a workflow where we have to start and then stop stream processing as a reaction to some events originating from other streams. The code may look like the following:
Mono<?> startCommand = ... Mono<?> stopCommand = ... Flux<UserEvent> streamOfData = ... streamOfData .skipUntilOther(startCommand) .takeUntilOther(stopCommand) .subscribe(System.out::println);
In this case, we may start and then stop elements processing, but only once. The marble diagram for this use case would be as follows:
Diagram 4.5 Peeking elements between start-stop commands