It is possible to collect all elements into a list and process the resulting collection as a Mono stream with Flux.collectList() and Flux.collectSortedList(). The latter not only collects elements but also sorts them. Consider the following code:
Flux.just(1, 6, 2, 8, 3, 1, 5, 1)
    .collectSortedList(Comparator.reverseOrder())
    .subscribe(System.out::println);
This produces the following output with one collection containing sorted numbers:
[8, 6, 5, 3, 2, 1, 1, 1]
Project Reactor can collect Flux elements not only into a List, but also into the following:
- Map (Map<K, T>) with the collectMap operator
- Multi-map (Map<K, Collection<T>>) with the collectMultimap operator
- Any data structure with a custom java.util.stream.Collector and the Flux.collect(Collector) operator
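The three collection operators listed above can be sketched as follows. This is a minimal illustration rather than a definitive usage pattern: the class name CollectExamples, its method names, and the sample data are assumptions made for the example, and block() is used only to pull the collected value out for inspection.

```java
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import reactor.core.publisher.Flux;

// Hypothetical holder class; the names below are illustrative only.
public class CollectExamples {

    // collectMap keeps a single value per key (here, the string length).
    static Map<Integer, String> byLength() {
        return Flux.just("a", "bb", "ccc")
            .collectMap(String::length)
            .block();
    }

    // collectMultimap groups every element that shares a key into a collection.
    static Map<Boolean, Collection<Integer>> byParity() {
        return Flux.just(1, 2, 3, 4, 5)
            .collectMultimap(n -> n % 2 == 0)
            .block();
    }

    // collect(Collector) reuses any java.util.stream.Collector, here a Set.
    static Set<Integer> uniqueValues() {
        return Flux.just(1, 6, 2, 8, 3, 1, 5, 1)
            .collect(Collectors.toSet())
            .block();
    }
}
```

In reactive code, the resulting Mono would normally be composed further or subscribed to instead of being blocked on.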
Both Flux and Mono have the repeat() and repeat(times) methods, which resubscribe to the source after it completes and so loop the incoming sequence. We have already used these in the previous section.
One more handy method, defaultIfEmpty(T), supplies a default value when a Flux or Mono completes without emitting any elements.
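A small sketch of defaultIfEmpty may make the behavior concrete. The class name DefaultIfEmptyExample, the method namesOrDefault, and the fallback value "anonymous" are assumptions chosen for illustration.

```java
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical example class; names and the fallback value are illustrative.
public class DefaultIfEmptyExample {

    // Emits the fallback only when the source completes without any element;
    // a non-empty source passes through unchanged.
    static List<String> namesOrDefault(Flux<String> names) {
        return names
            .defaultIfEmpty("anonymous")
            .collectList()
            .block();
    }
}
```

Calling namesOrDefault(Flux.empty()) should yield a list holding only the fallback, while namesOrDefault(Flux.just("a", "b")) should return the two original elements.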
Flux.distinct() passes on only elements that have not been encountered in the stream before. However, this method keeps track of all unique elements, so use it carefully, especially with high-cardinality data streams. The distinct method has overloads that accept custom algorithms for duplicate tracking, so it is sometimes possible to optimize the resource usage of the distinct operator manually.
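The following sketch shows distinct() alongside the overload that takes a key selector. It is an illustration under assumptions: the class name DistinctExample, the method names, and the sample data are invented for the example. Tracking a small derived key instead of the whole element is one possible way to reduce what the operator has to remember.

```java
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical example class; names and data are illustrative.
public class DistinctExample {

    // Drops every element that has already been seen anywhere in the stream.
    static List<Integer> distinctValues() {
        return Flux.just(1, 6, 2, 8, 3, 1, 5, 1)
            .distinct()
            .collectList()
            .block();
    }

    // Tracks only the extracted key (the first letter), so the first
    // element seen for each key is the one that passes through.
    static List<String> distinctByFirstLetter() {
        return Flux.just("apple", "avocado", "banana", "blueberry", "cherry")
            .distinct(s -> s.charAt(0))
            .collectList()
            .block();
    }
}
```

Here distinctValues() keeps the first occurrence of each number in arrival order, and distinctByFirstLetter() keeps one word per initial letter.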
The Flux.distinctUntilChanged() operator has no such limitation, because it compares each element only with the previous one. It can therefore be used on endless streams to remove consecutive duplicates. The following marble diagram shows its behavior: