The conditional operators

We can arrange for one Observable instance to hold back its emissions until another one emits, or to emit only if another one emits nothing. Such Observable instances emit items only under given conditions, and these conditions are applied to them using conditional operators. In this section, we'll take a look at some of the conditional operators provided by RxJava.

The amb operator

The amb() operator has overloads that take from two up to nine source Observable instances, or an Iterable instance of Observable instances. It mirrors the source Observable instance that starts emitting first and ignores the rest. It doesn't matter what the first emission is: an item, an OnError notification, or an OnCompleted notification. Its diagram looks like this:

The amb operator

This operator has an instance form too. It is called ambWith() and is called on one Observable instance with another Observable instance as its argument.

This conditional operator is good for reading from multiple sources of similar data. The subscriber won't care about the origin of the data. It can be used to implement simple caching, for example. Here is a little example of how it may be used:

// Amb 1: words emits immediately, interval only after 500 ms.
Observable<String> words = Observable.just("Some", "Other");
Observable<Long> interval = Observable
  .interval(500L, TimeUnit.MILLISECONDS)
  .take(2);
subscribePrint(Observable.amb(words, interval), "Amb 1");

// Amb 2: both sources are delayed by a random time of up to one second.
Random r = new Random();
Observable<String> source1 = Observable
  .just("data from source 1")
  .delay(r.nextInt(1000), TimeUnit.MILLISECONDS);
Observable<String> source2 = Observable
  .just("data from source 2")
  .delay(r.nextInt(1000), TimeUnit.MILLISECONDS);
subscribePrint(Observable.amb(source1, source2), "Amb 2");

The first amb() operator will emit the items of the words Observable instance, because the interval Observable instance will have to wait for half a second before emitting, and the words will begin emitting immediately.

The emission of the second amb Observable instance will be decided at random. If the first source Observable instance emits its data before the second, its emission will be mirrored by the amb Observable instance; if the second source emits first, the amb Observable instance will emit the second source's data instead.
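To make the "first source wins" rule concrete without depending on RxJava, here is a minimal plain-Java sketch of the caching idea. It is only a model: it uses the JDK's CompletableFuture.anyOf() for single-value sources, whereas amb() mirrors a whole stream and unsubscribes from the losing sources. The class name FirstSourceWins and the helpers first() and delayed() are made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class FirstSourceWins {
    // Returns the value of whichever source completes first.
    // This only models amb() for sources that produce a single item.
    @SafeVarargs
    static <T> T first(CompletableFuture<? extends T>... sources) {
        @SuppressWarnings("unchecked")
        T winner = (T) CompletableFuture.anyOf(sources).join();
        return winner;
    }

    // Hypothetical helper: a source that yields its value after a delay.
    static CompletableFuture<String> delayed(String value, long millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        });
    }

    public static void main(String[] args) {
        // The subscriber doesn't care where the data comes from;
        // it simply gets the answer of the faster source.
        String data = first(
            delayed("data from cache", 50),
            delayed("data from remote", 500));
        System.out.println(data);
    }
}
```

With delays this far apart, the cached value should normally win, but as with the second amb() example, the outcome depends on timing.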

The takeUntil(), takeWhile(), skipUntil(), and skipWhile() conditional operators

We saw operators similar to these in the previous chapter. The take(int) operator let through only the first n items. These operators also filter items, but based on conditions. The takeUntil() operator takes another Observable instance; the source's items are emitted until this other Observable instance emits, and after that, the Observable instance created by the takeUntil() operator completes. Let's look at an example of using these operators:

Observable<String> words = Observable // (1)
  .just("one", "way", "or", "another", "I'll", "learn", "RxJava")
  .zipWith(
    Observable.interval(200L, TimeUnit.MILLISECONDS),
    (x, y) -> x
  );
Observable<Long> interval = Observable
  .interval(500L, TimeUnit.MILLISECONDS);
subscribePrint(words.takeUntil(interval), "takeUntil"); // (2)
subscribePrint( // (3)
  words.takeWhile(word -> word.length() > 2), "takeWhile"
);
subscribePrint(words.skipUntil(interval), "skipUntil"); // (4)

Let's take a look at the following explanation:

  1. For these examples, we'll use the words and interval Observable instances. The words Observable instance emits a word every 200 milliseconds, and the interval Observable emits every half a second.
  2. As mentioned previously, this overload of the takeUntil() operator will emit words until the interval Observable emits. So, the words one and way will be emitted because the next word, or, should be emitted 600 milliseconds after the subscription, and the interval Observable emits on the 500th millisecond.
  3. Here, the takeWhile() operator puts a condition on the words Observable. It emits only while the words contain more than two letters. Because 'or' has two letters, it won't be emitted, and all the words after it will be skipped too. The takeUntil() operator has a similar overload that takes a predicate, but with the opposite meaning: it emits until the predicate becomes true, and it checks the predicate after emitting each item, so the item that satisfies it is still emitted. There is no takeWhile(Observable) overload, as it would essentially be the zip() operator: emit only if the other emits too.
  4. The skip* operators are analogous to the take* ones. The difference is that they don't emit until/while a condition is satisfied. In this example, the words one and way are skipped because they are emitted before the interval Observable begins emitting, 500 milliseconds after the subscription. The word 'or' and all the words coming after it are emitted.
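The timing and predicate rules from the explanation above can be checked with a small plain-Java model. This is a sketch of the semantics only, not RxJava code: lists stand in for Observable instances, and the class TakeSkipModel with its methods is invented for this illustration. The time-based methods assume that word number i arrives at (i + 1) * periodMs and that the other Observable first emits at otherFirstMs; a word arriving at exactly that moment is treated as arriving after it, which is an arbitrary choice that doesn't occur in our example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class TakeSkipModel {
    static final List<String> WORDS = Arrays.asList(
        "one", "way", "or", "another", "I'll", "learn", "RxJava");

    // takeWhile: emit while the predicate holds, stop at the first failure.
    static <T> List<T> takeWhile(List<T> items, Predicate<? super T> keep) {
        List<T> out = new ArrayList<>();
        for (T item : items) {
            if (!keep.test(item)) break;
            out.add(item);
        }
        return out;
    }

    // skipWhile: drop while the predicate holds, then emit everything else.
    static <T> List<T> skipWhile(List<T> items, Predicate<? super T> skip) {
        List<T> out = new ArrayList<>();
        boolean skipping = true;
        for (T item : items) {
            if (skipping && skip.test(item)) continue;
            skipping = false;
            out.add(item);
        }
        return out;
    }

    // takeUntil(other): keep the words that arrive before the other emits.
    static List<String> takeUntilTime(List<String> words, long periodMs, long otherFirstMs) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < words.size(); i++) {
            if ((i + 1) * periodMs < otherFirstMs) out.add(words.get(i));
        }
        return out;
    }

    // skipUntil(other): keep the words that arrive once the other has emitted.
    static List<String> skipUntilTime(List<String> words, long periodMs, long otherFirstMs) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < words.size(); i++) {
            if ((i + 1) * periodMs >= otherFirstMs) out.add(words.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(takeUntilTime(WORDS, 200, 500));          // [one, way]
        System.out.println(takeWhile(WORDS, w -> w.length() > 2));   // [one, way]
        System.out.println(skipUntilTime(WORDS, 200, 500));          // [or, another, I'll, learn, RxJava]
        System.out.println(skipWhile(WORDS, w -> w.length() > 2));   // [or, another, I'll, learn, RxJava]
    }
}
```

In this particular example, the time-based and the predicate-based variants happen to cut the sequence at the same word; with other timings or conditions, they wouldn't.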

These conditional operators can be used, for example, for displaying loading animation in GUI applications. The code can be something like this:

loadingAnimationObservable.takeUntil(requestObservable);

On every emission of the loadingAnimationObservable variable, some short-lived animation will be displayed to the user. When the request is returned, the animation will no longer be displayed. This is another way of branching the logic of a program.

The defaultIfEmpty() operator

The idea of the defaultIfEmpty() operator is to return something useful if an unknown source turns out to be empty. For example, we can fall back to locally stored information if a remote source has nothing new.

Here's a simple example:

Observable<Object> test = Observable
  .empty()
  .defaultIfEmpty(5);
subscribePrint(test, "defaultIfEmpty");

Of course, this will output 5 and will complete.
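The fallback idea mentioned above (local data when the remote source has nothing new) can be modeled in a few lines of plain Java. Again, this is only a sketch of the semantics with a list standing in for the Observable instance; the class DefaultIfEmptyModel and its method are hypothetical and not part of RxJava.

```java
import java.util.Collections;
import java.util.List;

class DefaultIfEmptyModel {
    // An empty source is replaced by a single default item;
    // a non-empty source is passed through unchanged.
    static <T> List<T> defaultIfEmpty(List<T> source, T fallback) {
        return source.isEmpty() ? Collections.singletonList(fallback) : source;
    }

    public static void main(String[] args) {
        // Pretend the remote source returned nothing new.
        List<String> remote = Collections.emptyList();
        System.out.println(defaultIfEmpty(remote, "locally stored news")); // [locally stored news]
        System.out.println(defaultIfEmpty(Collections.<Integer>emptyList(), 5)); // [5]
    }
}
```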

Note

The source code for the amb(), take*, skip*, and defaultIfEmpty() operator examples can be found at https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter05/Conditionals.java.

Until now, we have transformed, filtered, and combined data. But what about errors? Our applications can enter an error state at any time. Yes, we can subscribe to the errors emitted by the Observable instances, but this will terminate our logic. In the subscribe method, we are outside of the Observable chain of operators. What if we want to react to an error from within the chain of Observable instances and try to prevent the termination? There are some operators that help us do that, and we'll be examining them in the next section.
