Caching elements of a stream

With ConnectableFlux, it is easy to implement different data caching strategies. However, Reactor already provides an API for event caching in the form of the cache operator. Internally, the cache operator uses ConnectableFlux, so its main added value is a fluent and straightforward API. We can tune how many items the cache holds and how long each cached item lives before it expires. Let's demonstrate how it works with the following example:

Flux<Integer> source = Flux.range(0, 2)                              // (1)
    .doOnSubscribe(s ->
        log.info("new subscription for the cold publisher"));

Flux<Integer> cachedSource = source.cache(Duration.ofSeconds(1)); // (2)

cachedSource.subscribe(e -> log.info("[S 1] onNext: {}", e)); // (3)
cachedSource.subscribe(e -> log.info("[S 2] onNext: {}", e)); // (4)

Thread.sleep(1200); // (5)

cachedSource.subscribe(e -> log.info("[S 3] onNext: {}", e)); // (6)

The preceding code does the following:

  1. First, we create a cold publisher that generates a few items.
  2. We cache the cold publisher with the cache operator for a duration of 1 second.
  3. We connect the first subscriber.
  4. Just after the first subscriber, we connect the second subscriber.
  5. We wait for some time to allow the cached data to expire.
  6. Finally, we connect the third subscriber.

Let's look at the program's output:

new subscription for the cold publisher
[S 1] onNext: 0
[S 1] onNext: 1
[S 2] onNext: 0
[S 2] onNext: 1
new subscription for the cold publisher
[S 3] onNext: 0
[S 3] onNext: 1

Based on the logs, we can conclude that the first two subscribers shared the data cached from the first subscription. Then, after the delay, the cached data had expired, so the third subscriber could not retrieve it and a new subscription was triggered for the cold publisher. In the end, the third subscriber also received the desired data, even though that data did not come from the cache.
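
The same conclusion can be checked without reading logs. The following is a minimal sketch, assuming reactor-core is on the classpath; the class and method names are hypothetical and chosen only for illustration. The first method counts how many times the cold publisher is subscribed to when a time-limited cache sits in front of it. The second method uses the cache(int, Duration) overload to show the size limit mentioned earlier: a late subscriber should see only the most recent items that the cache still holds.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;

// Illustrative sketch; the class and method names are hypothetical.
class CacheLimitsSketch {

    // Counts how often the cold publisher is subscribed to while a
    // time-limited cache sits in front of it.
    static int countUpstreamSubscriptions(Duration ttl, Duration pause)
            throws InterruptedException {
        AtomicInteger subscriptions = new AtomicInteger();
        Flux<Integer> cached = Flux.range(0, 2)
            .doOnSubscribe(s -> subscriptions.incrementAndGet())
            .cache(ttl);

        cached.collectList().block();   // first subscriber triggers the upstream subscription
        cached.collectList().block();   // second subscriber arrives right away and reuses the cache
        Thread.sleep(pause.toMillis()); // wait; if pause exceeds ttl, the cached items expire
        cached.collectList().block();   // third subscriber re-subscribes upstream only after expiry
        return subscriptions.get();
    }

    // Returns what a late subscriber receives when the cache keeps
    // only the last `history` items (with a generous 10-second TTL).
    static List<Integer> replayedToLateSubscriber(int history) {
        Flux<Integer> cached = Flux.range(0, 5)
            .cache(history, Duration.ofSeconds(10));

        cached.collectList().block();        // first subscriber receives all items live
        return cached.collectList().block(); // late subscriber gets only what is still cached
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countUpstreamSubscriptions(
            Duration.ofSeconds(1), Duration.ofMillis(1200)));
        System.out.println(replayedToLateSubscriber(2));
    }
}
```

With a pause longer than the TTL, the counter reflects the second subscription seen in the logs above; with a pause shorter than the TTL, all three subscribers are served by a single upstream subscription.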
