Subscribing and unsubscribing

The Observable.subscribe() method has many overloads as follows:

  • subscribe(): This one ignores all the emissions from the Observable instance and throws an OnErrorNotImplementedException exception if there is an OnError notification. This can be used to only trigger the OnSubscribe.call behavior.
  • subscribe(Action1<? super T>): This only subscribes to onNext() method-triggered updates. It ignores the OnCompleted notification and throws an OnErrorNotImplementedException exception if there is an OnError notification. It is not a good choice for real production code, because it is hard to guarantee that no errors will be thrown.
  • subscribe(Action1<? super T>, Action1<Throwable>): This is the same as the preceding one, but the second parameter is called if there is an OnError notification.
  • subscribe(Action1<? super T>,Action1<Throwable>, Action0): This is the same as the preceding one, but the third parameter is called on OnCompleted notification.
  • subscribe(Observer<? super T>): This uses its Observer parameter's onNext/onError/onCompleted methods to observe the notifications from the Observable instance. We used this in the first chapter while implementing "The Reactive Sum".
  • subscribe(Subscriber<? super T>): This is the same as the preceding one, but the Subscriber implementation of the Observer interface is used to observe notifications. The Subscriber class provides advanced functionality, such as unsubscription (cancellation) and backpressure (flow control). In fact, all the preceding methods call this one, which is why we will be referring to it when talking about Observable.subscribe from now on. The method ensures that the Subscriber instance passed to it sees an Observable instance that complies with the following Rx contract:

    "Messages sent to instances of the Observer interface follow the following syntax:

    onNext* (onCompleted | onError)?

    This syntax allows observable sequences to send any number (0 or more) of OnNext() method messages to the Subscriber, optionally followed by a single success (onCompleted) or failure (onError) message. The single message indicating that an observable sequence has finished ensures that consumers of the observable sequence can deterministically establish that it is safe to perform cleanup operations. A single failure further ensures that abort semantics can be maintained for operators that work on multiple observable sequences".

    – part of RxJava's JavaDoc.

    This is done internally by wrapping the passed Subscriber instance in a SafeSubscriber instance.

  • unsafeSubscribe(Subscriber<? super T>): This is the same as the preceding one but without the Rx contract protection. It is meant to help implement custom operators (see Chapter 8, Resource Management and Extending RxJava) without the additional overhead of the subscribe() method's protections; using this method to observe an Observable instance in general code is discouraged.
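To make the onNext* (onCompleted | onError)? rule more concrete, here is a minimal sketch in plain Java of a wrapper that drops every notification arriving after a terminal one. It is only a model of the idea: MiniObserver, ContractGuard, and ContractGuardDemo are hypothetical names invented for this illustration, not RxJava types, and the sketch is not thread-safe. The real wrapper used by subscribe() is not reproduced here and may do more than this.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified observer interface. It mirrors the shape of
// RxJava's Observer but is NOT the library type.
interface MiniObserver<T> {
  void onNext(T value);
  void onError(Throwable error);
  void onCompleted();
}

// Illustrative wrapper that enforces onNext* (onCompleted | onError)?
// by dropping every notification that arrives after a terminal one.
final class ContractGuard<T> implements MiniObserver<T> {
  private final MiniObserver<T> actual;
  private boolean done; // true once a terminal notification was delivered

  ContractGuard(MiniObserver<T> actual) {
    this.actual = actual;
  }

  @Override
  public void onNext(T value) {
    if (!done) {
      actual.onNext(value);
    }
  }

  @Override
  public void onError(Throwable error) {
    if (!done) {
      done = true;
      actual.onError(error);
    }
  }

  @Override
  public void onCompleted() {
    if (!done) {
      done = true;
      actual.onCompleted();
    }
  }
}

final class ContractGuardDemo {
  // Feeds a misbehaving sequence through the guard and records what arrives.
  static List<String> run() {
    final List<String> log = new ArrayList<>();
    MiniObserver<Integer> recorder = new MiniObserver<Integer>() {
      @Override public void onNext(Integer v) { log.add("next:" + v); }
      @Override public void onError(Throwable e) { log.add("error:" + e.getMessage()); }
      @Override public void onCompleted() { log.add("completed"); }
    };
    MiniObserver<Integer> guarded = new ContractGuard<>(recorder);
    guarded.onNext(1);
    guarded.onNext(2);
    guarded.onCompleted();
    guarded.onNext(3);                                  // after completion: dropped
    guarded.onError(new IllegalStateException("late")); // second terminal: dropped
    return log;
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [next:1, next:2, completed]
  }
}
```

The recorder only sees the two values and the single onCompleted notification, so a consumer behind such a guard can safely treat the first terminal message as the end of the sequence.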

All of these methods return a Subscription, which can be used to unsubscribe from the notifications emitted by the Observable instance. Unsubscribing usually cleans up internal resources associated with a subscription. This is useful, for example, if we implement an HTTP request with the Observable.create() method and want to cancel it at a particular time, or if we have an Observable instance emitting an infinite sequence of numbers, words, or arbitrary data and want to stop it.

The Subscription interface has two methods:

  • void unsubscribe(): This is used for unsubscribing.
  • boolean isUnsubscribed(): This is used to check whether the Subscription instance is already unsubscribed.
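The following is a minimal sketch of one common way to implement such a two-method interface in plain Java: a flag records whether unsubscribe() has been called, and a cleanup action runs at most once. MiniSubscription, CleanupSubscription, and CleanupSubscriptionDemo are hypothetical names used only for illustration; they are not part of RxJava, and this is not presented as how the library implements Subscription.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical interface with the same two methods as the Subscription
// interface described above. It is NOT the RxJava type.
interface MiniSubscription {
  void unsubscribe();
  boolean isUnsubscribed();
}

// Runs a cleanup action at most once, on the first call to unsubscribe().
final class CleanupSubscription implements MiniSubscription {
  private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
  private final Runnable cleanup;

  CleanupSubscription(Runnable cleanup) {
    this.cleanup = cleanup;
  }

  @Override
  public void unsubscribe() {
    // compareAndSet lets exactly one caller win, even across threads.
    if (unsubscribed.compareAndSet(false, true)) {
      cleanup.run();
    }
  }

  @Override
  public boolean isUnsubscribed() {
    return unsubscribed.get();
  }
}

final class CleanupSubscriptionDemo {
  // Returns "<state before>,<state after>,<number of cleanup runs>".
  static String run() {
    final int[] cleanups = {0};
    MiniSubscription subscription = new CleanupSubscription(() -> cleanups[0]++);
    boolean before = subscription.isUnsubscribed();
    subscription.unsubscribe();
    subscription.unsubscribe(); // second call is a no-op
    boolean after = subscription.isUnsubscribed();
    return before + "," + after + "," + cleanups[0];
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints false,true,1
  }
}
```

Making unsubscribe() idempotent, as in this sketch, means that callers do not have to coordinate about who cancels first.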

The instance of the Subscriber class, passed to the call() method of the OnSubscribe instance given to Observable.create(), implements the Subscription interface. So, while coding the behavior of the Observable instance, we can unsubscribe the Subscriber and check whether it is still subscribed. Let's update our Observable<T> fromIterable(Iterable<T>) method implementation to react to unsubscribing:

<T> Observable<T> fromIterable(final Iterable<T> iterable) {
  return Observable.create(new OnSubscribe<T>() {
    @Override
    public void call(Subscriber<? super T> subscriber) {
      try {
        Iterator<T> iterator = iterable.iterator();
        while (iterator.hasNext()) {
          if (subscriber.isUnsubscribed()) {
            return;
          }
          subscriber.onNext(iterator.next());
        }
        if (!subscriber.isUnsubscribed()) {
          subscriber.onCompleted();
        }
      }
      catch (Exception e) {
        if (!subscriber.isUnsubscribed()) {
          subscriber.onError(e);
        }
      }
    }
  });
}

The new thing here is that the Subscription.isUnsubscribed() method is used to determine whether the data emission should be terminated. We check whether the Subscriber is already unsubscribed on every iteration, because it can unsubscribe at any time and we won't need to emit anything after that. After everything is emitted, if the Subscriber is already unsubscribed, the onCompleted() method is skipped. If there is an exception, it is only emitted as an OnError notification if the Subscriber instance is still subscribed.

Let's look at how unsubscribing works:

Path path = Paths.get("src", "main", "resources", "lorem_big.txt"); // (1)
List<String> data = Files.readAllLines(path);
Observable<String> observable = fromIterable(data).subscribeOn(Schedulers.computation()); // (2)
Subscription subscription = subscribePrint(observable, "File"); // (3)
System.out.println("Before unsubscribe!");
System.out.println("-------------------");
subscription.unsubscribe(); // (4)
System.out.println("-------------------");
System.out.println("After unsubscribe!");

Here's what's happening in this example:

  1. The data source is a huge file because we need something that takes some time to be iterated.
  2. All the subscriptions to the Observable instance will take place on another thread because we will want to unsubscribe on the main thread.
  3. The subscribePrint() method defined in this chapter is used, but it is modified to return the Subscription.
  4. The subscription is used to unsubscribe from the Observable instance, so the whole file won't be printed and there are markers showing when the unsubscription is executed.

The output will be something like this:

File : Donec facilisis sollicitudin est non molestie.
File : Integer nec magna ac ex rhoncus imperdiet.
Before unsubscribe!
-------------------
File : Nullam pharetra iaculis sem.
-------------------
After unsubscribe!

So most of the file's content is skipped. Note that it is possible for something to be emitted right after unsubscribing. For example, if the Subscriber instance unsubscribes right after the isUnsubscribed() check has passed, the program has already moved past the if statement and will still emit the current item.
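The window between the check and the emission can be reproduced deterministically with a small model in plain Java. In this sketch, a callback that runs between the check and the emission stands in for another thread that unsubscribes at exactly that moment. LateEmissionDemo and its methods are hypothetical names used only for illustration; no RxJava classes are involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

final class LateEmissionDemo {
  // Emits items while 'unsubscribed' is false. 'afterCheck' runs between the
  // check and the emission, standing in for another thread that wins the race.
  static List<String> emit(List<String> items, AtomicBoolean unsubscribed,
                           Runnable afterCheck) {
    List<String> delivered = new ArrayList<>();
    for (String item : items) {
      if (unsubscribed.get()) {
        break;               // same role as the isUnsubscribed() check
      }
      afterCheck.run();      // simulated interleaving point
      delivered.add(item);   // same role as subscriber.onNext(item)
    }
    return delivered;
  }

  static List<String> run() {
    final AtomicBoolean unsubscribed = new AtomicBoolean(false);
    final int[] checksPassed = {0};
    // Unsubscribes just after the check for the second item has passed.
    Runnable unsubscribeDuringSecondItem = () -> {
      if (++checksPassed[0] == 2) {
        unsubscribed.set(true);
      }
    };
    return emit(Arrays.asList("a", "b", "c", "d"), unsubscribed,
        unsubscribeDuringSecondItem);
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [a, b]
  }
}
```

The item "b" is delivered even though the flag was already set when it was emitted, while "c" and "d" are skipped. At most one item slips through per emitting loop, which matches the single line printed after the "Before unsubscribe!" marker in the output above.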

Another thing to note is that the Subscriber instances have a void add(Subscription s) method. Every subscription passed to it will be automatically unsubscribed when the Subscriber is unsubscribed. This way, we can add additional actions to the Subscriber instance; for example, actions that should be executed at unsubscribing (similar to the try-finally construction in Java). This is how unsubscribing works. In Chapter 8, Resource Management and Extending RxJava, we'll be dealing with resource management. We'll learn how Observable instances can be attached to Subscriber instances through a Subscription wrapper, and how calling unsubscribe will release any allocated resources.
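The idea behind add(Subscription s) can be modeled in a few lines of plain Java: a parent keeps a list of child cleanup actions and runs them all when it is unsubscribed. This is only a sketch under stated assumptions. ChildTracker and ChildTrackerDemo are hypothetical names, children are represented as Runnable instances instead of Subscription instances, and running a child immediately when it is added after unsubscription is a design choice of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical parent that owns child cleanup actions (NOT an RxJava class).
final class ChildTracker {
  private final List<Runnable> children = new ArrayList<>();
  private boolean unsubscribed;

  // Registers a child; if the parent is already unsubscribed, the child
  // is run right away instead of being stored.
  void add(Runnable child) {
    boolean runNow;
    synchronized (this) {
      runNow = unsubscribed;
      if (!runNow) {
        children.add(child);
      }
    }
    if (runNow) {
      child.run();
    }
  }

  // Runs every registered child exactly once; later calls do nothing.
  void unsubscribe() {
    List<Runnable> toRun;
    synchronized (this) {
      if (unsubscribed) {
        return;
      }
      unsubscribed = true;
      toRun = new ArrayList<>(children);
      children.clear();
    }
    for (Runnable child : toRun) {
      child.run(); // run outside the lock to avoid holding it during cleanup
    }
  }

  synchronized boolean isUnsubscribed() {
    return unsubscribed;
  }
}

final class ChildTrackerDemo {
  static List<String> run() {
    List<String> log = new ArrayList<>();
    ChildTracker tracker = new ChildTracker();
    tracker.add(() -> log.add("close file"));
    tracker.add(() -> log.add("cancel timer"));
    tracker.unsubscribe();
    tracker.unsubscribe();                      // no-op
    tracker.add(() -> log.add("late cleanup")); // runs immediately
    return log;
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [close file, cancel timer, late cleanup]
  }
}
```

Like a finally block, the registered actions run no matter why the parent was unsubscribed, which is what makes this pattern convenient for releasing resources.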

The next topic we'll be covering in this chapter is related to subscribing behavior. We will be talking about hot and cold Observable instances.
