Integrating with Emitter API

The Emitter interface is a much more powerful construct because it allows you to control the way items will be emitted into the Observable in a very granular fashion. Basically, the Observable will be controlled by these three methods:

  • onNext(): This is to supply a new value to the Observable
  • onError(): This is to notify about an error or exception that has occurred internally
  • onComplete(): This is to notify an Observable that there won't be any new values, and it can safely terminate

The Emitter interface is supplied to an Observable during its creation with the .create() method where the full type interface looks like this:

Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws
Exception {
e.onNext(1);
e.onNext(2);
e.onComplete();
}
});

However, since we are using lambdas, it will usually look like this:

Observable.<Integer>create(e -> {
e.onNext(1);
e.onNext(2);
e.onComplete();
})

It is important not to forget to call the following whenever we are done supplying values:

e.onComplete();

Otherwise, the Observable won't terminate, and it will be needed to be disposed of manually.

The onComplete() call can be easily missed if exceptions aren't handled properly during the emission of the values. Consider the following example:

Observable.<Integer>create(e -> {
e.onNext(returnValue());
e.onComplete();
});

Here, returnValue() can throw an exception. If an exception is thrown, onComplete() won't ever be called. This can be fixed by adding a finally block, as shown:

Observable.<Integer>create(e -> {
try {
e.onNext(returnValue());
} catch (Exception ex) {
e.onError(ex);
} finally {
e.onComplete();
}
});

This way, onComplete()will never be missed.

The Emitter interface is available for all reactive types, such as Flowable, Observable, Single, Maybe, and Completable. It is used in the same fashion:

Flowable.create(emitter -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}, BackpressureStrategy.BUFFER);

The Emitter interface is a powerful tool and can be cumbersome to use in some cases, so Callables and .fromCallable() might be a preferred option in some cases if the flow is simple. However, it is almost always mandatory to use .create() whenever multiple values will be returned and in cases where we need to receive values from some external Listener.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.145.61.170