Looking at the previous examples implemented using the Observable.create()
, Observable.just()
, and Observable.from()
methods, we can say that until someone subscribes to them, they are inactive and don't emit anything. However, each time someone subscribes, they start emitting their notifications. For example, if we subscribe three times to an Observable.from(Iterable)
object, the Iterable
instance will be iterated three times. The Observable
instances behaving like that are called cold Observable instances.
All of the factory methods we've been using in this chapter return cold Observables. Cold Observables produce notifications on demand, and for every Subscriber, they produce independent notifications.
There are Observable
instances which, when they start emitting notifications, it doesn't matter if there are subscriptions to them or not. They continue emitting them until completion. All the subscribers receive the same notifications, and by default, when a Subscriber subscribes, it doesn't receive the notifications emitted before that. These are hot Observable instances.
We can say that cold Observables generate notifications for each subscriber and hot Observables are always running, broadcasting notifications to all of their subscribers. Think of a hot Observable as a radio station. All of the listeners that are listening to it at this moment listen to the same song. A cold Observable is a music CD. Many people can buy it and listen to it independently.
As we mentioned, there are a lot of examples in this book that use cold Observables. What about hot Observable instances? If you remember when we implemented 'The Reactive Sum' in the first chapter, we had an Observable
instance that was emitting every line the user had typed in the standard input stream. This one was hot, and we forked two Observable
instances from it, one for the collector a
and one for b
. They received the same input lines and filtered only the ones they were interested in. This input Observable
instance was implemented using a special type of Observable
, called ConnectableObservable
.
These Observable
instances are inactive until their connect()
method is called. After that, they become hot Observables. The ConnectableObservable
instance can be created from any Observable
instance by calling its publish()
method. In other words, the publish()
method can turn any cold Observable into a hot one. Let's look at this example:
Observable<Long> interval = Observable.interval(100L, TimeUnit.MILLISECONDS); ConnectableObservable<Long> published = interval.publish(); Subscription sub1 = subscribePrint(published, "First"); Subscription sub2 = subscribePrint(published, "Second"); published.connect(); Subscription sub3 = null; try { Thread.sleep(500L); sub3 = subscribePrint(published, "Third"); Thread.sleep(500L); } catch (InterruptedException e) {} sub1.unsubscribe(); sub2.unsubscribe(); sub3.unsubscribe();
Nothing will happen until the connect()
method is called. After that, we'll see the same sequential numbers outputted twice—once for each Subscriber. The third Subscriber will join the other two, printing the numbers emitted after the first 500 milliseconds, but it won't print the numbers emitted before its subscription.
What if we want to receive all the notifications that have been emitted before our subscription and then to continue receiving the incoming ones? That can be accomplished by calling the replay()
method instead of the publish()
method. It creates a ConnectableObservable
instance from the source Observable
instance with this little twist: all the subscribers, whenever they subscribe, will receive all the notifications (the previous notifications will arrive in order and synchronously).
There is a way to activate an Observable
instance to become hot without calling the connect()
method. It can be activated on the first subscription to it and deactivated when every Subscriber
instance unsubscribes. Such an Observable
instance can be created from a ConnectableObservable
instance by calling the refCount()
method on it (the name of the method comes from 'reference count'; it counts the Subscriber
instances subscribed to the Observable
instance created by it). Here is the preceding example implemented using the refCount()
method:
Observable<Long> refCount = interval.publish().refCount(); Subscription sub1 = subscribePrint(refCount, "First"); Subscription sub2 = subscribePrint(refCount, "Second"); try { Thread.sleep(300L); } catch (InterruptedException e) {} sub1.unsubscribe(); sub2.unsubscribe(); Subscription sub3 = subscribePrint(refCount, "Third"); try { Thread.sleep(300L); } catch (InterruptedException e) { } sub3.unsubscribe();
The Observable
instance will be deactivated when sub2
unsubscribes. If someone subscribes to it after that, it will begin emitting the sequence from the beginning. This is what's happening with sub3
. There is a share()
method, which is an alias for the publish().refCount()
call.
The source code of the preceding example can be viewed/downloaded at https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter03/UsingConnectableObservables.java.
There is one other way to create a hot Observable: using a Subject
instance. We will introduce them in the next and last section of this chapter.
18.227.111.208