Chapter 6. Combining Observables

In the previous chapter, we learned how to transform Observable sequences. We saw practical examples of map(), scan(), groupBy(), and a few other useful functions that help us manipulate an Observable into the final Observable we want.

In this chapter, we are going to dig into the combining functions and learn how to work with multiple Observables at the same time to create the Observable that we want.

Merge

Living in an asynchronous world often creates scenarios in which we have multiple sources but want to expose only one point of consumption: multiple inputs, single output. RxJava's merge() helps you combine two or more Observables by merging their emitted items into one sequence. The following figure gives you an example of merging two sequences into one final emitting Observable:

[Figure: Merge]

As you can see, the emitted items are merged and interleaved in one final Observable. Note that if the source Observables emit synchronously, merge() works through them one after another, so their items are concatenated rather than interleaved.
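The difference between interleaving and concatenation can be sketched with a small standalone program. This is only an illustration, assuming RxJava 1.x (the rx.Observable API used throughout this book) is on the classpath and that Java 8 lambdas are available; the class and method names are hypothetical and not part of our app.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;

import rx.Observable;

// Hypothetical demo class, not part of the app in this book.
public class MergeOrderingSketch {

    // Both sources emit synchronously on the subscribing thread, so merge()
    // drains the first source completely before it reaches the second one.
    static List<Integer> mergeSynchronous() {
        Observable<Integer> first = Observable.just(1, 2, 3);
        Observable<Integer> second = Observable.just(4, 5, 6);
        return Observable.merge(first, second).toList().toBlocking().single();
    }

    // interval() emits on a background scheduler, so items from the two
    // sources arrive as their timers fire and can interleave.
    static List<String> mergeAsynchronous() {
        Observable<String> fast = Observable.interval(100, TimeUnit.MILLISECONDS)
                .take(3)
                .map(i -> "fast-" + i);
        Observable<String> slow = Observable.interval(150, TimeUnit.MILLISECONDS)
                .take(3)
                .map(i -> "slow-" + i);
        return Observable.merge(fast, slow).toList().toBlocking().single();
    }

    public static void main(String[] args) {
        System.out.println(mergeSynchronous());  // [1, 2, 3, 4, 5, 6]
        System.out.println(mergeAsynchronous()); // all six items; order depends on timing
    }
}
```

The synchronous case always produces the two sequences back to back. The asynchronous case always delivers all six items, but their relative order is decided by the timers, so it should not be relied on.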

As usual, we are going to create a real-world example using our app and our well-known installed apps list. We are going to need a second Observable for this, so we can create a separate installed apps list and reverse it. Again, there is no real practical meaning in this; it's just for the sake of the example. With the second list, our loadList() function will look like this:

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);

    // Lists.reverse() is assumed to be Guava's com.google.common.collect.Lists helper.
    List<AppInfo> reversedApps = Lists.reverse(apps);

    Observable<AppInfo> observableApps = Observable.from(apps);
    Observable<AppInfo> observableReversedApps = Observable.from(reversedApps);

    // Combine both sources into one sequence.
    Observable<AppInfo> mergedObservable = Observable.merge(observableApps, observableReversedApps);

    mergedObservable.subscribe(new Observer<AppInfo>() {
        @Override
        public void onCompleted() {
            mSwipeRefreshLayout.setRefreshing(false);
            Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
        }

        @Override
        public void onError(Throwable e) {
            Toast.makeText(getActivity(), "One of the two Observables threw an error!", Toast.LENGTH_SHORT).show();
            mSwipeRefreshLayout.setRefreshing(false);
        }

        @Override
        public void onNext(AppInfo appInfo) {
            // Append each emitted app to the end of the list shown by the adapter.
            mAddedApps.add(appInfo);
            mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
        }
    });
}

We have our usual observableApps Observable and our new observableReversedApps, built from the reversed list. Using Observable.merge(), we can create a new Observable, mergedObservable, that will emit all the items emitted by our source Observables in one single Observable sequence.

As you can see, every method signature stays the same, so our Observer won't notice any difference and we can reuse the code. As a result, we have this:

[Figure: Merge]

Paying attention to the Toast message in the onError() method, you can guess that any source Observable that throws an error will break the merging. If you need to avoid this scenario, RxJava provides mergeDelayError(), which will continue to emit the items from an Observable even if the other one threw an error. When all the still-working Observables have finished, mergeDelayError() will emit onError(), as shown in the following figure:

[Figure: Merge]
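To make the contrast between merge() and mergeDelayError() concrete, here is a minimal standalone sketch. It assumes RxJava 1.x on the classpath and Java 8 lambdas; the class, the Result holder, and the sample values are hypothetical and exist only for this illustration. Both sources emit synchronously so that the outcome is easy to follow.

```java
import java.util.ArrayList;
import java.util.List;

import rx.Observable;

// Hypothetical demo class, not part of the app in this book.
public class MergeDelayErrorSketch {

    // Records what a subscriber sees: the items plus the terminal error, if any.
    static final class Result {
        final List<String> items = new ArrayList<>();
        Throwable error;
    }

    // Emits one item and then fails.
    static Observable<String> failing() {
        return Observable.just("A1")
                .concatWith(Observable.<String>error(new IllegalStateException("boom")));
    }

    // Emits two items and completes normally.
    static Observable<String> healthy() {
        return Observable.just("B1", "B2");
    }

    // Subscribes and collects items and the error; the sources here are
    // synchronous, so the result is complete when subscribe() returns.
    static Result collect(Observable<String> source) {
        Result result = new Result();
        source.subscribe(result.items::add, e -> result.error = e);
        return result;
    }

    // merge(): the error from failing() terminates the merged sequence at once.
    static Result withMerge() {
        return collect(Observable.merge(failing(), healthy()));
    }

    // mergeDelayError(): the error is held back until healthy() has finished.
    static Result withMergeDelayError() {
        return collect(Observable.mergeDelayError(failing(), healthy()));
    }

    public static void main(String[] args) {
        Result plain = withMerge();
        System.out.println(plain.items + " then " + plain.error);
        Result delayed = withMergeDelayError();
        System.out.println(delayed.items + " then " + delayed.error);
    }
}
```

With merge(), the subscriber receives only the item emitted before the failure, followed by onError(). With mergeDelayError(), it also receives the items from the healthy source, and onError() arrives last. In both cases the subscriber still has to handle the error; mergeDelayError() only postpones it.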