Zip

.zip() is an operation that combines items from two Observables into one by taking an item from each of the Observables and then producing a new value. It means that it waits for both of the Observables to produce a value before the flow continues.

This is useful in occasions where two values are produced independently but later (downstream), they are both consumed at the same time.

Let's see an example of that:

Observable.zip(
Observable.just("One", "Two", "Three"),
Observable.interval(1, TimeUnit.SECONDS),
(number, interval) -> number + "-" + interval
)
.subscribe(e -> log(e));

The first argument of .zip() is this:

Observable.just("One", "Two", "Three")

This is just a regular Observable that will produce three items. The second argument is 

Observable.interval(1, TimeUnit.SECONDS)

This is an Observable that will normally produce items endlessly. However, in this case, the behavior will be a bit different--when one of the Observables completes, the .zip() operator unsubscribes from the other one automatically.

Consider that we were to augment the preceding example with some logging, as shown:

Observable.zip(
Observable.just("One", "Two", "Three")
.doOnDispose(() -> log("just", "doOnDispose"))
.doOnTerminate(() -> log("just", "doOnTerminate")),
Observable.interval(1, TimeUnit.SECONDS)
.doOnDispose(() -> log("interval", "doOnDispose"))
.doOnTerminate(() -> log("interval",
"doOnTerminate")),
(number, interval) -> number + "-" + interval)
.doOnDispose(() -> log("zip", "doOnDispose"))
.doOnTerminate(() -> log("zip", "doOnTerminate"))
.subscribe(e -> log(e));

We will then see that the output produced will be this:

APP: just:main:doOnTerminate
APP: One-0:RxComputationThreadPool-1
APP: Two-1:RxComputationThreadPool-1
APP: Three-2:RxComputationThreadPool-1
APP: just:RxComputationThreadPool-4:doOnDispose
APP: interval:RxComputationThreadPool-1:doOnDispose
APP: zip:RxComputationThreadPool-1:doOnTerminate
APP: zip:RxComputationThreadPool-1:doOnDispose

It can be seen that the .just() Observable is terminated quickly as its emissions are basically instant. Next, the following function is executed three times; this concatenates the items that were emitted from the Observables into a single value:

(number, interval) -> number + "-" + interval

The value is then logged into the terminal. Finally, we can see that both the Observables are unsubscribed (disposed), and the initial .zip() Observable is terminated.

It is worth noting that the .interval() Observable was never terminated because it never produced a onError() or onComplete() action--it was just constantly emitting values with onNext().

Also, there are multiple overloaded versions of .zip()--it can consume multiple Observables up to nine (or more if .zipArray() is used).

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.21.246.223