Observer plus iterator equals Reactive Stream

In this chapter, we have talked a lot about the Observer pattern, which gives us a clearly separated view of the Producer event and Consumer event. Let's have a recap of the interfaces defined by that pattern, as shown in the following code:

public interface Observer<T> {
void notify(T event);
}

public interface Subject<T> {
void registerObserver(Observer<T> observer);
void unregisterObserver(Observer<T> observer);
void notifyObservers(T event);
}

As we saw previously, this approach is charming for infinite streams of data, but it would be great to have the ability to signal the end of the data stream. Also, we do not want the Producer to generate events before the appearance of consumers. In the synchronous world, we have a pattern for that—Iterator pattern. This may be described using the following code:

public interface Iterator<T> {
T next();
boolean hasNext();
}

To retrieve items one by one, Iterator provides the next() method and also makes it possible to signal the end of the sequence by returning a false value as a result of the hasNext() call. So what would happen if we tried to mix this idea with an asynchronous execution provided by the Observer pattern? The result would look like the following:

public interface RxObserver<T> {
void onNext(T next);
void onComplete();
}

The RxObserver is pretty similar to the Iterator, but instead of calling the next() method of Iterator, RxObserver would be notified with a new value by the onNext() callback. And instead of checking whether the result of the hasNext() method is positive, RxObserver is informed about the end of the stream through the invoked onComplete() method. That is fine, but what about errors? The Iterator may throw an Exception during the processing of the next() method, and it would be great to have a mechanism for an error propagation from the Producer to RxObserver. Let's add a special callback for that—onError(). So, the final solution will look like the following:

public interface RxObserver<T> {
void onNext(T next);
void onComplete();
void onError(Exception e);
}

This happened because we have just designed an Observer interface, the foundational concept of RxJava. This interface defines how data flows between every part of a reactive stream. By being the smallest part of the library, the Observer interface is found everywhere. The RxObserver is similar to the Observer from the Observer pattern, as previously described.

The Observable Reactive class is a counterpart to the Subject from the Observer pattern. As a consequence, Observable plays a role as an events source as it emits items. It has hundreds of stream transformation methods, as well as dozens of factory methods to initialize a reactive stream.

Subscriber abstract class implements the Observer interface and consumes items. It is also used as a base for the actual Subscriber's implementation. The runtime relation between Observable and Subscriber is controlled by a Subscription that makes it possible to check the subscription status and cancel it if needed. This relationship is illustrated in the following diagram:

Diagram 2.6 Observable-Observer contract

RxJava defines rules about emitting items. The Observable is allowed to send any number of elements (including zero). Then it signals the end of the execution either by claiming the success or raising an error. So the Observable for each attached Subscriber invokes onNext() any number of times, then calls onComplete() or onError() (but not both). Consequently, it is prohibited for it to call onNext() after onComplete() or onError().

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.145.8.8