To see how multicasting works within a chain of operators, we are going to use Observable.range() and then map each emission to a random integer. Since these random values will be nondeterministic and different for each subscription, this will provide a good means to see whether our multicasting is working because Observers should receive the same numbers.
Let's start with emitting the numbers 1 through 3 and map each one to a random integer between 0 and 100,000. If we have two Observers, we can expect different integers for each one. Note that your output will be different than mine due to the random number generation and just acknowledge that both Observers are receiving different random integers:
import io.reactivex.Observable;
import java.util.concurrent.ThreadLocalRandom;
public class Launcher {
public static void main(String[] args) {
Observable<Integer> threeRandoms = Observable.range(1,3)
.map(i -> randomInt());
threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i));
threeRandoms.subscribe(i -> System.out.println("Observer 2: " + i));
}
public static int randomInt() {
return ThreadLocalRandom.current().nextInt(100000);
}
}
The output is as follows:
Observer 1: 38895
Observer 1: 36858
Observer 1: 82955
Observer 2: 55957
Observer 2: 47394
Observer 2: 16996
What happens here is that the Observable.range() source will yield two separate emission generators, and each will coldly emit a separate stream for each Observer. Each stream also has its own separate map() instance, hence each Observer gets different random integers. You can visually see this structure of two separate streams in the following figure:
Say, you want to emit the same three random integers to both Observers. Your first instinct might be to call publish() after Observable.range() to yield a ConnectableObservable. Then, you may call the map() operator on it, followed by the Observers and a connect() call. But you will see that this does not achieve our desired result. Each Observer still gets three separate random integers:
import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
import java.util.concurrent.ThreadLocalRandom;
public class Launcher {
public static void main(String[] args) {
ConnectableObservable<Integer> threeInts = Observable.range(1,3).publish();
Observable<Integer> threeRandoms = threeInts.map(i -> randomInt());
threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i));
threeRandoms.subscribe(i -> System.out.println("Observer 2: " + i));
threeInts.connect();
}
public static int randomInt() {
return ThreadLocalRandom.current().nextInt(100000);
}
}
The output is as follows:
Observer 1: 99350
Observer 2: 96343
Observer 1: 4155
Observer 2: 75273
Observer 1: 14280
Observer 2: 97638
This occurred because we multicast after Observable.range(), but the multicasting happens before the map() operator. Even though we consolidated to one set of emissions coming from Observable.range(), each Observer is still going to get a separate stream at map(). Everything before publish() was consolidated into a single stream (or more technically, a single proxy Observer). But after publish(), it will fork into separate streams for each Observer again, as shown in the following figure:
If we want to prevent the map() operator from yielding two separate streams for each Observer, we need to call publish() after map() instead:
import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
import java.util.concurrent.ThreadLocalRandom;
public class Launcher {
public static void main(String[] args) {
ConnectableObservable<Integer> threeRandoms = Observable.range(1,3)
.map(i -> randomInt()).publish();
threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i));
threeRandoms.subscribe(i -> System.out.println("Observer 2: " + i));
threeRandoms.connect();
}
public static int randomInt() {
return ThreadLocalRandom.current().nextInt(100000);
}
}
The output is as follows:
Observer 1: 90125
Observer 2: 90125
Observer 1: 79156
Observer 2: 79156
Observer 1: 76782
Observer 2: 76782
That is better! Each Observer got the same three random integers, and we have effectively multicast the entire operation right before the two Observers, as shown in the following figure. We now have a single stream instance throughout the entire chain since map() is now behind, not in front, of publish():