Collecting reactive sequences

It is possible to collect all elements in the list and process the resulting collection as a Mono stream with Flux.collectList() and Flux.collectSortedList(). The last one not only collects elements but also sorts them. Consider the following code:

Flux.just(1, 6, 2, 8, 3, 1, 5, 1)
    .collectSortedList(Comparator.reverseOrder())
    .subscribe(System.out::println);

This produces the following output with one collection containing sorted numbers:

[8, 6, 5, 3, 2, 1, 1, 1]
Note that collecting sequence elements in the collection may be resource hungry, especially when a sequence has many elements. Also, it is possible to consume all the available memory when trying to collect on an endless stream.

Project Reactor allows the collection of Flux elements not only to List, but also to the following:

  • Map ( Map<K, T>with the collectMap operator
  • Multi-map (Map<K, Collection<T>>) with the collectMultimap operator
  • Any data structure with a custom java.util.stream.Collector and the Flux.collect(Collector) operator

Both Flux and Mono have the repeat() and repeat(times) methods, which allow for the looping of incoming sequences. We have already used these in the previous section.

One more handy method, called defaultIfEmpty(T)allows the provision of default values for an empty Flux or Mono.

Flux.distinct() passes only the element that has not been encountered in a stream before. However, this method keeps track of all unique elements, so use it carefully, especially with high-cardinality data streams. The distinct method has overrides that allow the provision of custom algorithms for duplicate tracking. So, it is sometimes possible to optimize resource usage of the distinct operator manually.

High-cardinality refers to data with elements that are very uncommon or unique. For example, identification numbers or usernames are typically highly-cardinal. At the same time, enum values or values from a small fixed dictionary are not. 

The Flux.distinctUntilChanged() operator has no such limitation and can be used for endless streams to remove duplicates that appear in an uninterrupted row. The following marble-diagram shows its behavior:

Diagram 4.6. Operator—distinct until changed
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.14.144.216