Hot and cold Observable instances

The Observable instances we created in the previous examples with the Observable.create(), Observable.just(), and Observable.from() methods are inactive until someone subscribes to them: they don't emit anything. Each time someone subscribes, they start emitting their notifications from the beginning. For example, if we subscribe three times to an Observable.from(Iterable) object, the Iterable instance will be iterated three times. Observable instances that behave like this are called cold Observable instances.

All of the factory methods we've been using in this chapter return cold Observables. Cold Observables produce notifications on demand, and for every Subscriber, they produce independent notifications.
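The "iterated three times" behavior can be illustrated without RxJava at all. The following is only a minimal plain-Java sketch of the idea; ColdSketch and ColdSource are hypothetical names invented for this illustration and are not part of the RxJava API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class ColdSketch {
    // Hypothetical stand-in for a cold Observable: it does nothing until subscribe() is called.
    static final class ColdSource<T> {
        private final Iterable<T> items;

        ColdSource(Iterable<T> items) {
            this.items = items;
        }

        // Every subscription walks the Iterable again, from the start.
        void subscribe(Consumer<T> onNext) {
            for (T item : items) {
                onNext.accept(item);
            }
        }
    }

    // Subscribes three times and returns everything the subscribers received, in order.
    static List<String> run() {
        ColdSource<String> cold = new ColdSource<>(Arrays.asList("a", "b"));
        List<String> seen = new ArrayList<>();
        cold.subscribe(v -> seen.add("First:" + v));
        cold.subscribe(v -> seen.add("Second:" + v));
        cold.subscribe(v -> seen.add("Third:" + v));
        return seen;
    }

    public static void main(String[] args) {
        // prints [First:a, First:b, Second:a, Second:b, Third:a, Third:b]
        System.out.println(run());
    }
}
```

Each subscriber gets its own complete, independent run over the data, which is the defining property of a cold source.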

There are also Observable instances that, once they start emitting notifications, keep emitting them until completion whether or not anyone is subscribed. All the subscribers receive the same notifications, and by default a Subscriber that subscribes late doesn't receive the notifications emitted before its subscription. These are hot Observable instances.

We can say that cold Observables generate notifications for each subscriber and hot Observables are always running, broadcasting notifications to all of their subscribers. Think of a hot Observable as a radio station. All of the listeners that are listening to it at this moment listen to the same song. A cold Observable is a music CD. Many people can buy it and listen to it independently.
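The radio station side of the analogy can be sketched in plain Java as well. This is an illustration under simplifying assumptions (single-threaded, no completion or error signals); HotSketch and HotSource are hypothetical names, not RxJava types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class HotSketch {
    // Hypothetical stand-in for a hot Observable: it emits whether or not anyone listens.
    static final class HotSource<T> {
        private final List<Consumer<T>> listeners = new ArrayList<>();

        void subscribe(Consumer<T> listener) {
            listeners.add(listener);
        }

        // Broadcasts one value to whoever is subscribed at this moment.
        void emit(T value) {
            for (Consumer<T> listener : listeners) {
                listener.accept(value);
            }
        }
    }

    static List<String> run() {
        HotSource<Integer> hot = new HotSource<>();
        List<String> seen = new ArrayList<>();
        hot.emit(0);                                  // nobody is listening yet, so 0 is lost
        hot.subscribe(v -> seen.add("First:" + v));
        hot.emit(1);                                  // only First receives 1
        hot.subscribe(v -> seen.add("Second:" + v));
        hot.emit(2);                                  // both receive the same 2
        return seen;
    }

    public static void main(String[] args) {
        // prints [First:1, First:2, Second:2]
        System.out.println(run());
    }
}
```

Unlike the cold case, the source here keeps a single shared stream, so a subscriber's view depends on when it tuned in.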

As we mentioned, many examples in this book use cold Observables. What about hot Observable instances? Recall 'The Reactive Sum' from the first chapter: it had an Observable instance that emitted every line the user typed in the standard input stream. This one was hot, and we forked two Observable instances from it, one for the collector a and one for the collector b. They received the same input lines and filtered only the ones they were interested in. This input Observable instance was implemented using a special type of Observable, called ConnectableObservable.

The ConnectableObservable class

A ConnectableObservable instance is inactive until its connect() method is called. After that, it becomes a hot Observable. A ConnectableObservable instance can be created from any Observable instance by calling its publish() method. In other words, the publish() method, followed by a connect() call, can turn any cold Observable into a hot one. Let's look at this example:

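// interval() emits 0, 1, 2, ... every 100 ms; like the other factory methods, it is cold.
Observable<Long> interval = Observable.interval(100L, TimeUnit.MILLISECONDS);
ConnectableObservable<Long> published = interval.publish();
// Subscribing does not start the emissions yet.
Subscription sub1 = subscribePrint(published, "First");
Subscription sub2 = subscribePrint(published, "Second");
// connect() starts the source; from here on the Observable is hot.
published.connect();
Subscription sub3 = null;
try {
  Thread.sleep(500L);
  // A late subscriber sees only what is emitted from now on.
  sub3 = subscribePrint(published, "Third");
  Thread.sleep(500L);
}
catch (InterruptedException e) {
  Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
}
sub1.unsubscribe();
sub2.unsubscribe();
if (sub3 != null) { // sub3 stays null if the first sleep was interrupted
  sub3.unsubscribe();
}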

Nothing will happen until the connect() method is called. After that, we'll see the same sequential numbers printed twice, once for each Subscriber. The third Subscriber will join the other two after the first 500 milliseconds and print the numbers emitted from then on, but it won't print the numbers emitted before its subscription.

What if we want to receive all the notifications that have been emitted before our subscription and then continue receiving the incoming ones? That can be accomplished by calling the replay() method instead of the publish() method. It also creates a ConnectableObservable instance from the source Observable instance, with one twist: every Subscriber, whenever it subscribes, receives all the notifications. The ones emitted before the subscription arrive first, in order and synchronously, followed by the new ones as they are emitted.
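The replaying behavior can be pictured as a hot source that also remembers its history. The following plain-Java sketch assumes an unbounded history and a single thread; ReplaySketch and ReplaySource are hypothetical names used only for this illustration, and the sketch does not model the connect() step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ReplaySketch {
    // Hypothetical stand-in for a replaying hot source.
    static final class ReplaySource<T> {
        private final List<T> history = new ArrayList<>();
        private final List<Consumer<T>> listeners = new ArrayList<>();

        // A new subscriber first gets the whole history, in order and synchronously,
        // and is then registered for the values that follow.
        void subscribe(Consumer<T> listener) {
            for (T past : history) {
                listener.accept(past);
            }
            listeners.add(listener);
        }

        // Records the value, then broadcasts it to the current subscribers.
        void emit(T value) {
            history.add(value);
            for (Consumer<T> listener : listeners) {
                listener.accept(value);
            }
        }
    }

    static List<String> run() {
        ReplaySource<Integer> source = new ReplaySource<>();
        List<String> seen = new ArrayList<>();
        source.subscribe(v -> seen.add("Early:" + v));
        source.emit(0);
        source.emit(1);
        source.subscribe(v -> seen.add("Late:" + v)); // receives 0 and 1 immediately
        source.emit(2);
        return seen;
    }

    public static void main(String[] args) {
        // prints [Early:0, Early:1, Late:0, Late:1, Early:2, Late:2]
        System.out.println(run());
    }
}
```

Keeping the full history costs memory in proportion to the number of emitted values, which is worth bearing in mind for long-running sources.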

There is a way to make an Observable instance hot without calling the connect() method. It can be activated on the first subscription to it and deactivated when every Subscriber instance has unsubscribed. Such an Observable instance can be created from a ConnectableObservable instance by calling the refCount() method on it. The name of the method comes from 'reference count': it counts the Subscriber instances subscribed to the Observable instance it creates. Here is the preceding example implemented using the refCount() method:

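// 'interval' is the Observable.interval(100L, TimeUnit.MILLISECONDS) instance from the previous example.
Observable<Long> refCount = interval.publish().refCount();
// The first subscription activates the source; no connect() call is needed.
Subscription sub1 = subscribePrint(refCount, "First");
Subscription sub2 = subscribePrint(refCount, "Second");
try {
  Thread.sleep(300L);
}
catch (InterruptedException e) {
  Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
}
sub1.unsubscribe();
// With no subscribers left after this call, the source is deactivated.
sub2.unsubscribe();
// A new subscription activates the source again.
Subscription sub3 = subscribePrint(refCount, "Third");
try {
  Thread.sleep(300L);
}
catch (InterruptedException e) {
  Thread.currentThread().interrupt();
}
sub3.unsubscribe();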

The Observable instance is deactivated when sub2, the last remaining Subscriber, unsubscribes. If someone subscribes to it after that, it begins emitting the sequence from the beginning. This is what happens with sub3. There is also a share() method, which is an alias for the publish().refCount() call.
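The reference-counting idea itself is small enough to sketch in plain Java. The version below replaces the timer with a manually driven tick() so that the behavior is deterministic; RefCountSketch, RefCounted, and tick() are hypothetical names for this illustration and say nothing about how RxJava implements refCount() internally.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class RefCountSketch {
    // Hypothetical source that is active only while it has at least one subscriber.
    static final class RefCounted {
        private final Map<Integer, Consumer<Long>> listeners = new LinkedHashMap<>();
        private int nextId = 0;
        private long counter = 0;
        private boolean active = false;

        // Returns a Runnable that unsubscribes the listener when run.
        Runnable subscribe(Consumer<Long> listener) {
            int id = nextId++;
            if (listeners.isEmpty()) {
                active = true;   // first subscriber: activate
                counter = 0;     // and restart the sequence from the beginning
            }
            listeners.put(id, listener);
            return () -> {
                listeners.remove(id);
                if (listeners.isEmpty()) {
                    active = false; // last subscriber gone: deactivate
                }
            };
        }

        // Stands in for one timer tick; ignored while the source is inactive.
        void tick() {
            if (!active) {
                return;
            }
            long value = counter++;
            for (Consumer<Long> listener : listeners.values()) {
                listener.accept(value);
            }
        }
    }

    static List<String> run() {
        RefCounted source = new RefCounted();
        List<String> seen = new ArrayList<>();
        Runnable first = source.subscribe(v -> seen.add("First:" + v));
        Runnable second = source.subscribe(v -> seen.add("Second:" + v));
        source.tick();
        source.tick();
        first.run();
        second.run();   // the source is now inactive
        source.tick();  // emits nothing
        Runnable third = source.subscribe(v -> seen.add("Third:" + v));
        source.tick();  // the sequence starts again from 0
        third.run();
        return seen;
    }

    public static void main(String[] args) {
        // prints [First:0, Second:0, First:1, Second:1, Third:0]
        System.out.println(run());
    }
}
```

As with sub3 in the listing above, the third subscriber here sees the sequence restart at 0 because the source was deactivated in between.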

There is one other way to create a hot Observable: using a Subject instance. We will introduce them in the next and last section of this chapter.
