The RxJava Observer pattern toolkit

In the RxJava world, we have four main players:

  • Observable
  • Observer
  • Subscriber
  • Subjects

Observables and Subjects are the two "producing" entities. Observers and Subscribers are the two "consuming" entities.

Observable

When we have to execute something asynchronously with a low level of complexity, Java provides classic classes, such as Thread, Future, FutureTask, and CompletableFuture, to approach the problem. When the level of complexity goes up, these solutions tend to become messy and hard to maintain. Most of all, with the exception of CompletableFuture, they cannot be chained.

RxJava Observables were designed to solve these issues. They are flexible and easy to use, they can be chained, and they can work on a single result or, even better, on sequences. Whenever you want to emit a single scalar value, a sequence of values, or even an infinite stream of values, you can use an Observable.

The Observable life cycle contains three possible events that we can easily compare to Iterable life cycle events. The next table shows how the Observable async/push approach relates to the Iterable sync/pull one:

Event                 Iterable (pull)     Observable (push)
Retrieve the data     T next()            onNext(T)
Discover the error    throws Exception    onError(Throwable)
Complete              !hasNext()          onCompleted()

With Iterable, the consumer synchronously pulls values from the producer, and the thread is blocked until these values arrive. By contrast, with Observable, the producer asynchronously pushes values to the Observer whenever values are available. This approach is more flexible because the consumer behaves the same way whether values arrive synchronously or asynchronously.

To mirror the Iterable interface, the RxJava Observable class extends the basic semantics of the Gang of Four's Observer pattern and introduces two new abilities:

  • onCompleted(), that is, the ability to notify the Observer that there is no more data coming from the Observable
  • onError(), that is, the ability to notify the Observer that an error occurred

Keeping these two abilities and the preceding table in mind, we can see that Observables and Iterables have comparable sets of abilities but opposite data flow directions: push versus pull.
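The pull/push contrast can be sketched in plain Java, without RxJava. The SimpleObserver interface and the pull(), push(), and pushLog() helpers below are hypothetical names invented for this illustration; they mirror the three events from the table but are not RxJava types, and the push here is synchronous for simplicity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class PullVsPush {

    // Hypothetical stand-in for the three Observer callbacks; not an RxJava type.
    interface SimpleObserver<T> {
        void onNext(T item);
        void onError(Throwable e);
        void onCompleted();
    }

    // Pull: the consumer drives the loop and asks for each value.
    static List<String> pull(Iterable<Integer> source) {
        List<String> log = new ArrayList<>();
        try {
            Iterator<Integer> it = source.iterator();
            while (it.hasNext()) {            // !hasNext() means "complete"
                log.add("next " + it.next()); // T next()
            }
            log.add("completed");
        } catch (RuntimeException e) {        // an exception signals the error
            log.add("error");
        }
        return log;
    }

    // Push: the producer drives the loop and calls the consumer back.
    static <T> void push(Iterable<T> source, SimpleObserver<? super T> observer) {
        try {
            for (T item : source) {
                observer.onNext(item);
            }
        } catch (RuntimeException e) {
            observer.onError(e);
            return;
        }
        observer.onCompleted();
    }

    // Runs the push variant and records what the observer was told.
    static List<String> pushLog(Iterable<Integer> source) {
        final List<String> log = new ArrayList<>();
        push(source, new SimpleObserver<Integer>() {
            @Override public void onNext(Integer item) { log.add("next " + item); }
            @Override public void onError(Throwable e) { log.add("error"); }
            @Override public void onCompleted() { log.add("completed"); }
        });
        return log;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3);
        System.out.println(pull(data));    // [next 1, next 2, next 3, completed]
        System.out.println(pushLog(data)); // [next 1, next 2, next 3, completed]
    }
}
```

Both variants report the same events; the only difference is which side owns the loop.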

Hot and cold Observables

From an emission point of view, there are two different types of Observables: hot and cold. A hot Observable, typically, starts emitting items as soon as it is created, so any Observer who subscribes to this Observable may start observing the sequence somewhere in the middle. A cold Observable waits until an Observer subscribes to it before it begins to emit items, so such an Observer is guaranteed to see the whole sequence from the beginning.
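The difference can be modeled in a few lines of plain Java. ColdSource and HotSource are hypothetical classes written only for this sketch, under the assumption that "emitting" simply means calling a callback; they are not how RxJava implements Observables.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class HotVsCold {

    // Cold: the emission work starts on each subscribe call,
    // so every subscriber sees 1, 2, 3 from the beginning.
    static final class ColdSource {
        void subscribe(Consumer<Integer> observer) {
            for (int i = 1; i <= 3; i++) {
                observer.accept(i);
            }
        }
    }

    // Hot: items are emitted whether or not anyone is listening.
    static final class HotSource {
        private final List<Consumer<Integer>> observers = new ArrayList<>();

        void subscribe(Consumer<Integer> observer) {
            observers.add(observer);
        }

        void emit(int item) {
            for (Consumer<Integer> observer : observers) {
                observer.accept(item);
            }
        }
    }

    static List<Integer> lateSubscriberOnCold() {
        ColdSource cold = new ColdSource();
        cold.subscribe(i -> { });        // an earlier subscriber
        List<Integer> seen = new ArrayList<>();
        cold.subscribe(seen::add);       // still receives the full sequence
        return seen;
    }

    static List<Integer> lateSubscriberOnHot() {
        HotSource hot = new HotSource();
        hot.emit(1);                     // nobody is subscribed yet, so 1 is missed
        List<Integer> seen = new ArrayList<>();
        hot.subscribe(seen::add);
        hot.emit(2);
        hot.emit(3);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(lateSubscriberOnCold()); // [1, 2, 3]
        System.out.println(lateSubscriberOnHot());  // [2, 3]
    }
}
```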

Creating an Observable

The Observable class provides several ways to create an Observable. The following sections discuss them.

Observable.create()

The create() method gives the developer the ability to create an Observable from scratch. It takes an OnSubscribe object, which extends Action1, as a parameter and executes the call() function when an Observer subscribes to our Observable:

Observable
        .create(new Observable.OnSubscribe<Object>() {
            @Override
            public void call(Subscriber<? super Object> subscriber) {

            }
        });

The Observable communicates with the Observer through the subscriber variable, calling its methods as conditions require. Let's see a real-world example:

Observable<Integer> observableInteger = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        // Emit five integers, one by one
        for (int i = 0; i < 5; i++) {
            subscriber.onNext(i);
        }
        // Signal that no more items are coming
        subscriber.onCompleted();
    }
});

Subscription subscriptionPrint = observableInteger.subscribe(new Observer<Integer>() {
    @Override
    public void onCompleted() {
        System.out.println("Observable completed");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Oh no! Something wrong happened!");
    }

    @Override
    public void onNext(Integer item) {
        System.out.println("Item is " + item);
    }
});

The example is kept simple on purpose, because even if it's the first time you see RxJava in action, I want you to be able to figure out what's going on.

We created a new Observable<Integer> item that executes a for loop on five elements, emitting them one by one, and then it completes.

On the other side, we subscribe to the Observable, obtaining a Subscription object. The moment we subscribe, we start to receive integers and we print them one by one. We don't know how many integers we will receive. Indeed, we don't need to know because we provided behaviors for every possible scenario:

  • If we receive an integer, we print it
  • If the sequence ends, we print a closed sequence message
  • If an error occurs, we print an error message

Observable.from()

In the previous example, we created a sequence of integers and emitted them one by one. What if we already have a list? Can we skip the for loop but still emit items one by one?

In the following code example, we create an Observable sequence from a list we already had:

List<Integer> items = new ArrayList<Integer>();
items.add(1);
items.add(10);
items.add(100);
items.add(200);

Observable<Integer> observableInteger = Observable.from(items);
Subscription subscriptionPrint = observableInteger.subscribe(new Observer<Integer>() {
    @Override
    public void onCompleted() {
        System.out.println("Observable completed");
    }
    @Override
    public void onError(Throwable e) {
        System.out.println("Oh no! Something wrong happened!");
    }

    @Override
    public void onNext(Integer item) {
        System.out.println("Item is " + item);
    }
});

The output follows the same pattern as in the previous example: one line per item in the list, followed by the completion message.

The from() creator can create an Observable from a list or an array, emitting every contained object one by one, or from a Java Future, emitting the value returned by the Future's get() method. When passing a Future, we can also specify a timeout value. The Observable waits for a result from the Future; if no result arrives before the timeout, the Observable calls onError() to notify the Observer that something went wrong.
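The timeout behavior rests on the standard Future.get(timeout, unit) call, which throws a TimeoutException when no result arrives in time. The plain-Java sketch below shows only that underlying mechanism; awaitResult() is a hypothetical helper, and the strings it returns merely name the Observer events one would expect. How RxJava wraps failures internally is not shown and is not assumed here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class FutureTimeoutSketch {

    // Waits for the Future and describes the outcome in terms of the
    // three Observer events (hypothetical helper, not an RxJava call).
    static String awaitResult(Future<String> future, long timeout, TimeUnit unit) {
        try {
            String value = future.get(timeout, unit);
            return "onNext(" + value + "), onCompleted()";
        } catch (TimeoutException e) {
            return "onError(TimeoutException)";
        } catch (ExecutionException e) {
            return "onError(" + e.getCause().getClass().getSimpleName() + ")";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return "onError(InterruptedException)";
        }
    }

    public static void main(String[] args) {
        Future<String> ready = CompletableFuture.completedFuture("Hello");
        Future<String> stuck = new CompletableFuture<>(); // never completed

        System.out.println(awaitResult(ready, 50, TimeUnit.MILLISECONDS));
        System.out.println(awaitResult(stuck, 50, TimeUnit.MILLISECONDS));
    }
}
```

The first call finds a value immediately; the second gives up after 50 milliseconds and reports the timeout as an error.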

Observable.just()

What if we already have a classic Java function and we want to transform it into an Observable? We could use the create() method, as we already saw, or we can avoid a lot of boilerplate code by using something like this:

Observable<String> observableString = Observable.just(helloWorld());

Subscription subscriptionPrint = observableString.subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {
        System.out.println("Observable completed");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Oh no! Something wrong happened!");
    }

    @Override
    public void onNext(String message) {
        System.out.println(message);
    }
});

The helloWorld() function is trivial:

private String helloWorld() {
    return "Hello World";
}

However, this could be any function we want. In the previous example, the function runs the moment we create the Observable, because Java evaluates helloWorld() before passing its return value to just(). When we subscribe to our Observable, it emits that already computed value.

The just() creator can take between one and nine parameters. It will emit them in the same order as they are given. The just() creator also accepts lists or arrays, like from(), but it won't iterate over the list and emit every value; it will emit the whole list as a single item. Usually, it's used when we want to emit a defined set of values, but if our function is not time-variant, we can use just() to have a more organized and testable code base.

As a final note about the just() creator, after emitting the value, the Observable terminates normally. For the previous example, we will have two messages on the system console: "Hello World" and "Observable completed".
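The point that just() receives an already computed value, rather than the function itself, is ordinary Java argument evaluation. The sketch below illustrates it without RxJava: Holder is a hypothetical stand-in for a just()-style container, and the Supplier variant shows what deferring the call would look like. RxJava has its own creators for deferred work, such as defer(), which are not covered here.

```java
import java.util.function.Supplier;

final class EagerVersusDeferred {

    static int calls = 0;

    static String helloWorld() {
        calls++; // count how often the function really runs
        return "Hello World";
    }

    // Hypothetical just()-style holder: it is handed a finished value.
    static final class Holder<T> {
        private final T value;
        Holder(T value) { this.value = value; }
        T emit() { return value; }
    }

    // Returns how many times helloWorld() ran before anything was emitted.
    static int callsBeforeEmitEager() {
        calls = 0;
        Holder<String> holder = new Holder<>(helloWorld()); // argument evaluated here
        int before = calls;
        holder.emit();
        return before;
    }

    static int callsBeforeEmitDeferred() {
        calls = 0;
        Supplier<String> deferred = EagerVersusDeferred::helloWorld; // nothing runs yet
        int before = calls;
        deferred.get(); // the function runs only now
        return before;
    }

    public static void main(String[] args) {
        System.out.println(callsBeforeEmitEager());    // 1
        System.out.println(callsBeforeEmitDeferred()); // 0
    }
}
```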

Observable.empty(), Observable.never(), and Observable.error()

If for any reason we need an Observable that emits nothing but terminates normally, we can use empty(). We can use never() to create an Observable that emits nothing and never terminates, and we can use error() to create an Observable that emits nothing and terminates with an error.

Subject = Observable + Observer

A subject is a magic object that can be an Observable and an Observer at the same time: it acts as a bridge connecting the two worlds. A subject can subscribe to an Observable, acting like an Observer, and it can emit new items or pass through the items it receives, acting like an Observable. Because it is an Observable, Observers or other subjects can subscribe to it.

The moment the subject subscribes to the Observable, it will trigger the Observable to begin emitting. If the original Observable is cold, this can have the effect of making the resulting subject a hot Observable variant of the original cold Observable.
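The bridging role, and the way it turns a cold source hot, can be modeled in plain Java. MiniSubject and coldSource() are hypothetical names for this sketch only: the subject is a Consumer (its Observer side) and also accepts subscribers (its Observable side).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class SubjectBridgeSketch {

    // Hypothetical minimal subject: it consumes items (Observer side)
    // and lets others subscribe to it (Observable side).
    static final class MiniSubject<T> implements Consumer<T> {
        private final List<Consumer<T>> observers = new ArrayList<>();

        void subscribe(Consumer<T> observer) {
            observers.add(observer);
        }

        @Override
        public void accept(T item) {
            // Pass every received item through to the current observers
            for (Consumer<T> observer : observers) {
                observer.accept(item);
            }
        }
    }

    // Cold source: emits 1, 2, 3 to whoever subscribes, at subscription time.
    static void coldSource(Consumer<Integer> observer) {
        for (int i = 1; i <= 3; i++) {
            observer.accept(i);
        }
    }

    static List<List<Integer>> demo() {
        MiniSubject<Integer> subject = new MiniSubject<>();
        List<Integer> early = new ArrayList<>();
        List<Integer> late = new ArrayList<>();

        subject.subscribe(early::add);
        coldSource(subject);          // the subject subscribes; the source starts emitting
        subject.subscribe(late::add); // too late: the items have already gone by

        List<List<Integer>> result = new ArrayList<>();
        result.add(early);
        result.add(late);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [[1, 2, 3], []]
    }
}
```

Seen through the subject, the cold source behaves like a hot one: an Observer that arrives after the subject has subscribed misses the sequence.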

RxJava provides four different types of subjects:

  • PublishSubject
  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

PublishSubject

PublishSubject is the basic Subject object. Let's see our classic Observable Hello World achieved with a PublishSubject:

PublishSubject<String> stringPublishSubject = PublishSubject.create();

Subscription subscriptionPrint = stringPublishSubject.subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {
        System.out.println("Observable completed");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Oh no! Something wrong happened!");
    }

    @Override
    public void onNext(String message) {
        System.out.println(message);
    }
});

stringPublishSubject.onNext("Hello World");

In the previous example, we used the create() method to build a new PublishSubject that emits String values, and then we subscribed to it. At this point, no item has been emitted yet, so our Observer is just waiting, without blocking or consuming resources. It's just there, ready to receive values from the subject. Our Observer would wait forever if the subject never emitted a value. Once again, no worries: the Observer knows what to do in every scenario. The "when" does not concern us because this is reactive: the system will react. We don't care when it will react. We only care what's going to happen when it reacts.

The last line of code shows the manual emission of our "Hello World" string, which triggers the Observer onNext() method and lets us print the "Hello World" message to the console.

Let's see a more complex example. Say we have a private Observable that is not accessible from the outside. This Observable emits values during its lifetime. We don't really care about these values; we only care about when the sequence terminates.

First of all, we create a new PublishSubject that reacts on its onNext() method and is accessible from the outside world:

final PublishSubject<Boolean> subject = PublishSubject.create();

subject.subscribe(new Observer<Boolean>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(Boolean completed) {
        System.out.println("Observable completed!");
    }
});

Then, we create the private Observable, which is only accessible to the subject:

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for (int i = 0; i < 5; i++) {
            subscriber.onNext(i);
        }
        subscriber.onCompleted();
    }
}).doOnCompleted(new Action0() {
    @Override
    public void call() {
        subject.onNext(true);
    }
}).subscribe();

The Observable.create() method contains our familiar for loop, emitting numbers. The doOnCompleted() method specifies what is going to happen when the Observable terminates: emit true on the subject. Finally, we subscribe to the Observable. Obviously, the empty subscribe() call just starts the Observable, ignoring any emitted value, completed or error event. We need it like this for the sake of the example.

In this example, we created an entity that can be connected to Observables and can be observed at the same time. This is extremely useful when we want to create separation, abstraction, or a more observable point for a common resource.

BehaviorSubject

Basically, BehaviorSubject is a subject that emits the most recent item it has observed, and all subsequently observed items, to each subscribed Observer:

BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create(1);

In this short example, we create a BehaviorSubject that emits Integer values. We give it an initial value because it emits the most recent value the moment an Observer subscribes, and before any item has been observed, the initial value plays that role.
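The "most recent item, then everything after it" rule can be modeled in plain Java. MiniBehaviorSubject is a hypothetical class written for this sketch; it ignores completion, errors, and threading, all of which the real BehaviorSubject has to handle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class BehaviorSketch {

    // Hypothetical model: remembers the latest item and hands it
    // to every new subscriber before any further items.
    static final class MiniBehaviorSubject<T> {
        private final List<Consumer<T>> observers = new ArrayList<>();
        private T latest;

        MiniBehaviorSubject(T initial) {
            this.latest = initial;
        }

        void subscribe(Consumer<T> observer) {
            observer.accept(latest); // the most recent item comes first
            observers.add(observer);
        }

        void onNext(T item) {
            latest = item;
            for (Consumer<T> observer : observers) {
                observer.accept(item);
            }
        }
    }

    static List<List<Integer>> demo() {
        MiniBehaviorSubject<Integer> subject = new MiniBehaviorSubject<>(1);
        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();

        subject.subscribe(first::add);  // receives the initial value 1
        subject.onNext(2);
        subject.onNext(3);
        subject.subscribe(second::add); // receives 3, the most recent item
        subject.onNext(4);

        List<List<Integer>> result = new ArrayList<>();
        result.add(first);
        result.add(second);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [[1, 2, 3, 4], [3, 4]]
    }
}
```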

ReplaySubject

ReplaySubject buffers all items it observes and replays them to any Observer that subscribes:

ReplaySubject<Integer> replaySubject = ReplaySubject.create();
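A plain-Java model of the buffering follows. MiniReplaySubject is a hypothetical class for illustration, using an unbounded buffer and ignoring completion and errors; it is not RxJava's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ReplaySketch {

    // Hypothetical model: keeps every item and replays the whole
    // buffer to each new subscriber.
    static final class MiniReplaySubject<T> {
        private final List<T> buffer = new ArrayList<>();
        private final List<Consumer<T>> observers = new ArrayList<>();

        void subscribe(Consumer<T> observer) {
            for (T item : buffer) {
                observer.accept(item); // replay the past first
            }
            observers.add(observer);
        }

        void onNext(T item) {
            buffer.add(item);
            for (Consumer<T> observer : observers) {
                observer.accept(item);
            }
        }
    }

    static List<Integer> lateSubscriber() {
        MiniReplaySubject<Integer> subject = new MiniReplaySubject<>();
        subject.onNext(1); // emitted before anyone subscribes
        subject.onNext(2);

        List<Integer> seen = new ArrayList<>();
        subject.subscribe(seen::add); // gets 1 and 2 replayed
        subject.onNext(3);            // and then the live item
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(lateSubscriber()); // [1, 2, 3]
    }
}
```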

AsyncSubject

AsyncSubject publishes only the last item it observed to each subscribed Observer, and only when the source Observable completes:

AsyncSubject<Integer> asyncSubject = AsyncSubject.create();
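The "last item only, and only on completion" rule can be modeled in plain Java. MiniAsyncSubject is a hypothetical class for this sketch; it leaves out error handling and the completion notification itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class AsyncSketch {

    // Hypothetical model: remembers only the last item and forwards
    // it when the sequence completes.
    static final class MiniAsyncSubject<T> {
        private final List<Consumer<T>> observers = new ArrayList<>();
        private T last;
        private boolean hasValue;

        void subscribe(Consumer<T> observer) {
            observers.add(observer);
        }

        void onNext(T item) { // remember, but do not forward yet
            last = item;
            hasValue = true;
        }

        void onCompleted() {  // only now does the last item go out
            if (hasValue) {
                for (Consumer<T> observer : observers) {
                    observer.accept(last);
                }
            }
        }
    }

    static List<Integer> seenBeforeCompletion() {
        MiniAsyncSubject<Integer> subject = new MiniAsyncSubject<>();
        List<Integer> seen = new ArrayList<>();
        subject.subscribe(seen::add);
        subject.onNext(1);
        subject.onNext(2);
        subject.onNext(3);
        return seen; // nothing has been forwarded
    }

    static List<Integer> seenAfterCompletion() {
        MiniAsyncSubject<Integer> subject = new MiniAsyncSubject<>();
        List<Integer> seen = new ArrayList<>();
        subject.subscribe(seen::add);
        subject.onNext(1);
        subject.onNext(2);
        subject.onNext(3);
        subject.onCompleted();
        return seen; // only the last item arrived
    }

    public static void main(String[] args) {
        System.out.println(seenBeforeCompletion()); // []
        System.out.println(seenAfterCompletion());  // [3]
    }
}
```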