Reducing stream elements

Project Reactor makes it possible to count the elements in a stream with count(), or to check that all elements have a required property with Flux.all(Predicate). It is also easy to check whether at least one element has a desired property with the Flux.any(Predicate) operator.
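
As a minimal sketch of count() and all(Predicate), consider the following. The class and method names are hypothetical, and block() is used only to keep the example synchronous, which we would normally avoid in reactive code:

```java
import reactor.core.publisher.Flux;

public class CountAndAllSketch {

    // count() emits a single Long once the (finite) stream completes.
    static long countElements() {
        return Flux.just(3, 5, 7, 9)
            .count()
            .block();
    }

    // all() emits a single Boolean: true only if every element passes the predicate.
    static boolean allOdd() {
        return Flux.just(3, 5, 7, 9)
            .all(e -> e % 2 != 0)
            .block();
    }

    public static void main(String[] args) {
        System.out.println("Count: " + countElements());
        System.out.println("All odd: " + allOdd());
    }
}
```

Both operators return a Mono, so the answer arrives as a single element rather than as a plain value.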

We can check whether a stream has any elements with the hasElements operator, or whether it contains a specific element with the hasElement operator. The latter implements short-circuit logic and completes with true as soon as an element matches the value. The any operator is more general: it checks not only an element's equality but also any other property, expressed as a custom Predicate instance. Let's check that a sequence has an even number in it:

Flux.just(3, 5, 7, 9, 11, 15, 16, 17)
    .any(e -> e % 2 == 0)
    .subscribe(hasEvens -> log.info("Has evens: {}", hasEvens));
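
The hasElements and hasElement operators mentioned earlier can be sketched in the same way. The class and method names here are hypothetical, and block() again serves only to make the result easy to inspect:

```java
import reactor.core.publisher.Flux;

public class HasElementSketch {

    // hasElements() reports whether the stream emits at least one element.
    static boolean nonEmptyStreamHasElements() {
        return Flux.just(3, 5, 7).hasElements().block();
    }

    // An empty stream completes without elements, so the answer is false.
    static boolean emptyStreamHasElements() {
        return Flux.<Integer>empty().hasElements().block();
    }

    // hasElement(value) checks for an element equal to the given value.
    static boolean containsSeven() {
        return Flux.just(3, 5, 7).hasElement(7).block();
    }

    static boolean containsFour() {
        return Flux.just(3, 5, 7).hasElement(4).block();
    }

    public static void main(String[] args) {
        System.out.println("Non-empty has elements: " + nonEmptyStreamHasElements());
        System.out.println("Empty has elements: " + emptyStreamHasElements());
        System.out.println("Contains 7: " + containsSeven());
        System.out.println("Contains 4: " + containsFour());
    }
}
```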

The sort operator collects elements in the background and emits the sorted sequence once the original sequence completes. Because it has to wait for completion, it only makes sense for finite streams.
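
A small sketch may make this concrete. The class and method names are hypothetical, and collectList().block() is used only to gather the sorted output for inspection:

```java
import java.util.Comparator;
import java.util.List;
import reactor.core.publisher.Flux;

public class SortSketch {

    // sort() with no arguments uses the natural ordering of the elements.
    static List<Integer> ascending() {
        return Flux.just(5, 1, 4, 2, 3)
            .sort()
            .collectList()
            .block();
    }

    // sort(Comparator) accepts a custom ordering, here the reverse of the natural one.
    static List<Integer> descending() {
        return Flux.just(5, 1, 4, 2, 3)
            .sort(Comparator.reverseOrder())
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        System.out.println("Ascending: " + ascending());
        System.out.println("Descending: " + descending());
    }
}
```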

The Flux class allows the reduction of sequences with custom logic (sometimes, the procedure is called folding). The reduce operator usually requires an initial value and a function that combines the result of the previous step with the element from the current step. Let's sum integer numbers between 1 and 5:

Flux.range(1, 5)
    .reduce(0, (acc, elem) -> acc + elem)
    .subscribe(result -> log.info("Result: {}", result));

The result would be 15. The reduce operator produces only one element with the final result. However, when doing aggregations, it's sometimes handy to send intermediate results downstream. The Flux.scan() operator does that. Let's sum integer numbers between 1 and 5 with the scan operator:

Flux.range(1, 5)
    .scan(0, (acc, elem) -> acc + elem)
    .subscribe(result -> log.info("Result: {}", result));

The preceding code produces the following output:

Result: 0
Result: 1
Result: 3
Result: 6
Result: 10
Result: 15

As we can see, the final result is the same (15). However, we also received all intermediate results. With that said, the scan operator may be useful for many applications that need some information about ongoing events. For example, we can calculate the moving average on the stream:

int bucketSize = 5;                                                // (1)
Flux.range(1, 500)                                                 // (2)
    .index()                                                       // (3)
    .scan(                                                         // (4)
        new int[bucketSize],                                       // (4.1)
        (acc, elem) -> {                                           //
            acc[(int)(elem.getT1() % bucketSize)] = elem.getT2();  // (4.2)
            return acc;                                            // (4.3)
        })
    .skip(bucketSize)                                              // (5)
    .map(array -> Arrays.stream(array).sum() * 1.0 / bucketSize)   // (6)
    .subscribe(av -> log.info("Running average: {}", av));         // (7)

Let's describe this code:

  1. Here, we define the size of the moving average window (let's say we are interested in the most recent five events).
  2. Let's generate some data with the range operator.
  3. With the index operator, we may attach an index to each of the elements.
  4. With the scan operator, we collect the latest five elements into a container (4.1), where the element's index is used to calculate the position in the container (4.2). On every step, we return the same container with the updated content.
  5. Here, we skip some elements at the beginning of the stream to gather enough data for the moving average.
  6. To calculate the value of the moving average, we divide the sum of the container's contents by its size.
  7. Of course, we have to subscribe for data to receive values.
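
To see the steps above at work on a small input, here is a sketch of the same pipeline wrapped in a method. The class name, the method name, and its count parameter are hypothetical, and collectList().block() replaces subscribe only so that the values can be inspected directly:

```java
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;

public class MovingAverageSketch {

    // Returns the moving averages over the last bucketSize elements of 1..count.
    static List<Double> movingAverages(int count, int bucketSize) {
        return Flux.range(1, count)
            .index()                                   // pair each element with its index
            .scan(new int[bucketSize], (acc, elem) -> {
                // overwrite the oldest slot, using the index modulo the window size
                acc[(int) (elem.getT1() % bucketSize)] = elem.getT2();
                return acc;                            // the same array is reused on every step
            })
            .skip(bucketSize)                          // drop the states of a partially filled window
            .map(array -> Arrays.stream(array).sum() * 1.0 / bucketSize)
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        // For 1..7 with a window of 3, the windows are (1,2,3), (2,3,4), ..., (5,6,7).
        System.out.println(movingAverages(7, 3));
    }
}
```

Note that the array is shared and mutated between steps. This works here because map reads each state synchronously before the next element arrives; a pipeline that buffers the arrays or hands them to another thread would need to copy them first.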

The Mono and Flux streams have the then, thenMany, and thenEmpty operators, which complete when the upstream completes. The operators ignore incoming elements and only replay completion or error signals. These operators can be useful for triggering new streams as soon as the upstream finishes processing:

Flux.just(1, 2, 3)
    .thenMany(Flux.just(4, 5))
    .subscribe(e -> log.info("onNext: {}", e));

The lambda in the subscribe method receives only 4 and 5, even though 1, 2, and 3 are generated and processed by the stream.
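
The then and thenEmpty operators behave similarly. The following is a minimal sketch in which the class and method names are hypothetical, doOnNext records what the upstream actually processed, and block() is used only to wait for completion:

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ThenSketch {

    // then() returns a Mono<Void>: the upstream elements are still processed
    // (and recorded here as a side effect), but nothing is passed downstream.
    static List<Integer> processedBeforeThen() {
        List<Integer> processed = new ArrayList<>();
        Flux.just(1, 2, 3)
            .doOnNext(processed::add)
            .then()
            .block();
        return processed;
    }

    // then(Mono) switches to another Mono once the upstream completes.
    static String thenWithMono() {
        return Flux.just(1, 2, 3)
            .then(Mono.just("done"))
            .block();
    }

    // thenEmpty(Publisher<Void>) waits for the upstream and then for the given
    // publisher; the resulting Mono<Void> completes without a value.
    static boolean thenEmptyHasNoValue() {
        return Flux.just(1, 2, 3)
            .thenEmpty(Mono.empty())
            .block() == null;
    }

    public static void main(String[] args) {
        System.out.println("Processed upstream: " + processedBeforeThen());
        System.out.println("then(Mono) result: " + thenWithMono());
        System.out.println("thenEmpty has no value: " + thenEmptyHasNoValue());
    }
}
```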
