Multicasting with operators

To see how multicasting works within a chain of operators, we are going to use Observable.range() and then map each emission to a random integer. Because these random values are nondeterministic and differ for each subscription, they give us a simple way to check whether our multicasting is working: if it is, every Observer should receive the same numbers.

Let's start by emitting the numbers 1 through 3 and mapping each one to a random integer from 0 up to (but not including) 100,000. With two Observers, we can expect different integers for each one. Your output will differ from mine because of the random number generation; the point to notice is that the two Observers receive different random integers:

import io.reactivex.Observable;
import java.util.concurrent.ThreadLocalRandom;

public class Launcher {
    public static void main(String[] args) {

        Observable<Integer> threeRandoms = Observable.range(1, 3)
                .map(i -> randomInt());

        threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i));
        threeRandoms.subscribe(i -> System.out.println("Observer 2: " + i));
    }

    public static int randomInt() {
        return ThreadLocalRandom.current().nextInt(100000);
    }
}

The output is as follows:

Observer 1: 38895
Observer 1: 36858
Observer 1: 82955
Observer 2: 55957
Observer 2: 47394
Observer 2: 16996

What happens here is that the Observable.range() source yields two separate emission generators, one per Observer, and each coldly emits its own stream. Each stream also has its own separate map() instance, hence each Observer gets different random integers. You can see this structure of two separate streams in the following figure:

Figure 5.1 - Two separate streams of operations are created for each Observer
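
To make the cold behavior concrete without depending on RxJava, here is a minimal plain-Java sketch. The ColdModel class and its subscribe() helper are hypothetical names invented for this illustration, not RxJava API, and a counter stands in for randomInt() so that the result is repeatable. It only models the idea that each subscription replays the whole chain, mapping step included:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntFunction;

class ColdModel {
    // Counts how many times the mapping step runs across all subscriptions.
    static int mapCalls = 0;

    // Hypothetical stand-in for a cold range(start, count).map(mapper) chain:
    // every call to subscribe() replays the whole pipeline from scratch.
    static void subscribe(int start, int count, IntFunction<Integer> mapper,
                          Consumer<Integer> observer) {
        for (int i = start; i < start + count; i++) {
            observer.accept(mapper.apply(i));
        }
    }

    public static void main(String[] args) {
        // Stand-in for randomInt(): each call produces a value not seen before.
        IntFunction<Integer> mapper = i -> ++mapCalls;
        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();

        subscribe(1, 3, mapper, first::add);
        subscribe(1, 3, mapper, second::add);

        System.out.println("Observer 1: " + first);
        System.out.println("Observer 2: " + second);
        System.out.println("map calls: " + mapCalls);
    }
}
```

Running it should print [1, 2, 3] for the first Observer, [4, 5, 6] for the second, and six map calls: the mapping step ran once per emission per Observer, which is why the two Observers never see the same values.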

Say you want to emit the same three random integers to both Observers. Your first instinct might be to call publish() after Observable.range() to yield a ConnectableObservable. Then, you may call the map() operator on it, followed by the Observers and a connect() call. But you will see that this does not achieve our desired result. Each Observer still gets its own three random integers:

import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
import java.util.concurrent.ThreadLocalRandom;

public class Launcher {
    public static void main(String[] args) {

        ConnectableObservable<Integer> threeInts = Observable.range(1, 3).publish();

        Observable<Integer> threeRandoms = threeInts.map(i -> randomInt());

        threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i));
        threeRandoms.subscribe(i -> System.out.println("Observer 2: " + i));

        threeInts.connect();
    }

    public static int randomInt() {
        return ThreadLocalRandom.current().nextInt(100000);
    }
}

The output is as follows:

Observer 1: 99350
Observer 2: 96343
Observer 1: 4155
Observer 2: 75273
Observer 1: 14280
Observer 2: 97638

This occurred because we multicast right after Observable.range(), which means the multicasting happens before the map() operator. Even though we consolidated to one set of emissions coming from Observable.range(), each Observer is still going to get a separate stream at map(). Everything before publish() was consolidated into a single stream (or more technically, a single proxy Observer). But after publish(), it will fork into separate streams for each Observer again, as shown in the following figure:

Figure 5.2 - Multicasting after Observable.range() will consolidate the range emissions into a single stream before publish(), but will still fork to two separate streams after publish() for each Observer.
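
The fork after publish() can be modeled in plain Java as well. In the sketch below, the Hub class and the subscribeMapped() helper are hypothetical names made up for this illustration (they are not RxJava types), and counters replace the random numbers so the behavior is repeatable. The hub shares one pass over the source, but the mapping step is attached downstream of it, once per Observer:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class PublishBeforeMap {
    static int sourceEmissions = 0; // values produced by the shared source
    static int mapCalls = 0;        // times the mapping step ran

    // Hypothetical stand-in for a connectable hub: it collects downstream
    // consumers and, on connect(), pushes each source value to all of them.
    static class Hub {
        private final List<Consumer<Integer>> downstream = new ArrayList<>();

        void subscribe(Consumer<Integer> consumer) {
            downstream.add(consumer);
        }

        void connect(int start, int count) {
            for (int i = start; i < start + count; i++) {
                sourceEmissions++;
                for (Consumer<Integer> c : downstream) {
                    c.accept(i);
                }
            }
        }
    }

    // Attaches a map step after the hub, so each Observer gets its own copy.
    // The counter is a stand-in for randomInt().
    static void subscribeMapped(Hub hub, List<Integer> sink) {
        hub.subscribe(i -> sink.add(++mapCalls));
    }

    public static void main(String[] args) {
        Hub hub = new Hub();
        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();
        subscribeMapped(hub, first);
        subscribeMapped(hub, second);
        hub.connect(1, 3);

        System.out.println("Observer 1: " + first);
        System.out.println("Observer 2: " + second);
        System.out.println("source emissions: " + sourceEmissions
                + ", map calls: " + mapCalls);
    }
}
```

Running it should report three source emissions but six map calls, with Observer 1 collecting [1, 3, 5] and Observer 2 collecting [2, 4, 6]. The source is shared, yet the two Observers still end up with different mapped values.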

If we want to prevent the map() operator from yielding a separate stream for each Observer, we need to call publish() after map() instead:

import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
import java.util.concurrent.ThreadLocalRandom;

public class Launcher {
    public static void main(String[] args) {

        ConnectableObservable<Integer> threeRandoms = Observable.range(1, 3)
                .map(i -> randomInt()).publish();

        threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i));
        threeRandoms.subscribe(i -> System.out.println("Observer 2: " + i));

        threeRandoms.connect();
    }

    public static int randomInt() {
        return ThreadLocalRandom.current().nextInt(100000);
    }
}

The output is as follows:

Observer 1: 90125
Observer 2: 90125
Observer 1: 79156
Observer 2: 79156
Observer 1: 76782
Observer 2: 76782

That is better! Each Observer got the same three random integers, and we have effectively multicast the entire operation right before the two Observers, as shown in the following figure. We now have a single stream instance throughout the entire chain, since map() now comes before publish() rather than after it:

Figure 5.3 - A fully multicast operation that guarantees both Observers get the same emissions since all operators are behind the publish() call
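
As a rough plain-Java model of why placing map() before publish() works, consider the sketch below. MappedHub is a hypothetical class invented for this illustration, not part of RxJava, and a counter again stands in for randomInt(). Because the hub owns the mapping step, each source value is mapped once and the same result is handed to every Observer:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntUnaryOperator;

class MapBeforePublish {
    static int mapCalls = 0; // times the mapping step ran

    // Hypothetical hub that owns the mapping step: it maps each source value
    // once, then passes the same result to every registered observer.
    static class MappedHub {
        private final List<Consumer<Integer>> observers = new ArrayList<>();
        private final IntUnaryOperator mapper;

        MappedHub(IntUnaryOperator mapper) {
            this.mapper = mapper;
        }

        void subscribe(Consumer<Integer> observer) {
            observers.add(observer);
        }

        void connect(int start, int count) {
            for (int i = start; i < start + count; i++) {
                int mapped = mapper.applyAsInt(i); // runs once per source value
                for (Consumer<Integer> o : observers) {
                    o.accept(mapped);
                }
            }
        }
    }

    public static void main(String[] args) {
        // The counter is a stand-in for randomInt().
        MappedHub hub = new MappedHub(i -> ++mapCalls);
        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();
        hub.subscribe(first::add);
        hub.subscribe(second::add);
        hub.connect(1, 3);

        System.out.println("Observer 1: " + first);
        System.out.println("Observer 2: " + second);
        System.out.println("map calls: " + mapCalls);
    }
}
```

Running it should print [1, 2, 3] for both Observers and only three map calls, matching the identical values seen in the output above.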