Mapping elements of reactive sequences 

The most natural way to transform a sequence is to map every element to a new value. Flux and Mono provide the map operator, which behaves similarly to the map operator from the Java Stream API. The operator, with its map(Function<T, R>) signature, processes elements one by one. Because it changes the element type from T to R, the type of the whole sequence changes as well: after the map operator, Flux<T> becomes Flux<R> and Mono<T> becomes Mono<R>. The marble diagram for Flux.map() is as follows:

Diagram 4.4 Operator: map
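
Since the text compares map to its counterpart in the Java Stream API, the type change can be illustrated with a small standard-library sketch. It uses Stream rather than Flux so that it runs without extra dependencies; the class name StreamMapSketch and the method lengths are hypothetical names chosen for this illustration only.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamMapSketch {

    // Maps every String to its length, so Stream<String> becomes Stream<Integer>,
    // in the same way that Flux<T> becomes Flux<R> after map.
    static List<Integer> lengths(Stream<String> words) {
        return words
            .map(String::length)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [4, 4, 7]
        System.out.println(lengths(Stream.of("flux", "mono", "reactor")));
    }
}
```

The mapping function is applied to one element at a time, and the element type of the result follows from the function's return type.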

The map operator of the Mono class behaves similarly. The cast(Class c) operator casts each element of a stream to the target class. The easiest way to implement cast(Class c) is with the map() operator, and the source of the Flux class confirms this assumption:

public final <E> Flux<E> cast(Class<E> clazz) {
   return map(clazz::cast);
}
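
The same map(clazz::cast) shape can be tried out in isolation. The following is a minimal sketch on a standard-library Stream rather than on Flux; the class name CastSketch and the method castAll are hypothetical. It shows that Class.cast passes matching elements through unchanged and throws ClassCastException for an element of the wrong type.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CastSketch {

    // Mirrors the shape of the cast operator: map every element through Class.cast().
    static <E> List<E> castAll(Stream<?> source, Class<E> clazz) {
        return source
            .map(clazz::cast)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Stream<Object> objects = Stream.of("a", "b");
        List<String> strings = castAll(objects, String.class);
        System.out.println(strings); // prints [a, b]

        try {
            // The Integer element cannot be cast to String.
            castAll(Stream.of("a", 1), String.class);
        } catch (ClassCastException e) {
            System.out.println("cast failed");
        }
    }
}
```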

The index operator enumerates the elements in the sequence. The method has the following signature: Flux<Tuple2<Long, T>> index(). This means that we now have to work with the Tuple2 class, which represents a tuple, a data structure that is not present in the standard Java library. The library provides the Tuple2 to Tuple8 classes, which are often used by its operators. The timestamp operator behaves similarly to the index operator, but attaches the current timestamp instead of an index. So, the following code should both enumerate the elements and attach a timestamp to every element in the sequence:

Flux.range(2018, 5)                                          // (1)
    .timestamp()                                             // (2)
    .index()                                                 // (3)
    .subscribe(e -> log.info("index: {}, ts: {}, value: {}", // (4)
        e.getT1(),                                           // (4.1)
        Instant.ofEpochMilli(e.getT2().getT1()),             // (4.2)
        e.getT2().getT2()));                                 // (4.3)

The preceding code does the following:

  1. Here, we generate some data with the range operator: five integers starting at 2018 (2018 to 2022). This operator returns a sequence of type Flux<Integer>.
  2. With the timestamp operator, we attach the current timestamp. Now, the sequence has the Flux<Tuple2<Long, Integer>> type.
  3. Here, we apply enumeration with the index operator. Now, the sequence has the Flux<Tuple2<Long, Tuple2<Long, Integer>>> type.
  4. Here, we subscribe to the sequence and log elements. The e.getT1() call returns an index (4.1), and the e.getT2().getT1() call returns a timestamp, which we output in a human-readable way with the Instant class (4.2), while the e.getT2().getT2() call returns an actual value (4.3).

After running the previous code snippet, we should receive output similar to the following (the timestamps depend on when the code runs):

index: 0, ts: 2018-09-24T03:00:52.041Z, value: 2018
index: 1, ts: 2018-09-24T03:00:52.061Z, value: 2019
index: 2, ts: 2018-09-24T03:00:52.061Z, value: 2020
index: 3, ts: 2018-09-24T03:00:52.061Z, value: 2021
index: 4, ts: 2018-09-24T03:00:52.062Z, value: 2022
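
The nesting of Tuple2<Long, Tuple2<Long, Integer>> can be easier to follow with a plain-Java stand-in. The sketch below is not the library's implementation: it uses Map.Entry as an assumed substitute for Tuple2 (getKey() playing the role of getT1() and getValue() the role of getT2()), and the class NestedPairsSketch with its timestamp and index methods is hypothetical.

```java
import java.time.Instant;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class NestedPairsSketch {

    // Stand-in for timestamp(): pairs each value with the current epoch milliseconds.
    static <T> List<Map.Entry<Long, T>> timestamp(List<T> values) {
        List<Map.Entry<Long, T>> stamped = new ArrayList<>();
        for (T value : values) {
            stamped.add(new SimpleImmutableEntry<>(System.currentTimeMillis(), value));
        }
        return stamped;
    }

    // Stand-in for index(): pairs each value with its zero-based position.
    static <T> List<Map.Entry<Long, T>> index(List<T> values) {
        List<Map.Entry<Long, T>> indexed = new ArrayList<>();
        long i = 0;
        for (T value : values) {
            indexed.add(new SimpleImmutableEntry<>(i++, value));
        }
        return indexed;
    }

    public static void main(String[] args) {
        // Same order as in the text: timestamp first, then index on top of it.
        List<Map.Entry<Long, Map.Entry<Long, Integer>>> result =
            index(timestamp(Arrays.asList(2018, 2019, 2020, 2021, 2022)));

        for (Map.Entry<Long, Map.Entry<Long, Integer>> e : result) {
            long idx = e.getKey();                // outer pair: the index
            long ts = e.getValue().getKey();      // inner pair: the timestamp
            int value = e.getValue().getValue();  // inner pair: the original value
            System.out.println("index: " + idx
                + ", ts: " + Instant.ofEpochMilli(ts)
                + ", value: " + value);
        }
    }
}
```

Because the index is applied last, it ends up in the outer pair, which is why the original value is reached through two accessor calls.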