Batching stream elements

Project Reactor supports the batching of stream elements (Flux<T>) in a few ways:

  • Buffering elements into containers such as List; the resulting stream has the Flux<List<T>> type.
  • Windowing elements into a stream of streams such as Flux<Flux<T>>. Note that the stream now signals not values but sub-streams, which we can process in turn.
  • Grouping elements by some key into a stream that has the type Flux<GroupedFlux<K, T>>. Each new key triggers a new GroupedFlux instance, and all elements with that key are pushed through that GroupedFlux instance.

Buffering and windowing may happen based on the following:

  • The number of processed elements; let's say every 10 elements
  • Some time span; let's say every 5 minutes
  • Some predicate; let's say cutting before each new even number
  • The arrival of an event from some other Flux, which controls the execution
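To make the predicate-based case concrete, the following plain-Java sketch models batching that cuts before each new even number. It does not use Reactor and is not how Reactor implements the operator; the class and method names (PredicateBufferModel, bufferCutBefore) are hypothetical, and skipping empty batches is an assumption of this sketch. In Reactor itself, this kind of batching is expressed with predicate-based operators such as bufferUntil, which also accepts a cutBefore flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class PredicateBufferModel {

    // Plain-Java model of predicate-triggered batching with "cut before" semantics:
    // an element that matches the predicate closes the current batch and opens a new one.
    // Empty batches are skipped here, which is an assumption of this sketch.
    static <T> List<List<T>> bufferCutBefore(List<T> items, Predicate<T> cutBefore) {
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : items) {
            if (cutBefore.test(item) && !current.isEmpty()) {
                batches.add(current);          // close the batch collected so far
                current = new ArrayList<>();   // the matching element starts a new batch
            }
            current.add(item);
        }
        if (!current.isEmpty()) {
            batches.add(current);              // flush the tail when the input ends
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7);
        // Cut before each new even number.
        System.out.println(bufferCutBefore(numbers, n -> n % 2 == 0));
        // prints [[1], [2, 3], [4, 5], [6, 7]]
    }
}
```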

Let's buffer integer elements in lists of size 4:

Flux.range(1, 13)                                   // 1, 2, ..., 13
    .buffer(4)                                      // collect up to 4 elements per List
    .subscribe(e -> log.info("onNext: {}", e));     // log: an SLF4J-style logger (assumed)

The preceding code generates the following output:

onNext: [1, 2, 3, 4]
onNext: [5, 6, 7, 8]
onNext: [9, 10, 11, 12]
onNext: [13]

In the program's output, we can see that all but the last element are lists of size 4. The last element is a list of size 1 because 13 divided by 4 leaves a remainder of 1. The buffer operator gathers many events into a collection of events, and that collection itself becomes an event for the downstream operator. The buffer operator is handy for batch processing, when it is preferable to make a handful of requests with collections of elements instead of many small requests with only one element each. For example, instead of inserting elements into a database one by one, we may buffer items for a couple of seconds and do a batch insert. Of course, this is only an option if the consistency requirements allow it.
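The arithmetic behind the output above can be checked with a small plain-Java model of fixed-size batching (no Reactor involved): 13 elements in batches of 4 give three full batches plus one batch of 13 % 4 = 1 element, so a batch insert needs four round trips instead of thirteen. The class name FixedSizeBatching and the insertBatch method are hypothetical; insertBatch is a stand-in for a database call and only returns the size of the batch it receives.

```java
import java.util.ArrayList;
import java.util.List;

public class FixedSizeBatching {

    // Splits items into consecutive batches of at most maxSize elements,
    // mirroring the grouping that buffer(4) produces in the example above.
    static <T> List<List<T>> chunk(List<T> items, int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += maxSize) {
            int to = Math.min(from + maxSize, items.size());
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }

    // Hypothetical stand-in for a database batch insert: it only reports
    // how many rows one round trip would carry.
    static int insertBatch(List<Integer> rows) {
        return rows.size();
    }

    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();
        for (int i = 1; i <= 13; i++) {
            numbers.add(i);
        }
        List<List<Integer>> batches = chunk(numbers, 4);
        int rows = 0;
        for (List<Integer> batch : batches) {
            rows += insertBatch(batch);   // one round trip per batch
        }
        System.out.println(batches);      // [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13]]
        System.out.println(batches.size() + " round trips for " + rows + " rows"); // 4 round trips for 13 rows
    }
}
```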

To exercise the window operator, let's split the sequence of numbers into windows each time an element is a prime number. For that, we can use the windowUntil variant of the window operator. It uses a predicate to determine when to make a new slice. The code may look like the following:

Flux<Flux<Integer>> windowedFlux = Flux.range(101, 20)   // (1)
    .windowUntil(this::isPrime, true);                   // (2) isPrime: helper not shown

windowedFlux.subscribe(window -> window                  // (3)
    .collectList()                                       // (4)
    .subscribe(e -> log.info("window: {}", e)));         // (5)

Let's look at the preceding code:

  1. At first, we generate 20 integers starting with 101.
  2. Here, we slice a new window each time a number is prime. The second argument of the windowUntil operator defines whether we cut a new slice before or after the element that satisfies the predicate. In the preceding code, we cut before it, so each prime number begins a new window. The resulting stream has the Flux<Flux<Integer>> type.
  3. Now, we may subscribe to the windowedFlux stream. However, each element of the windowedFlux stream is itself a Reactive Stream, so for each window we apply another reactive transformation.
  4. In our case, for each window, we collect elements with the collectList operator so that each window is reduced to the Mono<List<Integer>> type.
  5. For each inner Mono, we make a separate subscription and log the received events.

The preceding code generates the following output:

window: []
window: [101, 102]
window: [103, 104, 105, 106]
window: [107, 108]
window: [109, 110, 111, 112]
window: [113, 114, 115, 116, 117, 118, 119, 120]

Note that the first window is empty. This happens because, as soon as we start the original stream, an initial window is opened. Then, the first element arrives (number 101); it is a prime number, so it triggers a new window and, consequently, the already-open window is closed (with the onComplete signal) without any elements.
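The following plain-Java sketch reproduces the windows printed above. Two parts are assumptions: the text does not show isPrime, so a simple trial-division version is used here, and windowCutBefore is a hypothetical list-based model of windowUntil(predicate, true) that follows the behavior described in the text, including the empty first window. Unlike a real window, which forwards elements as they arrive, this model returns the windows only after the whole input has been processed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class WindowUntilModel {

    // Assumed helper: the text references this::isPrime without showing it.
    // Trial division is adequate for small numbers such as 101..120.
    static boolean isPrime(int n) {
        if (n < 2) {
            return false;
        }
        for (int d = 2; (long) d * d <= n; d++) {
            if (n % d == 0) {
                return false;
            }
        }
        return true;
    }

    // Hypothetical model of windowUntil(predicate, cutBefore = true) as described in the text:
    // a window is open from the start, and every matching element closes the current
    // window and becomes the first element of a new one.
    static <T> List<List<T>> windowCutBefore(List<T> items, Predicate<T> cut) {
        List<List<T>> windows = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : items) {
            if (cut.test(item)) {
                windows.add(current);          // may be empty, like the first window in the output
                current = new ArrayList<>();
            }
            current.add(item);
        }
        windows.add(current);                  // the last window closes when the source completes
        return windows;
    }

    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>();
        for (int i = 101; i <= 120; i++) {
            source.add(i);                     // same values as Flux.range(101, 20)
        }
        // The first line printed is "window: []".
        windowCutBefore(source, WindowUntilModel::isPrime)
            .forEach(w -> System.out.println("window: " + w));
    }
}
```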

Of course, we could solve the same exercise with the buffer operator, and the two operators behave similarly. However, buffer emits a collection only when a buffer closes, while the window operator propagates each element as soon as it arrives, making it possible to react sooner and implement more sophisticated workflows.
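The difference in delivery timing can be illustrated with a single-threaded trace. This is again a plain-Java model rather than Reactor code, and the class and method names (BufferVersusWindowTiming, trace) are hypothetical. It cuts before each even number and records, in order, what the source emits and what a buffer-style and a window-style subscriber observe at that moment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class BufferVersusWindowTiming {

    // Single-threaded model: for every source element, log what each kind of
    // subscriber observes at that moment.
    static List<String> trace(List<Integer> source, Predicate<Integer> cutBefore) {
        List<String> log = new ArrayList<>();
        List<Integer> buffer = new ArrayList<>();
        for (Integer item : source) {
            log.add("source emits " + item);
            if (cutBefore.test(item) && !buffer.isEmpty()) {
                // Buffer-style: the collected list is delivered only when the batch closes.
                log.add("buffer subscriber gets " + buffer);
                buffer = new ArrayList<>();
            }
            buffer.add(item);
            // Window-style: the element is forwarded to the open window right away.
            log.add("window subscriber gets " + item);
        }
        if (!buffer.isEmpty()) {
            // The last batch is delivered when the source completes.
            log.add("buffer subscriber gets " + buffer);
        }
        return log;
    }

    public static void main(String[] args) {
        trace(List.of(1, 2, 3), n -> n % 2 == 0).forEach(System.out::println);
    }
}
```

With the input 1, 2, 3, the window-style subscriber sees 1 immediately after the source emits it, whereas the buffer-style subscriber receives [1] only once 2 arrives and closes the batch.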

We may also group elements in a Reactive Stream by some criteria with the groupBy operator. Let's divide the integer sequence into odd and even numbers and track only the last two elements in each group. The code may look like the following:

Flux.range(1, 7)                                          // (1)
    .groupBy(e -> e % 2 == 0 ? "Even" : "Odd")            // (2)
    .subscribe(groupFlux -> groupFlux                     // (3)
        .scan(                                            // (4)
            new LinkedList<>(),                           // (4.1)
            (list, elem) -> {
                list.add(elem);                           // (4.2)
                if (list.size() > 2) {
                    list.remove(0);                       // (4.3)
                }
                return list;
            })
        .filter(arr -> !arr.isEmpty())                    // (5)
        .subscribe(data ->                                // (6)
            log.info("{}: {}", groupFlux.key(), data)));

Let's look at the preceding code:

  1. Here, we generate a small sequence of numbers.
  2. With the groupBy operator, we split the sequence into odd and even numbers based on the remainder of division by 2. The operator returns a stream of type Flux<GroupedFlux<String, Integer>>.
  3. Here, we subscribe to the main Flux, and for each of the grouped fluxes, we apply the scan operator.
  4. The scan operator is seeded with an empty list (4.1). Each element in the grouped flux is added to the list (4.2), and if the list holds more than two elements, the oldest element is removed (4.3).
  5. The scan operator first propagates the seed and then the recalculated values. That is why we use the filter operator: it removes the empty list that scan emits as its seed.
  6. Finally, we subscribe separately to each grouped flux and display what the scan operator sends.

As expected, the preceding code displays the following output:

Odd: [1]
Even: [2]
Odd: [1, 3]
Even: [2, 4]
Odd: [3, 5]
Even: [4, 6]
Odd: [5, 7]
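For comparison, here is a plain-Java model of the same pipeline that reproduces the output above without Reactor. It is a sketch under stated assumptions: the class and method names (GroupLastTwoModel, lastTwoPerGroup) are hypothetical, a LinkedHashMap of ArrayDeque instances stands in for the GroupedFlux instances, and appending a line per update stands in for the scan and subscribe steps. Because the model records a line only after adding an element, there is no empty seed to filter out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupLastTwoModel {

    // Plain-Java model of groupBy + scan: route each element to the group for its key,
    // keep only the two most recent elements per group, and record a line per update.
    static List<String> lastTwoPerGroup(List<Integer> source) {
        Map<String, Deque<Integer>> groups = new LinkedHashMap<>();
        List<String> lines = new ArrayList<>();
        for (Integer elem : source) {
            String key = elem % 2 == 0 ? "Even" : "Odd";
            // A new key creates a new group, playing the role of a new GroupedFlux.
            Deque<Integer> lastTwo = groups.computeIfAbsent(key, k -> new ArrayDeque<>());
            lastTwo.addLast(elem);
            if (lastTwo.size() > 2) {
                lastTwo.removeFirst();         // drop the oldest element
            }
            lines.add(key + ": " + new ArrayList<>(lastTwo));
        }
        return lines;
    }

    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>();
        for (int i = 1; i <= 7; i++) {
            source.add(i);                     // same values as Flux.range(1, 7)
        }
        lastTwoPerGroup(source).forEach(System.out::println);
    }
}
```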

Also, the Project Reactor library supports some advanced techniques such as grouping emitted elements over distinct time windows. For that functionality, please refer to the documentation of the groupJoin operator.
