Multicasting elements of a stream

Of course, we may transform a cold publisher into a hot one by applying a reactive transformation. For example, we may want to share the results of a cold publisher between a few subscribers as soon as all of them are ready to receive data, without regenerating the data for each subscriber. Project Reactor has ConnectableFlux precisely for such purposes. With ConnectableFlux, data is generated to satisfy the most demanding subscriber and is also cached so that all other subscribers can process it at their own pace. The size of the queue and the timeouts are configurable via the publish and replay methods of Flux, both of which return a ConnectableFlux. Execution can be triggered manually with connect, or ConnectableFlux can track the number of downstream subscribers and trigger execution automatically once a desired threshold is reached, using the following methods: autoConnect(n), refCount(n), and refCount(int, Duration).

Let's describe the behavior of ConnectableFlux with the following example:

Flux<Integer> source = Flux.range(0, 3)
    .doOnSubscribe(s ->
        log.info("new subscription for the cold publisher"));

ConnectableFlux<Integer> conn = source.publish();

conn.subscribe(e -> log.info("[Subscriber 1] onNext: {}", e));
conn.subscribe(e -> log.info("[Subscriber 2] onNext: {}", e));

log.info("all subscribers are ready, connecting");
conn.connect();

When running, the preceding code produces the following output:

all subscribers are ready, connecting
new subscription for the cold publisher
[Subscriber 1] onNext: 0
[Subscriber 2] onNext: 0
[Subscriber 1] onNext: 1
[Subscriber 2] onNext: 1
[Subscriber 1] onNext: 2
[Subscriber 2] onNext: 2

As we can see, our cold publisher received a single subscription and consequently generated its items only once. Nevertheless, both subscribers received the complete set of events.
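The same sharing can be triggered without an explicit connect call. The following is a minimal sketch, assuming reactor-core is on the classpath, that applies autoConnect(2) to the source from the preceding example. The class name AutoConnectSketch and the events list used in place of the logger are illustrative choices, not part of the original example:

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

public class AutoConnectSketch {

    // Records every event in arrival order so the sequence can be inspected.
    // (Hypothetical helper; it stands in for the log.info calls used earlier.)
    static List<String> run() {
        List<String> events = new ArrayList<>();

        Flux<Integer> source = Flux.range(0, 3)
            .doOnSubscribe(s ->
                events.add("new subscription for the cold publisher"));

        // autoConnect(2) returns a plain Flux that connects to the source
        // as soon as the second subscriber has subscribed.
        Flux<Integer> shared = source.publish().autoConnect(2);

        shared.subscribe(e -> events.add("[Subscriber 1] onNext: " + e));
        // Only one subscriber so far, so the source has not been subscribed yet.
        events.add("one subscriber so far, nothing emitted yet");
        shared.subscribe(e -> events.add("[Subscriber 2] onNext: " + e));

        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch, nothing is emitted while only the first subscriber is attached. The cold publisher is subscribed exactly once, when the second subscriber arrives, and both subscribers then receive all three elements.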
