A great example of Hot Observables is ConnectableObservable. It is one of the most helpful forms of Hot Observables as well. It can turn any Observable, even a Cold Observable, into a Hot Observable. It doesn't start emitting on the subscribe call; instead, it gets activated after you call the connect method. You have to make the subscribe calls before calling connect; any subscribe calls after calling connect will miss the emissions fired previously.
Let's consider the following code snippet:
fun main(args: Array<String>) { val connectableObservable = listOf
("String 1","String 2","String 3","String 4","String
5").toObservable() .publish()//1 connectableObservable.subscribe({ println
("Subscription 1: $it") })//2 connectableObservable.map(String::reversed)//3 .subscribe({ println("Subscription 2 $it")})//4 connectableObservable.connect()//5 connectableObservable.subscribe({ println
("Subscription 3: $it") })//6 //Will not receive emissions }
The main purpose of ConnectableObservable is for Observables with multiple subscriptions to connect all subscriptions of an Observable together so that they can react to a single push; contrary to Cold Observables that repeats operations for doing the push, and pushes separately for each subscription, thus repeating the cycle. ConnectableObservable connects all subscriptions (Observers) called before the connect method and relays a single push to all Observers, Observers then react to/process that push.
In the preceding example, we created Observable with the toObservable() method, then, on comment 1, we used the publish operator to convert Cold Observable into ConnectableObservable.
On comment 2, we subscribed to connectableObservable. On comment 3, we used the map operator to reverse String, and, on comment 4, we subscribed to the mapped connectableObservable.
On comment 5, we called connect method, and emissions got started to both Observers.
Here is the output:
This mechanism of emitting from Observable once and then relaying the emission to all Subscriptions/Observers is known as multicasting.
Also note that the subscribe call on comment 6, after connect, has not received any emissions, as ConnectableObservable is hot, and any new subscriptions occurred after connect will miss out the emissions fired previously (between the call of the connect method and the new subscription, remember that, within a few milliseconds, computers can do a lot of tasks); in this case, it missed all the emissions.
The following piece of code is another example to make you understand it better:
fun main(args: Array<String>) { val connectableObservable =
Observable.interval(100,TimeUnit.MILLISECONDS) .publish()//1 connectableObservable. subscribe({ println("Subscription 1: $it") })//2 connectableObservable .subscribe({ println("Subscription 2 $it")})//3 connectableObservable.connect()//4 runBlocking { delay(500) }//5 connectableObservable. subscribe({ println("Subscription 3: $it") })//6 runBlocking { delay(500) }//7 }
This example is almost the same as the previous one, just a few tweaks.
Here, we used the Observable.interval method to create Observable; the benefit is that, as it takes an interval before each emission, it will give some room to the subscription after connect to get a few emissions.
On comment 1, we converted Cold Observable to ConnectableObservable, as with the previous one, and did two subscriptions and then connected, as in the previous example (comment 2, 3, 4).
We called delay right after connect on comment 5, then subscribed again on comment 6, and again a delay on comment 7 to allow the 3rd subscription to print some data.
The following output will allow us to understand better:
Go through the output carefully to note that the 3rd subscription received emissions from sequence 5, and missed all previous ones (there were 5 emissions before the 3rd subscription—500 millisecond delay/100 millisecond interval).