One Observable instance can be made to hold back its emissions until another emits, or to emit only if another emits nothing. In other words, Observable instances can emit items under given conditions, and these conditions are applied to them using conditional operators. In this section, we'll take a look at some of the conditional operators provided by RxJava.
The amb() operator has overloads that take from two to nine source Observable instances, or an Iterable instance of Observable instances. It mirrors the source Observable instance that reacts first and ignores the rest. It doesn't matter what that first reaction is: an item, an OnError notification, or an OnCompleted notification. Its diagram looks like this:
This operator has an instance form too. It is called ambWith() and is invoked on one Observable instance, with another Observable instance passed as its argument.
This conditional operator is good for reading from multiple sources of similar data. The subscriber won't care about the origin of the data. It can be used to implement simple caching, for example. Here is a little example of how it may be used:
Observable<String> words = Observable.just("Some", "Other");
Observable<Long> interval = Observable
  .interval(500L, TimeUnit.MILLISECONDS)
  .take(2);
subscribePrint(Observable.amb(words, interval), "Amb 1");

Random r = new Random();
Observable<String> source1 = Observable
  .just("data from source 1")
  .delay(r.nextInt(1000), TimeUnit.MILLISECONDS);
Observable<String> source2 = Observable
  .just("data from source 2")
  .delay(r.nextInt(1000), TimeUnit.MILLISECONDS);
subscribePrint(Observable.amb(source1, source2), "Amb 2");
The first amb() operator will emit the items of the words Observable instance, because the interval Observable instance will have to wait for half a second before emitting, and the words Observable instance will begin emitting immediately.
The emission of the second amb Observable instance will be decided at random, because both sources are delayed by a random number of milliseconds. If the first source Observable instance emits its data before the second, its emission will be mirrored by the amb Observable instance; if the second source emits first, the amb Observable instance will mirror the second source instead.
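The caching idea mentioned above can be sketched as follows. This is only a minimal illustration, assuming RxJava 1.x is on the classpath. The class AmbCacheSketch, the method firstToAnswer(), and the "cached value" and "fresh value" strings are hypothetical names introduced here, and a TestScheduler stands in for real time so that the winner is deterministic rather than random.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.observers.TestSubscriber;
import rx.schedulers.TestScheduler;

// Hypothetical sketch: two sources of similar data race each other through ambWith().
class AmbCacheSketch {

    // Returns the items mirrored by amb when the "cache" answers after
    // cacheDelayMs and the "network" answers after networkDelayMs (virtual time).
    static List<String> firstToAnswer(long cacheDelayMs, long networkDelayMs) {
        TestScheduler scheduler = new TestScheduler();

        Observable<String> cache = Observable
            .just("cached value")
            .delay(cacheDelayMs, TimeUnit.MILLISECONDS, scheduler);
        Observable<String> network = Observable
            .just("fresh value")
            .delay(networkDelayMs, TimeUnit.MILLISECONDS, scheduler);

        TestSubscriber<String> subscriber = new TestSubscriber<>();
        // Instance form of amb(): whichever source reacts first is mirrored.
        cache.ambWith(network).subscribe(subscriber);

        // Move virtual time forward far enough for both delays to elapse.
        scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
        return subscriber.getOnNextEvents();
    }

    public static void main(String[] args) {
        System.out.println(firstToAnswer(100, 300));
        System.out.println(firstToAnswer(300, 100));
    }
}
```

With a cache delay of 100 milliseconds and a network delay of 300, only the cached value should be mirrored; swapping the delays should yield only the fresh one. The subscriber sees a single item either way, which is why it doesn't need to care where the data came from.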
We saw operators similar to takeUntil(), takeWhile(), skipUntil(), and skipWhile() in the previous chapter. The take(int) operator kept only the first n items. These operators also filter items, but based on conditions. The takeUntil() operator takes another Observable instance and emits the source's items until this other Observable instance emits; after that, the Observable instance created by the takeUntil() operator completes. Let's look at an example of using these operators:
Observable<String> words = Observable // (1)
  .just("one", "way", "or", "another", "I'll", "learn", "RxJava")
  .zipWith(
    Observable.interval(200L, TimeUnit.MILLISECONDS),
    (x, y) -> x
  );
Observable<Long> interval = Observable
  .interval(500L, TimeUnit.MILLISECONDS);
subscribePrint(words.takeUntil(interval), "takeUntil"); // (2)
subscribePrint( // (3)
  words.takeWhile(word -> word.length() > 2), "takeWhile"
);
subscribePrint(words.skipUntil(interval), "skipUntil"); // (4)
Let's take a look at the following explanation:
(1) The example uses two Observable instances. The words Observable instance emits a word every 200 milliseconds, and the interval Observable instance emits every half a second.
(2) The takeUntil() operator will emit words until the interval Observable instance emits. So, the words one and way will be emitted, because the next word, or, would be emitted 600 milliseconds after the subscription, and the interval Observable instance emits on the 500th millisecond.
(3) The takeWhile() operator puts a condition on the words Observable instance. It will emit only while the words contain more than two letters. Because or has two letters, it won't be emitted, and all the words after it will be skipped too. The takeUntil() operator has a similar overload that takes a predicate, but it emits until the predicate becomes true, and the item that satisfies the predicate is still emitted. With a "fewer than three letters" condition, it would emit one, way, and or, and then complete. There is no takeWhile(Observable) overload, as it would essentially be the zip() operator: emit only if the other emits too.
(4) The skip* operators are analogous to the take* ones. The difference is that they don't emit until/while a condition is satisfied. In this example, the words one and way are skipped because they are emitted before the 500th millisecond of subscribing, and the interval Observable instance begins emitting at the 500th millisecond. The word or and all the words coming after it are emitted.

These conditional operators can be used, for example, for displaying a loading animation in GUI applications. The code can be something like this:
loadingAnimationObservable.takeUntil(requestObservable);
On every emission of the loadingAnimationObservable variable, some short-lived animation will be displayed to the user. When the request is returned, the animation will no longer be displayed. This is another way of branching the logic of a program.
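The timing reasoning in the takeUntil(), takeWhile(), and skipUntil() walkthrough can be checked without waiting on real clocks. The following is a minimal sketch, assuming RxJava 1.x is on the classpath. The class TakeSkipSketch and its helper methods are hypothetical names, and a TestScheduler replaces the default scheduler of the interval() operator. It also exercises skipWhile() and the predicate overload of takeUntil(), which the example above does not call.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.observers.TestSubscriber;
import rx.schedulers.TestScheduler;

// Hypothetical sketch: the take*/skip* example replayed on virtual time.
class TakeSkipSketch {

    // One word every 200 virtual milliseconds, as in the original example.
    static Observable<String> words(TestScheduler scheduler) {
        return Observable
            .just("one", "way", "or", "another", "I'll", "learn", "RxJava")
            .zipWith(
                Observable.interval(200L, TimeUnit.MILLISECONDS, scheduler),
                (x, y) -> x
            );
    }

    // A tick every 500 virtual milliseconds.
    static Observable<Long> interval(TestScheduler scheduler) {
        return Observable.interval(500L, TimeUnit.MILLISECONDS, scheduler);
    }

    // Subscribes, advances virtual time by two seconds, returns what was emitted.
    static List<String> collect(TestScheduler scheduler, Observable<String> source) {
        TestSubscriber<String> subscriber = new TestSubscriber<>();
        source.subscribe(subscriber);
        scheduler.advanceTimeBy(2, TimeUnit.SECONDS);
        return subscriber.getOnNextEvents();
    }

    static List<String> takeUntilOther() {
        TestScheduler s = new TestScheduler();
        return collect(s, words(s).takeUntil(interval(s)));
    }

    static List<String> takeWhileLong() {
        TestScheduler s = new TestScheduler();
        return collect(s, words(s).takeWhile(word -> word.length() > 2));
    }

    static List<String> skipUntilOther() {
        TestScheduler s = new TestScheduler();
        return collect(s, words(s).skipUntil(interval(s)));
    }

    static List<String> skipWhileLong() {
        TestScheduler s = new TestScheduler();
        return collect(s, words(s).skipWhile(word -> word.length() > 2));
    }

    // Predicate overload: the item that satisfies the predicate is still emitted.
    static List<String> takeUntilShort() {
        TestScheduler s = new TestScheduler();
        return collect(s, words(s).takeUntil(word -> word.length() < 3));
    }
}
```

Under these assumptions, takeUntilOther() and takeWhileLong() should both return one and way, skipUntilOther() and skipWhileLong() should both return or and every word after it, and takeUntilShort() should return one, way, and or. Each helper creates its own TestScheduler so that the virtual clocks of the runs don't interfere with each other.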
The idea of the defaultIfEmpty() operator is to return something useful if an unknown source turns out to be empty. For example, we can fall back to locally stored information if a remote source has nothing new.
Here's a simple example:
Observable<Object> test = Observable
.empty()
.defaultIfEmpty(5);
subscribePrint(test, "defaultIfEmpty");
Of course, this will output 5 and will complete.
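The local-fallback scenario described earlier can be sketched in the same spirit. This is a minimal illustration, assuming RxJava 1.x is on the classpath. The class DefaultIfEmptySketch, the method latestNews(), and the sample strings are hypothetical; a real remote source would be asynchronous, whereas here plain in-memory Observable instances stand in for it.

```java
import java.util.List;

import rx.Observable;

// Hypothetical sketch: fall back to a local value when the remote source is empty.
class DefaultIfEmptySketch {

    // Collects everything the remote source emits; if it completes without
    // emitting anything, a single locally stored value is used instead.
    static List<String> latestNews(Observable<String> remote) {
        return remote
            .defaultIfEmpty("locally stored headline")
            .toList()
            .toBlocking()
            .single();
    }

    public static void main(String[] args) {
        System.out.println(latestNews(Observable.<String>empty()));
        System.out.println(latestNews(Observable.just("fresh headline 1", "fresh headline 2")));
    }
}
```

Note that the default value is used only when the source completes without emitting anything. A source that fails with an error is not treated as empty, so the error still reaches the subscriber.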
The source code for the amb(), take*, skip*, and defaultIfEmpty() operator examples can be found at https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter05/Conditionals.java.
Until now, we have transformed, filtered, and combined data. But what about errors? Our applications can enter an error state at any time. Yes, we can subscribe to the errors emitted by the Observable instances, but this will terminate our logic. In the subscribe method, we are outside of the Observable chain of operators. What if we want to react to an error from within the Observable chain and try to prevent the termination? There are some operators that help us do that, and we'll be examining them in the next section.