Chapter 5. Combinators, Conditionals, and Error Handling

Most of the programs that we write handle data from different sources. These sources can be both external (files, databases, servers, and many others) and internal (different collections or branches of the same external source). There are many cases in which we'll want to have these sources depend on each other in one way or another. Defining these dependencies is a necessary step in building our programs. The idea of this chapter is to introduce the Observable operators capable of that.

We saw an example of combined Observable instances in the first and second chapters. Our "Reactive Sum" program had one external data source—the user input but it branched it into two internal data sources, depending on the custom format. We saw how we can use the filter() operator instead of procedural if-else constructions. Later, we combined these data flows into one, with the help of a combinator.

We'll learn how to react to errors from inside the Observable instance chain. Remember, being able to react to failures makes our programs resilient.

In this chapter we will cover:

  • Combining the Observable instances using operators such as combineLatest(), merge(), concat(), and zip()
  • Creating dependencies between the Observable instances using conditional operators such as takeUntil(), skipUntil(), and amb()
  • Error handling using operators such as retry(), onErrorResumeNext(), and onErrorReturn()

Combining the Observable instances

We'll first look at the zip(Observable, Observable, <Observable>..., Func) operator, which can combine two or more Observable instances using a combining function.

The zip operator

The function passed to the zip operator has as many parameters as the number of the Observable instances passed to the zip() method. When all of these Observable instances emit at least one item, the function is called with the parameter values first emitted by each of the Observable instances. Its result will be the first emitted item by the Observable instance created via the zip() method. The second item emitted by this Observable instance will be a combination (computed using the function parameter of the zip() method) of the second items of the source Observable instances. Even if one of the source Observable instances has emitted three or more items, its second emitted item is used. The resulting Observable instance always emits the same number of items as the source Observable instance, which emits the fewest items and then completes.

This behavior can be better seen in this marble diagram:

The zip operator

Here is a very simple example of using the zip() method:

Observable<Integer> zip = Observable
.zip(
  Observable.just(1, 3, 4),
  Observable.just(5, 2, 6),
  (a, b) -> a + b
);
subscribePrint(zip, "Simple zip");

The example is similar to the marble diagram and outputs the same result. The first item emitted by the Observable instance created by the zip() method is emitted by the time all of the sources have emitted at least one item. This means that even if one of the sources emits all of its items, the result will be emitted only when all of the other sources emit items.

Now if you remember the interval() operator from Chapter 3, Creating and Connecting Observables, Observers, and Subjects, it is able to create an Observable instance emitting sequential numbers every <n> milliseconds. What if you want to emit a sequence of, say, arbitrary objects instead? This is possible by combining the interval() and from() or just() methods using the zip() method. Let's look at an example of that:

Observable<String> timedZip = Observable
.zip(
  Observable.from(Arrays.asList("Z", "I", "P", "P")),
  Observable.interval(300L, TimeUnit.MILLISECONDS),
  (value, i) -> value
);
subscribePrint(timedZip, "Timed zip");

This will output Z after 300 milliseconds, I after another 300 milliseconds, P after the same interval , and another P after 300 more milliseconds. After that, the timedZip Observable instance will complete. That's because the source Observable instance, created via the interval() method emits, its element every 300 milliseconds, and it determines the speed of the timedZip parameter emissions.

The zip() method has an instance method version too. The operator is called zipWith(). Here is an analogous example to the preceding one but using the zipWith() operator:

Observable<String> timedZip = Observable
.from(Arrays.asList("Z", "I", "P", "P"))
.zipWith(
  Observable.interval(300L, TimeUnit.MILLISECONDS),
  (value, skip) -> value
);
subscribePrint(timedZip, "Timed zip");

Next, we'll get to know the combinator we first saw in Chapter 1, An Introduction to Reactive Programming, while implementing 'The Reactive Sum'.

The combineLatest operator

The combineLatest() operator has the same parameters and overloads as the zip() operator but behaves a bit differently. The Observable instance it creates emits the first item as soon as there is at least one of each source, taking the last of each. After that, the Observable instance it creates emits an item whenever any of the source Observable instances emits an item. The number of items emitted by the combineLatest() operator depends entirely on the order of items emitted, since multiple items could be emitted from a single source before there is one of each source. Its marble diagram looks like this:

The combineLatest operator

In the preceding diagram, the color of the items emitted by the combining Observable instance are the same as the colors of those items triggering their emission.

In the next few examples, will be using three source Observable instances, created by the interval() and zipWith() methods:

Observable<String> greetings = Observable
.just("Hello", "Hi", "Howdy", "Zdravei", "Yo", "Good to see ya")
.zipWith(
  Observable.interval(1L, TimeUnit.SECONDS),
  this::onlyFirstArg
);
Observable<String> names = Observable
.just("Meddle", "Tanya", "Dali", "Joshua")
.zipWith(
  Observable.interval(1500L, TimeUnit.MILLISECONDS),
  this::onlyFirstArg
);
Observable<String> punctuation = Observable
.just(".", "?", "!", "!!!", "...")
.zipWith(
  Observable.interval(1100L, TimeUnit.MILLISECONDS),
  this::onlyFirstArg
);

This is the function used for zipping:

public <T, R> T onlyFirstArg(T arg1, R arg2) {
  return arg1;
}

This is the same method of inserting delays between emissions as seen in the section about the zip() method. These three Observable instances can be used to compare the different combining methods. The Observable instance containing greetings emits every second, the one containing names emits every second and a half, and the one with punctuation signs every 1.1 seconds.

Using the combineLatest() operator, we can combine them like this:

Observable<String> combined = Observable
.combineLatest(
  greetings, names, punctuation,
  (greeting, name, puntuation) ->
    greeting + " " + name + puntuation)
;
subscribePrint(combined, "Sentences");

This will combine the different source items in sentences. The first sentence will be emitted after a second and a half because all the sources have to emit something in order for the combined Observable instance to start its emissions. This sentence will be 'Hello Meddle.'. The next sentence will be emitted the moment any of the sources emits something. This will happen two seconds after subscribing, because the greetings Observable instance emits every second; it will emit 'Hi', and this will make the combined Observable instance emit 'Hi Meddle.'. When 2.2 seconds pass, the punctuation Observable instance will emit '?', so we'll have another sentence—'Hi Meddle?'. This will continue until all of the sources are completed.

The combineLatest() operator is very useful when we need computation or notification when any of the data sources we depend on changes. The next method is simpler; it just merges the emissions of its sources, interleaving them in the process.

The merge operator

When we want to get feeds from multiple sources as one stream, we can use the merge() operator. For example, we can have many Observable instances emitting data from different log files. We don't care which log file is the source of the current emission, we just want to see all the logs.

The diagram of the merge() operator is pretty simple:

The merge operator

Every item is emitted at its original emission time, and the source doesn't matter. An example using the three Observable instances introduced in the previous section looks like this:

Observable<String> merged = Observable
  .merge(greetings, names, punctuation);
subscribePrint(merged, "Words");

It just emits different words/punctuation signs. The first word emitted will come from the greetings Observable instance, one second after the subscription (because greetings emits every second) 'Hello'; then '.' will be emitted after 100 milliseconds because the punctuation Observable instance emits its items every 1.1 seconds. After 400 milliseconds, one second and a half after the subscription, 'Meddle' will be emitted. Next is the greeting 'Hi'. Emissions will continue to take place until the source Observable instance, which takes the most time, completes.

It is worth mentioning that if any of the sources emits an OnError notification, the merge Observable instance emits the error too and completes with it. There is a form of merge() operator that delays emitting errors until all the error-free source Observable instances are completed. It is called mergeDelayError().

If we want to combine our sources in such a way that their items don't interleave in time and the emissions of the first passed source take precedence over the next one, we will be using the last combinator that this chapter introduces—the concat() operator.

The concat operator

All of the chapters in this book are in different files. We want to concatenate the content of all of these files into one big file, representing the whole book. We can create an Observable instance for each chapter-file with the from(Path) method that we created earlier, and we can use the concat() operator with these Observable instances as sources to concatenate them in the right order in one Observable instance. If we subscribe to this Observable instance with a method that writes everything in a file, we'll have our book-file in the end.

Note that the conact() operator won't work well with infinite Observable instances. It will emit the notifications of the first one, but it will block the others. The main difference between the merge() and concat() operators is that merge()subscribes to all source Observable instances at the same time, whereas concat()has exactly one subscription at any time.

The marble diagram of the concat() operator looks like this:

The concat operator

Here is an example of concatenating the three Observable instances from the previous examples:

Observable<String> concat = Observable
  .concat(greetings, names, punctuation);
subscribePrint(concat, "Concat");

This will output all of the greetings, one by one, every second, then the names every second and a half, and finally the punctuation signs every 1.1 seconds. Between the greetings and the names there will be a second and a half.

There is one operator, similar to the concat() operator, called startWith(). It prepends items to an Observable instance and has overloads that take one, two, three, and so on, up to nine values, with an Iterable instance or another Observable instance. Using the overload taking another Observable instance as a parameter, we can simulate the concat() operator. Here is the preceding example implemented in the following code:

Observable<String> concat = punctuation
  .startWith(names)
  .startWith(greetings);
subscribePrint(concat, "Concatenated");

The greetings Observable instance is prepended to the names one, and the result of this is prepended to the punctuation of the Observable instance, creating the same Observable instance of concatenated sources as in the preceding example.

Note

The source code for the preceding and all the previous examples in this chapter can be found at https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter05/CombiningObservables.java.

Good use of the startWith() operator is when it is used with the combineLatest() operator. If you remember the initial implementation of our 'Reactive Sum' example, you had to enter both the a and b values in order to calculate the initial sum. But suppose we modify the construction of the sum like this:

Observable.combineLatest(
  a.startWith(0.0),
  b.startWith(0.0),
  (x, y) -> x + y
);

We will have an initial sum of 0.0 even before the user has entered anything and the situation in which the user has entered a for the first time and not yet gave value to b in which case we don't see the sum won't occur.

Again, like with the merge() operator, the concat() operator has an instance form—the concatWith() operator.

In this section of the chapter, we saw how we can combine different Observable instances. But combining is not the only interaction between the Observable instances. They can depend on each other or manage each other. There is a way of getting one or more Observable instances to create conditions changing the behavior of other Observable instances. It's achieved through conditional operator/operators.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.137.223.10