In the RxJava world, we have four main players:
Observables and Subjects are the two "producing" entities. Observers and Subscribers are the two "consuming" entities.
When we have to execute something asynchronously with a lite level of complexity, Java provides classic classes, such as Thread
, Future
, FutureTask
, CompletableFuture
, to approach the problem. When the level of complexity goes up, these solutions tend to become messy and hard to maintain. Most of all, they cannot be chained.
RxJava Observables were designed to solve these issues. They are flexible and easy to use, they can be chained, and they can work on a single result routine or, even better, on sequences. Whenever you want to emit a single scalar value or a sequence of values, or even an infinite value stream, you can use an Observable.
The Observable life cycle contains three possible events that we can easily compare to Iterable life cycle events. The next table shows how the Observable async/push approach relates to the Iterable sync/pull one:
Event |
Iterable (pull) |
Observable (push) |
---|---|---|
Retrieve the data |
|
|
Discover the error |
throws |
|
Complete |
|
|
With Iterable, the consumer synchronously pulls values from the producer and the thread is blocked until these values arrive. By contrast, with Observable, the producer asynchronously pushes values to the Observer whenever values are available. This approach is more flexible because even if values arrive synchronously or asynchronously, the consumer can act according to expectations in both scenarios.
To properly replicate the same Iterable interface, the RxJava Observable class enhances the base semantic of the Observer pattern by the Gang of Four, and introduces two new abilities:
Keeping in mind these two abilities and the preceding table, we know that Observables and Iterables have a comparable abilities set but a different data flow direction: push versus pull.
From an emission point of view, there are two different types of Observables: hot and cold. A hot Observable, typically, starts emitting items as soon as it is created, so any Observer who subscribes to this Observable may start observing the sequence somewhere in the middle. A cold Observable waits until an Observer subscribes to it before it begins to emit items, so such an Observer is guaranteed to see the whole sequence from the beginning.
The Observable class provides the ways discussed in the upcoming sections to create an Observable.
The create()
method gives the developer the ability to create an Observable from scratch. It takes an OnSubscribe
object, which extends Action1
, as a parameter and executes the call()
function when an Observer subscribes to our Observable:
Observable .create(new Observable.OnSubscribe<Object>() { @Override public void call(Subscriber<? super Object> subscriber) { } });
Downloading the example code
You can download the example code files from your account at http://www.packtpub.com for all the Packt Publishing books you have purchased. If you purchased this book elsewhere, you can visit http://www.packtpub.com/support and register to have the files e-mailed directly to you.
Observable
communicates with the Observer using the subscriber
variable and by calling its methods according to the conditions. Let's see a real-world example:
Observable<Integer> observableString = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> observer) {
for (int i = 0; i < 5
; i++) {
observer.onNext(i);
}
observer.onCompleted();
}
});
Subscription subscriptionPrint = observableString.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh no! Something wrong happened!");
}
@Override
public void onNext(Integer item) {
System.out.println("Item is " + item);
}
});
The example is kept simple on purpose, because even if it's the first time you see RxJava in action, I want you to be able to figure out what's going on.
We created a new Observable<Integer>
item that executes a for
loop on five elements, emitting them one by one, and then it completes.
On the other side, we subscribe to the Observable, obtaining a Subscription
object. The moment we subscribe, we start to receive integers and we print them one by one. We don't know how many integers we will receive. Indeed, we don't need to know because we provided behaviors for every possible scenario:
In the previous example, we created a sequence of integers and emitted them one by one. What if we already have a list? Can we save the for
loop but still emit items one by one?
In the following code example, we create an Observable sequence from a list we already had:
List<Integer> items = new ArrayList<Integer>(); items.add(1); items.add(10); items.add(100); items.add(200); Observable<Integer> observableString = Observable.from(items); Subscription subscriptionPrint = observableString.subscribe(new Observer<Integer>() { @Override public void onCompleted() { System.out.println("Observable completed"); } @Override public void onError(Throwable e) { System.out.println("Oh no! Something wrong happened!"); } @Override public void onNext(Integer item) { System.out.println("Item is " + item); } });
The output is absolutely the same as that in the previous example.
The from()
creator can create an Observable from a list/array, emitting every object contained in the list/array one by one or from a Java Future
class, emitting the result value of the .get()
method of the Future
object. Passing Future
as parameter, we can even specify a timeout value. The Observable will wait for a result from Future
; if no result comes before the timeout, the Observable will fire the onError()
method and notify the Observer that something was wrong.
What if we already have a classic Java function and we want to transform it in an Observable? We could use the create()
method, as we already saw, or we can avoid a lot of boilerplate code using something like this:
Observable<String> observableString = Observable.just(helloWorld()); Subscription subscriptionPrint = observableString.subscribe(new Observer<String>() { @Override public void onCompleted() { System.out.println("Observable completed"); } @Override public void onError(Throwable e) { System.out.println("Oh no! Something wrong happened!"); } @Override public void onNext(String message) { System.out.println(message); } });
The helloworld()
function is super easy, like this:
private String helloWorld() { return "Hello World"; }
However, this could be any function we want. In the previous example, the moment we create the Observable, just()
executes the function and when we subscribe to our Observable, it emits the returned value.
The just()
creator can take between one and nine parameters. It will emit them in the same order as they are given as parameters. The just()
creator also accepts lists or arrays, like from()
, but it won't iterate on the list, emitting every value, it will just emit the whole list. Usually, it's used when we want to emit a defined set of values, but if our function is not time-variant, we can use just()
to have a more organized and testable code base.
As a final note about the just()
creator, after emitting the value, the Observable terminates normally. For the previous example, we will have two messages on the system console: "Hello World"
and "Observable completed"
.
A subject is a magic object that can be an Observable and an Observer at the same time: it acts as a bridge connecting the two worlds. A subject can subscribe to an Observable, acting like an Observer, and it can emit new items or even pass through the item it received, acting like an Observable. Obviously, being an Observable, Observers or other subjects can subscribe to it.
The moment the subject subscribes to the Observable, it will trigger the Observable to begin emitting. If the original Observable is cold, this can have the effect of making the resulting subject a hot Observable variant of the original cold Observable.
RxJava provides four different types of subjects:
PublishSubject
BehaviorSubject
ReplaySubject
AsyncSubject
PublishSubject
is the basic Subject
object. Let's see our classic Observable Hello World achieved with a PublishSubject
:
PublishSubject<String> stringPublishSubject = PublishSubject.create(); Subscription subscriptionPrint = stringPublishSubject.subscribe(new Observer<String>() { @Override public void onCompleted() { System.out.println("Observable completed"); } @Override public void onError(Throwable e) { System.out.println("Oh no! Something wrong happened!"); } @Override public void onNext(String message) { System.out.println(message); } }); stringPublishSubject.onNext("Hello World");
In the previous example, we created a new PublishSubject
, emitting a String
value with its create()
method, and then we subscribed to PublishSubject
. At this point, no item has been emitted yet, so our Observer is just waiting, without blocking or consuming resources. It's just there, ready to receive values from the subject. Our Observer would wait forever if the subject never emits a value. Once again, no worries: the Observer knows what to do in every scenario. The when does not concern us because this is reactive: the system will react. We don't care when it will react. We only care what's going to happen when it reacts.
The last line of code shows the manual emission of our "Hello World"
string, which triggers the Observer onNext()
method and lets us print the "Hello World"
message to the console.
Let's see a more complex example. Say we have a private
Observable and it's not accessible from the outside. This Observable emits values during its lifetime. We don't really care about these values, we only care about their termination.
First of all, we create a new PublishSubject
that reacts on its onNext()
method and is accessible from the outside world:
final PublishSubject<Boolean> subject = PublishSubject.create(); subject.subscribe(new Observer<Boolean>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Boolean completed) { System.out.println("Observable completed!"); } });
Then, we create the private
Observable, which is only accessible to the subject:
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; i < 5; i++) { subscriber.onNext(i); } subscriber.onCompleted(); } }).doOnCompleted(new Action0() { @Override public void call() { subject.onNext(true); } }).subscribe();
The Observable.create()
method contains our familiar for
loop, emitting numbers. The doOnCompleted()
method specifies what is going to happen when the Observable terminates: emit true
on the subject. Finally, we subscribe to the Observable. Obviously, the empty subscribe()
call just starts the Observable, ignoring any emitted value, completed or error event. We need it like this for the sake of the example.
In this example, we created an entity that can be connected to Observables and can be observed at the same time. This is extremely useful when we want to create separation, abstraction, or a more observable point for a common resource.
Basically, BehaviorSubject
is a subject that emits the most recent item it has observed and all subsequent observed items to each subscribed item:
BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create(1);
In this short example, we are creating a BehaviorSubject
that emits Integer
. BehaviorSubject
needs an initial value due to the fact that it's going to emit the most recent value the moment an Observe subscribes.
18.221.117.214