Let's look at the signature of the method first:
public final static <T> Observable<T> create(OnSubscribe<T>)
It takes a parameter of type OnSubscribe
. This interface extends the Action1<Subscriber<? super T>>
interface; in other words, this type has only one method, taking one argument of type Subscriber<T>
and returning nothing. This function will be called every time the Observable.subscribe()
method is invoked. Its argument, an instance of the Subscriber
class, is in fact the observer, subscribing to the Observable
instance (here, the Subscriber
class and Observer interface have the same role). We'll be talking about them later in this chapter). We can invoke the onNext()
, onError()
, and onCompleted()
methods on it, implementing our own custom behavior.
It's easier to comprehend this with an example. Let's implement a simple version of the Observable.from(Iterabale<T>)
method:
<T> Observable<T> fromIterable(final Iterable<T> iterable) { return Observable.create(new OnSubscribe<T>() { @Override public void call(Subscriber<? super T> subscriber) { try { Iterator<T> iterator = iterable.iterator(); // (1) while (iterator.hasNext()) { // (2) subscriber.onNext(iterator.next()); } subscriber.onCompleted(); // (3) } catch (Exception e) { subscriber.onError(e); // (4) } } }); }
The method takes an Iterable<T>
parameter as an argument and returns an Observable<T>
parameter. The behavior is as follows:
Observer/Subscriber
instance subscribes to the resulting Observable
instance, an Iterator
instance is retrieved from the Iterable
source. The Subscriber
class actually implements the Observer
interface. It is an abstract class, and the on*
methods are not implemented by it.OnNext
notifications.OnCompleted
notification is dispatched.OnError
notification is dispatched with the error.This is a very simple and naive implementation of the behavior of the Observable.from(Iterable<T>)
method. The Reactive Sum described in the first and second chapters is another example of the power of the Observable.create
method (used by CreateObservable.from()
).
But as we saw, the logic passed to the create()
method is triggered when the Observable.subscribe()
method is invoked on the Observable
instance. Until now, we were creating Observable
instances and subscribing to them with this method. It is time to look at it in detail.
18.223.239.226