Filtering reactive sequences

Project Reactor provides a range of operators for filtering elements, such as:

  • The filter operator passes only elements that satisfy the condition.
  • The ignoreElements operator returns Mono<T> and filters out all elements. The resulting sequence ends only after the original ends.
  • The take(n) method limits the number of elements taken: it passes the first n elements and ignores the rest.
  • takeLast(n) returns only the last n elements of the stream.
  • takeUntil(Predicate) passes elements until some condition is satisfied; the element that satisfies the condition is the last one passed.
  • elementAt(n) takes only the element at index n of the sequence (the index is zero-based).
  • The single operator emits a single item from the source and signals the NoSuchElementException error for an empty source or IndexOutOfBoundsException for a source with more than one element.
  • It is possible to take or skip elements not only by count but also by Duration, with the skip(Duration) or take(Duration) operators.
  • Also, we may skip or take elements until some message arrives from another stream: takeUntilOther(Publisher) or skipUntilOther(Publisher).
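
To make the operators in the list above more concrete, here is a minimal sketch that applies several of them to a small numeric source. The class name FilteringOperatorsSketch and its helper methods are hypothetical and exist only for illustration; the sketch assumes reactor-core is on the classpath and uses block() purely to collect results in a demo, which is not something to do in production reactive code.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class; not part of Project Reactor.
class FilteringOperatorsSketch {

    // filter: passes only the elements that satisfy the predicate.
    static List<Integer> evens() {
        return Flux.range(1, 10).filter(i -> i % 2 == 0).collectList().block();
    }

    // take(n): passes the first n elements and ignores the rest.
    static List<Integer> firstThree() {
        return Flux.range(1, 10).take(3).collectList().block();
    }

    // takeLast(n): passes only the last n elements.
    static List<Integer> lastTwo() {
        return Flux.range(1, 10).takeLast(2).collectList().block();
    }

    // takeUntil: passes elements until the predicate matches,
    // including the element that matched.
    static List<Integer> untilFive() {
        return Flux.range(1, 10).takeUntil(i -> i >= 5).collectList().block();
    }

    // elementAt(n): passes only the element at zero-based index n.
    static Integer fourth() {
        return Flux.range(1, 10).elementAt(3).block();
    }

    // single: reports either the only value or the name of the error
    // signaled for an empty or multi-element source.
    static String singleOutcome(Flux<Integer> source) {
        return source.single()
                .map(v -> "value:" + v)
                .onErrorResume(e -> Mono.just(e.getClass().getSimpleName()))
                .block();
    }

    public static void main(String[] args) {
        System.out.println(evens());       // [2, 4, 6, 8, 10]
        System.out.println(firstThree());  // [1, 2, 3]
        System.out.println(lastTwo());     // [9, 10]
        System.out.println(untilFive());   // [1, 2, 3, 4, 5]
        System.out.println(fourth());      // 4
        System.out.println(singleOutcome(Flux.just(7)));
        System.out.println(singleOutcome(Flux.empty()));
        System.out.println(singleOutcome(Flux.just(1, 2)));
    }
}
```

The last three lines exercise the three outcomes of single described above: one value, an empty source, and a source with more than one element.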

Let's consider a workflow where we have to start and then stop stream processing as a reaction to some events originating from other streams. The code may look like the following:

Mono<?> startCommand = ...
Mono<?> stopCommand = ...
Flux<UserEvent> streamOfData = ...

streamOfData
    .skipUntilOther(startCommand)
    .takeUntilOther(stopCommand)
    .subscribe(System.out::println);

In this case, we can start and then stop element processing, but only once. The marble diagram for this use case would be as follows:

Diagram 4.5 Peeking elements between start-stop commands
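
The start-stop workflow can also be tried out with manually driven sources. The following is a sketch under stated assumptions, not the original example: it assumes Reactor 3.4 or later, where the Sinks API is available, and it substitutes String events for UserEvent. The class name StartStopSketch and the event names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Sinks;

// Hypothetical demo class; sinks stand in for the real command and data streams.
class StartStopSketch {

    static List<String> run() {
        Sinks.One<String> startCommand = Sinks.one();
        Sinks.One<String> stopCommand = Sinks.one();
        Sinks.Many<String> streamOfData =
                Sinks.many().multicast().directBestEffort();

        List<String> received = new ArrayList<>();

        streamOfData.asFlux()
                .skipUntilOther(startCommand.asMono()) // drop until start arrives
                .takeUntilOther(stopCommand.asMono())  // complete when stop arrives
                .subscribe(received::add);

        streamOfData.tryEmitNext("e1");      // before start: skipped
        startCommand.tryEmitValue("start");
        streamOfData.tryEmitNext("e2");      // between start and stop: passed
        streamOfData.tryEmitNext("e3");      // between start and stop: passed
        stopCommand.tryEmitValue("stop");
        streamOfData.tryEmitNext("e4");      // after stop: sequence already completed

        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [e2, e3]
    }
}
```

Because the stop command completes the resulting sequence, nothing emitted afterwards reaches the subscriber, which is why processing can be started and stopped only once.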

 
