Introducing the ConnectableObservable object

A great example of Hot Observables is ConnectableObservable. It is one of the most helpful forms of Hot Observables as well. It can turn any Observable, even a Cold Observable, into a Hot Observable. It doesn't start emitting on the subscribe call; instead, it gets activated after you call the connect method. You have to make the subscribe calls before calling connect; any subscribe calls after calling connect will miss the emissions fired previously.

Let's consider the following code snippet:

    fun main(args: Array<String>) { 
      val connectableObservable = listOf
("String 1","String 2","String 3","String 4","String
5").toObservable() .publish()//1 connectableObservable.subscribe({ println
("Subscription 1: $it") })//2 connectableObservable.map(String::reversed)//3 .subscribe({ println("Subscription 2 $it")})//4 connectableObservable.connect()//5 connectableObservable.subscribe({ println
("Subscription 3: $it") })//6 //Will not receive emissions }

The main purpose of ConnectableObservable is for Observables with multiple subscriptions to connect all subscriptions of an Observable together so that they can react to a single push; contrary to Cold Observables that repeats operations for doing the push, and pushes separately for each subscription, thus repeating the cycle. ConnectableObservable connects all subscriptions (Observers) called before the connect method and relays a single push to all Observers, Observers then react to/process that push.

In the preceding example, we created Observable with the toObservable() method, then, on comment 1, we used the publish operator to convert Cold Observable into ConnectableObservable.

On comment 2, we subscribed to connectableObservable. On comment 3, we used the map operator to reverse String, and, on comment 4, we subscribed to the mapped connectableObservable.

On comment 5, we called connect method, and emissions got started to both Observers.

Note that we used the map operator in this example on comment 3. We will discuss the map operator in detail in Chapter 5Asynchronous Data Operators and Transformations. However, here is the definition, if you are curious. The map operator applies a function of your choosing to each item emitted by the source Observable, and returns an Observable that emits the results of these function applications.

Here is the output:

Note that, as the output suggests, each emission goes to each Observer simultaneously, and they are processing data in an interleaved fashion.

This mechanism of emitting from Observable once and then relaying the emission to all Subscriptions/Observers is known as multicasting.

Also note that the subscribe call on comment 6, after connect, has not received any emissions, as ConnectableObservable is hot, and any new subscriptions occurred after connect will miss out the emissions fired previously (between the call of the connect method and the new subscription, remember that, within a few milliseconds, computers can do a lot of tasks); in this case, it missed all the emissions.

The following piece of code is another example to make you understand it better:

    fun main(args: Array<String>) { 
      val connectableObservable =  
Observable.interval(100,TimeUnit.MILLISECONDS) .publish()//1 connectableObservable. subscribe({ println("Subscription 1: $it") })//2 connectableObservable .subscribe({ println("Subscription 2 $it")})//3 connectableObservable.connect()//4 runBlocking { delay(500) }//5 connectableObservable. subscribe({ println("Subscription 3: $it") })//6 runBlocking { delay(500) }//7 }

This example is almost the same as the previous one, just a few tweaks.

Here, we used the Observable.interval method to create Observable; the benefit is that, as it takes an interval before each emission, it will give some room to the subscription after connect to get a few emissions.

On comment 1, we converted Cold Observable to ConnectableObservable, as with the previous one, and did two subscriptions and then connected, as in the previous example (comment 2, 3, 4).

We called delay right after connect on comment 5, then subscribed again on comment 6, and again a delay on comment 7 to allow the 3rd subscription to print some data.

The following output will allow us to understand better:

Go through the output carefully to note that the 3rd subscription received emissions from sequence 5, and missed all previous ones (there were 5 emissions before the 3rd subscription—500 millisecond delay/100 millisecond interval).

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.221.163.13