Most of the programs that we write handle data from different sources. These sources can be both external (files, databases, servers, and many others) and internal (different collections or branches of the same external source). There are many cases in which we'll want to have these sources depend on each other in one way or another. Defining these dependencies is a necessary step in building our programs. The idea of this chapter is to introduce the Observable
operators capable of that.
We saw an example of combined Observable
instances in the first and second chapters. Our "Reactive Sum" program had one external data source—the user input but it branched it into two internal data sources, depending on the custom format. We saw how we can use the filter()
operator instead of procedural if-else
constructions. Later, we combined these data flows into one, with the help of a combinator.
We'll learn how to react to errors from inside the Observable
instance chain. Remember, being able to react to failures makes our programs resilient.
In this chapter we will cover:
Observable
instances using operators such as combineLatest()
, merge()
, concat()
, and zip()
Observable
instances using conditional operators such as takeUntil()
, skipUntil()
, and amb()
retry()
, onErrorResumeNext()
, and onErrorReturn()
We'll first look at the zip(Observable, Observable, <Observable>..., Func)
operator, which can combine two or more Observable
instances using a combining function.
The function passed to the zip
operator has as many parameters as the number of the Observable
instances passed to the zip()
method. When all of these Observable
instances emit at least one item, the function is called with the parameter values first emitted by each of the Observable
instances. Its result will be the first emitted item by the Observable
instance created via the zip()
method. The second item emitted by this Observable
instance will be a combination (computed using the function parameter of the zip()
method) of the second items of the source Observable
instances. Even if one of the source Observable
instances has emitted three or more items, its second emitted item is used. The resulting Observable
instance always emits the same number of items as the source Observable
instance, which emits the fewest items and then completes.
This behavior can be better seen in this marble diagram:
Here is a very simple example of using the zip()
method:
Observable<Integer> zip = Observable .zip( Observable.just(1, 3, 4), Observable.just(5, 2, 6), (a, b) -> a + b ); subscribePrint(zip, "Simple zip");
The example is similar to the marble diagram and outputs the same result. The first item emitted by the Observable
instance created by the zip()
method is emitted by the time all of the sources have emitted at least one item. This means that even if one of the sources emits all of its items, the result will be emitted only when all of the other sources emit items.
Now if you remember the interval()
operator from Chapter 3, Creating and Connecting Observables, Observers, and Subjects, it is able to create an Observable
instance emitting sequential numbers every <n>
milliseconds. What if you want to emit a sequence of, say, arbitrary objects instead? This is possible by combining the interval()
and from()
or just()
methods using the zip()
method. Let's look at an example of that:
Observable<String> timedZip = Observable .zip( Observable.from(Arrays.asList("Z", "I", "P", "P")), Observable.interval(300L, TimeUnit.MILLISECONDS), (value, i) -> value ); subscribePrint(timedZip, "Timed zip");
This will output Z
after 300 milliseconds, I
after another 300 milliseconds, P
after the same interval , and another P
after 300
more milliseconds. After that, the timedZip
Observable
instance will complete. That's because the source Observable
instance, created via the interval()
method emits, its element every 300
milliseconds, and it determines the speed of the timedZip
parameter emissions.
The zip()
method has an instance method version too. The operator is called zipWith()
. Here is an analogous example to the preceding one but using the zipWith()
operator:
Observable<String> timedZip = Observable .from(Arrays.asList("Z", "I", "P", "P")) .zipWith( Observable.interval(300L, TimeUnit.MILLISECONDS), (value, skip) -> value ); subscribePrint(timedZip, "Timed zip");
Next, we'll get to know the combinator we first saw in Chapter 1, An Introduction to Reactive Programming, while implementing 'The Reactive Sum'.
The
combineLatest()
operator has the same parameters and overloads as the zip()
operator but behaves a bit differently. The Observable
instance it creates emits the first item as soon as there is at least one of each source, taking the last of each. After that, the Observable
instance it creates emits an item whenever any of the source Observable
instances emits an item. The number of items emitted by the combineLatest()
operator depends entirely on the order of items emitted, since multiple items could be emitted from a single source before there is one of each source. Its marble diagram looks like this:
In the preceding diagram, the color of the items emitted by the combining Observable
instance are the same as the colors of those items triggering their emission.
In the next few examples, will be using three source Observable
instances, created by the interval()
and zipWith()
methods:
Observable<String> greetings = Observable .just("Hello", "Hi", "Howdy", "Zdravei", "Yo", "Good to see ya") .zipWith( Observable.interval(1L, TimeUnit.SECONDS), this::onlyFirstArg ); Observable<String> names = Observable .just("Meddle", "Tanya", "Dali", "Joshua") .zipWith( Observable.interval(1500L, TimeUnit.MILLISECONDS), this::onlyFirstArg ); Observable<String> punctuation = Observable .just(".", "?", "!", "!!!", "...") .zipWith( Observable.interval(1100L, TimeUnit.MILLISECONDS), this::onlyFirstArg );
This is the function used for zipping:
public <T, R> T onlyFirstArg(T arg1, R arg2) { return arg1; }
This is the same method of inserting delays between emissions as seen in the section about the zip()
method. These three Observable
instances can be used to compare the different combining methods. The Observable
instance containing greetings emits every second, the one containing names emits every second and a half, and the one with punctuation signs every 1.1 seconds.
Using the combineLatest()
operator, we can combine them like this:
Observable<String> combined = Observable .combineLatest( greetings, names, punctuation, (greeting, name, puntuation) -> greeting + " " + name + puntuation) ; subscribePrint(combined, "Sentences");
This will combine the different source items in sentences. The first sentence will be emitted after a second and a half because all the sources have to emit something in order for the combined Observable
instance to start its emissions. This sentence will be 'Hello Meddle.'
. The next sentence will be emitted the moment any of the sources emits something. This will happen two seconds after subscribing, because the greetings Observable
instance emits every second; it will emit 'Hi'
, and this will make the combined Observable
instance emit 'Hi Meddle.'
. When 2.2 seconds pass, the punctuation Observable
instance will emit '?'
, so we'll have another sentence—'Hi Meddle?'
. This will continue until all of the sources are completed.
The combineLatest()
operator is very useful when we need computation or notification when any of the data sources we depend on changes. The next method is simpler; it just merges the emissions of its sources, interleaving them in the process.
When we want to get feeds from multiple sources as one stream, we can use the merge()
operator. For example, we can have many Observable
instances emitting data from different log
files. We don't care which log
file is the source of the current emission, we just want to see all the logs.
The diagram of the merge()
operator is pretty simple:
Every item is emitted at its original emission time, and the source doesn't matter. An example using the three Observable
instances introduced in the previous section looks like this:
Observable<String> merged = Observable
.merge(greetings, names, punctuation);
subscribePrint(merged, "Words");
It just emits different words/punctuation signs. The first word emitted will come from the greetings Observable
instance, one second after the subscription (because greetings emits every second) 'Hello'
; then '.'
will be emitted after 100 milliseconds because the punctuation Observable
instance emits its items every 1.1 seconds. After 400 milliseconds, one second and a half after the subscription, 'Meddle'
will be emitted. Next is the greeting 'Hi'
. Emissions will continue to take place until the source Observable
instance, which takes the most time, completes.
It is worth mentioning that if any of the sources emits an OnError
notification, the merge Observable
instance emits the error too and completes with it. There is a form of merge()
operator that delays emitting errors until all the error-free source Observable
instances are completed. It is called mergeDelayError()
.
If we want to combine our sources in such a way that their items don't interleave in time and the emissions of the first passed source take precedence over the next one, we will be using the last combinator that this chapter introduces—the concat()
operator.
All of the chapters in this book are in different files. We want to concatenate the content of all of these files into one big file, representing the whole book. We can create an Observable
instance for each chapter-file with the from(Path)
method that we created earlier, and we can use the concat()
operator with these Observable
instances as sources to concatenate them in the right order in one Observable
instance. If we subscribe to this Observable
instance with a method that writes everything in a file, we'll have our book-file in the end.
Note that the conact()
operator won't work well with infinite Observable
instances. It will emit the notifications of the first one, but it will block the others. The main difference between the merge()
and concat()
operators is that merge()
subscribes to all source Observable
instances at the same time, whereas concat()
has exactly one subscription at any time.
The marble diagram of the concat()
operator looks like this:
Here is an example of concatenating the three Observable
instances from the previous examples:
Observable<String> concat = Observable
.concat(greetings, names, punctuation);
subscribePrint(concat, "Concat");
This will output all of the greetings, one by one, every second, then the names every second and a half, and finally the punctuation signs every 1.1 seconds. Between the greetings and the names there will be a second and a half.
There is one operator, similar to the concat()
operator, called startWith()
. It prepends items to an Observable
instance and has overloads that take one, two, three, and so on, up to nine values, with an Iterable
instance or another Observable
instance. Using the overload taking another Observable
instance as a parameter, we can simulate the concat()
operator. Here is the preceding example implemented in the following code:
Observable<String> concat = punctuation .startWith(names) .startWith(greetings); subscribePrint(concat, "Concatenated");
The greetings Observable
instance is prepended to the names one, and the result of this is prepended to the punctuation of the Observable
instance, creating the same Observable
instance of concatenated sources as in the preceding example.
The source code for the preceding and all the previous examples in this chapter can be found at https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter05/CombiningObservables.java.
Good use of the startWith()
operator is when it is used with the combineLatest()
operator. If you remember the initial implementation of our 'Reactive Sum' example, you had to enter both the a
and b
values in order to calculate the initial sum. But suppose we modify the construction of the sum like this:
Observable.combineLatest( a.startWith(0.0), b.startWith(0.0), (x, y) -> x + y );
We will have an initial sum of 0.0
even before the user has entered anything and the situation in which the user has entered a
for the first time and not yet gave value to b
in which case we don't see the sum won't occur.
Again, like with the merge()
operator, the concat()
operator has an instance form—the concatWith()
operator.
In this section of the chapter, we saw how we can combine different Observable
instances. But combining is not the only interaction between the Observable
instances. They can depend on each other or manage each other. There is a way of getting one or more Observable
instances to create conditions changing the behavior of other Observable
instances. It's achieved through conditional operator/operators.
3.137.223.10