Sharing elements of a stream

With ConnectableFlux, we multicast events to several subscribers. However, processing starts only after the subscribers have appeared. The share operator transforms a cold publisher into a hot one. Each new subscriber receives only the events emitted after it subscribed; events it has already missed are not replayed. Let's consider the following use case:

Flux<Integer> source = Flux.range(0, 5)
    .delayElements(Duration.ofMillis(100))
    .doOnSubscribe(s ->
        log.info("new subscription for the cold publisher"));

Flux<Integer> cachedSource = source.share();

cachedSource.subscribe(e -> log.info("[S 1] onNext: {}", e));
Thread.sleep(400);
cachedSource.subscribe(e -> log.info("[S 2] onNext: {}", e));

In the preceding code, we shared a cold stream that generates an event every 100 milliseconds. The first subscriber subscribes to the shared publisher immediately, and the second one subscribes 400 milliseconds later. Let's look at the application's output:

new subscription for the cold publisher
[S 1] onNext: 0
[S 1] onNext: 1
[S 1] onNext: 2
[S 1] onNext: 3
[S 2] onNext: 3
[S 1] onNext: 4
[S 2] onNext: 4

The logs show that the cold publisher was subscribed to only once. The first subscriber received every event, starting from the first one, while the second subscriber missed the events generated before it subscribed (S 2 received only events 3 and 4).
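
To make the delivery rule concrete without depending on timing, the following is a minimal sketch in plain Java rather than Reactor. The SharedSource class and its subscribe and emit methods are hypothetical names introduced only for this illustration; they model the multicast behavior described above and are not how Reactor implements share. Emission is driven by hand, so the point at which the second subscriber joins is fixed instead of depending on Thread.sleep.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class ShareSketch {

    // Hypothetical, simplified stand-in for a shared (hot) source.
    // It is an illustration only, not Reactor's implementation of share().
    static final class SharedSource<T> {
        private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

        // Register a subscriber; it only sees values emitted from now on.
        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        // Deliver one value to every subscriber registered at this moment.
        void emit(T value) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(value);
            }
        }
    }

    // Replays the scenario from the text and returns the collected log lines.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SharedSource<Integer> shared = new SharedSource<>();

        shared.subscribe(e -> log.add("[S 1] onNext: " + e));
        for (int i = 0; i < 3; i++) {
            shared.emit(i); // 0, 1 and 2 reach S 1 only
        }

        // S 2 joins late, so nothing emitted earlier is replayed to it.
        shared.subscribe(e -> log.add("[S 2] onNext: " + e));
        for (int i = 3; i < 5; i++) {
            shared.emit(i); // 3 and 4 reach both subscribers
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running main prints the same seven onNext lines as the output above. In the sketch, the late subscriber is simply absent from the subscriber list when the earlier values are emitted, which is why it never sees them.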
