The Observable.subscribe()
method has many overloads as follows:
subscribe()
: This one ignores all the emissions from the Observable
instance and throws an OnErrorNotImplementedException
exception if there is an OnError
notification. This can be used to only trigger the OnSubscribe.call
behavior.subscribe(Action1<? super T>)
: This only subscribes to onNext()
method-triggered updates. It ignores the OnCompleted
notification and throws an OnErrorNotImplementedException
exception if there is an OnError
notification. It is not a good choice for real production code, because it is hard to guarantee that no errors will be thrown.subscribe(Action1<? super T>, Action1<Throwable>)
: This is the same as preceding one, but the second parameter is called if there is an OnError
notification.subscribe(Action1<? super T>,Action1<Throwable>, Action0)
: This is the same as the preceding one, but the third parameter is called on OnCompleted
notification.subscribe(Observer<? super T>)
: This uses its Observer
parameter's onNext/onError/onCompleted
methods to observe the notifications from the Observable
instance. We used this in the first chapter while implementing "The Reactive Sum".subscribe(Subscriber<? super T>)
: This is the same as the preceding one, but the Subscriber
implementation of the Observer
interface is used to observe notifications. The Subscriber
class provides advanced functionality, such as unsubscription (cancellation) and backpressure (flow control). Actually, all the preceding methods call this one; that's why we will be referring to it when talking about Observable.subscribe
from now on. The method ensures that the Subscriber
instance passed sees an Observable
instance, complying with the following Rx contract:"Messages sent to instances of the Observer interface follow the following syntax:
onNext* (onCompleted | onError)?
This syntax allows observable sequences to send any number (0 or more) of
OnNext()
method messages to the Subscriber, optionally followed by a single success (onCompleted
) or failure (onError
) message. The single message indicating that an observable sequence has finished ensures that consumers of the observable sequence can deterministically establish that it is safe to perform cleanup operations. A single failure further ensures that abort semantics can be maintained for operators that work on multiple observable sequences".
– part of RxJava's JavaDoc.
This is done internally by using a wrapper around the passed Subscriber
instance—SafeSubscriber
.
unsafeSubscribe(Subscriber<? super T>)
: This is the same as the preceding one but without the Rx contract protection. It is meant to help implement custom operators (see Chapter 8, Resource Management and Extending RxJava) without the additional overhead of the subscribe()
method's protections; using this method to observe an Observable
instance in general code is discouraged.All of these methods return results of type Subscription
that can be used for unsubscribing from the notifications emitted by the Observable
instance. Unsubscribing usually cleans up internal resources associated with a subscription; for example, if we implement an HTTP request with the Observable.create()
method and want to cancel it by a particular time, or we have an Observable
instance emitting a sequence of numbers/words/arbitrary data infinitely and want to stop that.
The Subscription
interface has two methods:
void unsubscribe()
: This is used for unsubscribing.boolean isUnsubscribed()
: This is used to check whether the Subscription
instance is already unsubscribed.The instance of the Subscriber
class, passed to the Observable.create()
method's OnSubscribe()
method, implements the Subscription
interface. So, while coding the behavior of the Observable
instance, unsubscribing and checking whether Subscriber
is subscribed can be done. Let's update our Observable<T> fromIterable(Iterable<T>)
method implementation to react on unsubscribing:
<T> Observable<T> fromIterable(final Iterable<T> iterable) { return Observable.create(new OnSubscribe<T>() { @Override public void call(Subscriber<? super T> subscriber) { try { Iterator<T> iterator = iterable.iterator(); while (iterator.hasNext()) { if (subscriber.isUnsubscribed()) { return; } subscriber.onNext(iterator.next()); } if (!subscriber.isUnsubscribed()) { subscriber.onCompleted(); } } catch (Exception e) { if (!subscriber.isUnsubscribed()) { subscriber.onError(e); } } } }); }
The new thing here is that the Subscription.isUnsubscribed()
method is used to determine whether the data emission should be terminated. We check whether the Subscriber
is already unsubscribed on every iteration, because it can unsubscribe at any time and we won't need to emit anything after that. After everything is emitted, if the Subscriber is already unsubscribed, the onCompleted()
method is skipped. If there is an exception , it is only emitted as an OnError
notification if the Subscriber
instance is still subscribed.
Let's look at how unsubscribing works:
Path path = Paths.get("src", "main", "resources", "lorem_big.txt"); // (1) List<String> data = Files.readAllLines(path); Observable<String> observable = fromIterable(data).subscribeOn(Schedulers.computation()); // (2) Subscription subscription = subscribePrint(observable, "File");// (3) System.out.println("Before unsubscribe!"); System.out.println("-------------------"); subscription.unsubscribe(); // (4) System.out.println("-------------------"); System.out.println("After unsubscribe!");
Here's what's happening in this example:
Observable
instance will take place on another thread because we will want to unsubscribe on the main thread.subscribePrint()
method defined in this chapter is used, but it is modified to return the Subscription
.Observable
instance, so the whole file won't be printed and there are markers showing when the unsubscription is executed.The output will be something like this:
File : Donec facilisis sollicitudin est non molestie. File : Integer nec magna ac ex rhoncus imperdiet. Before unsubscribe! ------------------- File : Nullam pharetra iaculis sem. ------------------- After unsubscribe!
So most of the file's content is skipped. Note that it is possible for something to be emitted right after unsubscribing; for example, if the Subscriber
instance unsubscribes right after the check for unsubscribing and the program is already executing the body of the if
statement, checking whether the user is unsubscribed.
The source code of the preceding example can be downloaded/viewed at https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter03/ObservableCreateExample.java.
Another thing to note is that the Subscriber
instances have a void add(Subscription s)
method. Every subscription passed to it will be automatically unsubscribed when the Subscriber
is unsubscribed. This way, we can add additional actions to the Subscriber
instance; for example, actions that should be executed at unsubscribing (similar to the try—finally construction in Java). This is how unsubscribing works. In Chapter 8, Resource Management and Extending RxJava, we'll be dealing with resource management. We'll learn how Observable
instances can be attached to Subscriber
instances through a Subscription
wrapper, and how calling unsubscribe will release any allocated resources.
The next topic we'll be covering in this chapter is related to subscribing behavior. We will be talking about hot and cold Observable
instances.
18.188.119.81