The Observable.create method

Let's look at the signature of the method first:

public final static <T> Observable<T> create(OnSubscribe<T>)

It takes a parameter of type OnSubscribe. This interface extends the Action1<Subscriber<? super T>> interface; in other words, this type has only one method, which takes one argument of type Subscriber<? super T> and returns nothing. This method is called every time the Observable.subscribe() method is invoked. Its argument, an instance of the Subscriber class, is in fact the observer subscribing to the Observable instance (here, the Subscriber class and the Observer interface have the same role; we'll be talking about them later in this chapter). We can invoke the onNext(), onError(), and onCompleted() methods on it to implement our own custom behavior.
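To make the "called every time subscribe() is invoked" point concrete, here is a minimal sketch that uses only the Java standard library. MiniObservable, MiniOnSubscribe, MiniSubscriber, Recorder, and countCalls are hypothetical stand-ins invented for this illustration; they are not the RxJava classes and leave out everything the real library does beyond storing the callback and running it on subscription.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are NOT the RxJava classes.
class CreateSketch {

  /** Receives notifications; plays the role of Subscriber/Observer. */
  interface MiniSubscriber<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onCompleted();
  }

  /** Single-method callback; plays the role of OnSubscribe. */
  interface MiniOnSubscribe<T> {
    void call(MiniSubscriber<? super T> subscriber);
  }

  /** Stores the callback and runs it once for every subscribe() call. */
  static final class MiniObservable<T> {
    private final MiniOnSubscribe<T> onSubscribe;

    private MiniObservable(MiniOnSubscribe<T> onSubscribe) {
      this.onSubscribe = onSubscribe;
    }

    static <T> MiniObservable<T> create(MiniOnSubscribe<T> onSubscribe) {
      return new MiniObservable<T>(onSubscribe);
    }

    void subscribe(MiniSubscriber<? super T> subscriber) {
      onSubscribe.call(subscriber);
    }
  }

  /** Collects every notification as text so it can be inspected later. */
  static final class Recorder implements MiniSubscriber<String> {
    final List<String> events = new ArrayList<String>();
    @Override public void onNext(String value) { events.add("next:" + value); }
    @Override public void onError(Throwable error) { events.add("error:" + error.getMessage()); }
    @Override public void onCompleted() { events.add("completed"); }
  }

  /** Returns the callback's call count before subscribing, after one and after two subscriptions. */
  static int[] countCalls() {
    final int[] calls = {0};
    MiniObservable<String> greeting = MiniObservable.create(new MiniOnSubscribe<String>() {
      @Override
      public void call(MiniSubscriber<? super String> subscriber) {
        calls[0]++;
        subscriber.onNext("hello");
        subscriber.onCompleted();
      }
    });
    int before = calls[0];              // nothing has run yet
    greeting.subscribe(new Recorder());
    int afterFirst = calls[0];
    greeting.subscribe(new Recorder());
    return new int[] {before, afterFirst, calls[0]};
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(countCalls())); // prints [0, 1, 2]
  }
}
```

The counter stays at zero until the first subscription and grows by one with each further subscription, which is the behavior the paragraph above attributes to the function passed to create().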

It's easier to comprehend this with an example. Let's implement a simple version of the Observable.from(Iterable<T>) method:

import java.util.Iterator;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;

<T> Observable<T> fromIterable(final Iterable<T> iterable) {
  return Observable.create(new OnSubscribe<T>() {
    @Override
    public void call(Subscriber<? super T> subscriber) {
      try {
        Iterator<T> iterator = iterable.iterator(); // (1)
        while (iterator.hasNext()) { // (2)
          subscriber.onNext(iterator.next());
        }
        subscriber.onCompleted(); // (3)
      } catch (Exception e) {
        subscriber.onError(e); // (4)
      }
    }
  });
}

The method takes an Iterable<T> instance as its argument and returns an Observable<T> instance. The behavior is as follows:

  1. When an Observer/Subscriber instance subscribes to the resulting Observable instance, an Iterator instance is retrieved from the Iterable source. (The Subscriber class actually implements the Observer interface; it is an abstract class that leaves the on* methods unimplemented.)
  2. While there are elements, they are emitted as OnNext notifications.
  3. When all the elements have been emitted, an OnCompleted notification is dispatched.
  4. If at any time an error occurs, an OnError notification is dispatched with the error.
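The four steps above can be traced with a small, library-free sketch. MiniSubscriber, emitAll, record, and failingAfterTwo are hypothetical names introduced only for this illustration; emitAll repeats the body of call() from the listing against a stand-in interface rather than the real Subscriber class, so treat it as an approximation of the control flow and not as RxJava itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in types for illustration; not the RxJava API.
class EmitOrderSketch {

  interface MiniSubscriber<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onCompleted();
  }

  /** Same steps as the body of call(): iterate, emit, then complete or fail. */
  static <T> void emitAll(Iterable<T> iterable, MiniSubscriber<? super T> subscriber) {
    try {
      Iterator<T> iterator = iterable.iterator(); // step 1
      while (iterator.hasNext()) {                // step 2
        subscriber.onNext(iterator.next());
      }
      subscriber.onCompleted();                   // step 3
    } catch (Exception e) {
      subscriber.onError(e);                      // step 4
    }
  }

  /** Runs emitAll and returns the notifications in the order they arrived. */
  static <T> List<String> record(Iterable<T> iterable) {
    final List<String> events = new ArrayList<String>();
    emitAll(iterable, new MiniSubscriber<T>() {
      @Override public void onNext(T value) { events.add("onNext(" + value + ")"); }
      @Override public void onError(Throwable error) { events.add("onError(" + error.getMessage() + ")"); }
      @Override public void onCompleted() { events.add("onCompleted"); }
    });
    return events;
  }

  /** An Iterable that yields 1 and 2, then throws on the third next() call. */
  static Iterable<Integer> failingAfterTwo() {
    return new Iterable<Integer>() {
      @Override public Iterator<Integer> iterator() {
        return new Iterator<Integer>() {
          private int n = 0;
          @Override public boolean hasNext() { return true; }
          @Override public Integer next() {
            if (n == 2) throw new IllegalStateException("boom");
            return ++n;
          }
          @Override public void remove() { throw new UnsupportedOperationException(); }
        };
      }
    };
  }

  public static void main(String[] args) {
    System.out.println(record(Arrays.asList("a", "b"))); // prints [onNext(a), onNext(b), onCompleted]
    System.out.println(record(failingAfterTwo()));       // prints [onNext(1), onNext(2), onError(boom)]
  }
}
```

In the failing run the error notification replaces the completion notification: once the exception is caught, no OnCompleted is sent.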

This is a very simple and naive implementation of the behavior of the Observable.from(Iterable<T>) method; for instance, it never checks whether the subscriber has unsubscribed while it iterates. The Reactive Sum described in the first and second chapters is another example of the power of the Observable.create method (used by CreateObservable.from()).

But as we saw, the logic passed to the create() method is triggered when the Observable.subscribe() method is invoked on the Observable instance. Until now, we were creating Observable instances and subscribing to them with this method. It is time to look at it in detail.
