Chapter 5. Transforming Observables

In the previous chapter, we explored the filtering universe of RxJava. We learned how to filter out the values we don't need with filter(), how to get subsets of the emitted values with take(), and how to get rid of duplicates with distinct(). We learned how to exploit time with timeout(), sample(), and debounce().

In this chapter, we will learn how to transform Observable sequences to create sequences that better fit our needs.

The *map family

RxJava provides a few mapping functions: map(), flatMap(), concatMap(), flatMapIterable(), and switchMap(). All these functions apply to an Observable sequence, transform its emitted values, and return them in a new form. Let's look at them one by one with proper real-world examples.

Map

RxJava's map() function receives a specific Func object and applies it to every value emitted by the Observable. The next figure shows how to apply a multiplying function to every emitted item to create a new Observable that emits the transformed items:

Map

Let's think about our installed applications' list. How could we show the same list, but with all the names in lowercase?

Our loadList() function will change to this:

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);

    Observable.from(apps)
            .map(new Func1<AppInfo, AppInfo>() {
                @Override
                public AppInfo call(AppInfo appInfo) {
                    String currentName = appInfo.getName();
                    String lowerCaseName = currentName.toLowerCase();
                    appInfo.setName(lowerCaseName);
                    return appInfo;
                }
            })
            .subscribe(new Observer<AppInfo>() {
                @Override
                public void onCompleted() {
                    mSwipeRefreshLayout.setRefreshing(false);
                }

                @Override
                public void onError(Throwable e) {
                    Toast.makeText(getActivity(), "Something went south!", Toast.LENGTH_SHORT).show();
                    mSwipeRefreshLayout.setRefreshing(false);
                }

                @Override
                public void onNext(AppInfo appInfo) {
                    mAddedApps.add(appInfo);
                    mAdapter.addApplication(mAddedApps.size() - 1,  appInfo);
                }
            });
}

As you can see, after creating the emitting Observable as usual, we appended a map() call. We created a simple function that lowercases the name of each AppInfo object and passes the updated object on to the Observer. Note that this function mutates the emitted object in place rather than creating a copy.
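If a genuinely new object is preferred over in-place mutation, the mapping function can build a copy instead. The following plain-Java sketch models that idea without RxJava. The App class and the lowerCased() helper are hypothetical stand-ins for AppInfo, introduced here only for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Function;

final class MapSketch {

    // Hypothetical, immutable stand-in for the chapter's AppInfo class.
    static final class App {
        final String name;

        App(String name) {
            this.name = name;
        }
    }

    // Models map(): applies fn to every item, in order, and collects the results.
    static <T, R> List<R> map(List<T> source, Function<T, R> fn) {
        List<R> out = new ArrayList<>();
        for (T item : source) {
            out.add(fn.apply(item));
        }
        return out;
    }

    // Returns a new App with a lowercase name; the original is left untouched.
    static App lowerCased(App app) {
        return new App(app.name.toLowerCase(Locale.ROOT));
    }
}
```

Calling MapSketch.map(apps, MapSketch::lowerCased) leaves every original item unchanged, which can matter when the same objects are shared with other parts of the app.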

FlatMap

In a more complex scenario, we could have an Observable that emits a sequence of values, each of which is turned into an Observable of its own. RxJava's flatMap() function provides a way to flatten these sequences, merging all the emitted values into one final Observable.

FlatMap

When working with a potentially large number of Observables, keep in mind that if any one of them raises an error, flatMap() itself will trigger its onError() function and abort the whole chain.

The merging part deserves an important note: it allows interleaving. This means that flatMap() cannot guarantee that the final Observable preserves the exact emission order of the source Observables, as shown in the previous figure.

ConcatMap

RxJava's concatMap() function solves flatMap()'s interleaving issue by providing a flattening function that concatenates the emitted values instead of merging them, as shown in the following figure:

ConcatMap
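To make the difference between merging and concatenating concrete, here is a small plain-Java model that does not use RxJava at all. It assumes each generated sequence is a list of values tagged with a hypothetical arrival time; the Timed class and both method names are illustrative inventions, not RxJava API:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class MergeVsConcatSketch {

    // One emission of an inner sequence: a value plus its (hypothetical) arrival time.
    static final class Timed {
        final long timeMs;
        final String value;

        Timed(long timeMs, String value) {
            this.timeMs = timeMs;
            this.value = value;
        }
    }

    // flatMap()-style merge: values come out in arrival order,
    // regardless of which inner sequence produced them.
    static List<String> merge(List<List<Timed>> inners) {
        List<Timed> all = new ArrayList<>();
        for (List<Timed> inner : inners) {
            all.addAll(inner);
        }
        // List.sort is stable, so ties keep their source order.
        all.sort(Comparator.comparingLong((Timed t) -> t.timeMs));
        List<String> out = new ArrayList<>();
        for (Timed t : all) {
            out.add(t.value);
        }
        return out;
    }

    // concatMap()-style concatenation: each inner sequence is drained
    // completely before the next one starts, so arrival times are ignored.
    static List<String> concat(List<List<Timed>> inners) {
        List<String> out = new ArrayList<>();
        for (List<Timed> inner : inners) {
            for (Timed t : inner) {
                out.add(t.value);
            }
        }
        return out;
    }
}
```

With one sequence emitting A1 at 0 ms and A2 at 30 ms, and another emitting B1 at 10 ms and B2 at 20 ms, merge() yields A1, B1, B2, A2, while concat() yields A1, A2, B1, B2.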

FlatMapIterable

As a member of the *map family, flatMapIterable() works similarly to flatMap(). The only concrete difference is that it pairs up source items and generated Iterables, rather than source items and generated Observables:

FlatMapIterable
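As a rough illustration of the pairing described above, the following plain-Java sketch models the flattening on ordinary lists rather than on Observables. The class, the method names, and the word-splitting example are assumptions made for this sketch:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class FlatMapIterableSketch {

    // Models flatMapIterable(): fn turns each source item into an Iterable, and
    // every element of that Iterable is emitted before the next source item is handled.
    static <T, R> List<R> flatMapIterable(List<T> source, Function<T, Iterable<R>> fn) {
        List<R> out = new ArrayList<>();
        for (T item : source) {
            for (R element : fn.apply(item)) {
                out.add(element);
            }
        }
        return out;
    }

    // Example use: split each sentence into its words.
    static List<String> words(List<String> sentences) {
        return flatMapIterable(sentences, s -> Arrays.asList(s.split(" ")));
    }
}
```

Here every sentence is paired with the list of its words, and the words of all sentences end up in one flat sequence.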

SwitchMap

As shown in the following figure, switchMap() acts similarly to flatMap(), but whenever the source Observable emits a new item, it unsubscribes from the Observable generated from the previously emitted item, stops mirroring it, and begins mirroring only the current one.

SwitchMap
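The switching behavior can be modeled on a simple timeline. The sketch below is plain Java, not RxJava: it assumes that source items arrive at known times and that every item spawns an inner sequence emitting at fixed delays after its start. All names and the "index:position" labels are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

final class SwitchMapSketch {

    // sourceTimes[i] is the time at which source item i is emitted (ascending order).
    // Each item starts an inner sequence that emits "i:0", "i:1", ... at the given
    // non-negative, ascending delays after its start. An inner emission survives only
    // if it happens before the next source item arrives, because that arrival
    // unsubscribes from the previous inner sequence.
    static List<String> switchMap(long[] sourceTimes, long[] innerDelays) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < sourceTimes.length; i++) {
            long cutoff = (i + 1 < sourceTimes.length) ? sourceTimes[i + 1] : Long.MAX_VALUE;
            for (int j = 0; j < innerDelays.length; j++) {
                if (sourceTimes[i] + innerDelays[j] < cutoff) {
                    out.add(i + ":" + j);
                }
            }
        }
        return out;
    }
}
```

With source items at 0, 25, and 100 ms and inner delays of 10, 20, and 30 ms, the first inner sequence loses its third emission (due at 30 ms) because the second source item arrives at 25 ms; the other two inner sequences run to completion.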

Scan

RxJava's scan() function can be considered as an accumulator function. The scan() function applies a function to every item emitted by the Observable, computes the function result, and injects the result back into the Observable sequence, waiting to use it with the next emitted value:

Scan

As a generic example, here is an accumulator:

Observable.just(1, 2, 3, 4, 5)
        .scan((sum, item) -> sum + item)
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer item) {
                Log.d("RXJAVA", "item is: " + item);
            }

            @Override
            public void onError(Throwable error) {
                Log.e("RXJAVA", "Something went south!");
            }

            @Override
            public void onCompleted() {
                Log.d("RXJAVA", "Sequence completed.");
            }
        });

As a result, we have:

RXJAVA: item is: 1
RXJAVA: item is: 3
RXJAVA: item is: 6
RXJAVA: item is: 10
RXJAVA: item is: 15
RXJAVA: Sequence completed.

We can even create a new version of our loadList() function that compares every installed app name and creates a list of incrementally longer names:

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);

    Observable.from(apps)
            .scan((appInfo, appInfo2) -> {
                if (appInfo.getName().length() >  appInfo2.getName().length()) {
                    return appInfo;
                } else {
                    return appInfo2;
                }
            })
            .distinct()
            .subscribe(new Observer<AppInfo>() {
                @Override
                public void onCompleted() {
                    mSwipeRefreshLayout.setRefreshing(false);
                }

                @Override
                public void onError(Throwable e) {
                    Toast.makeText(getActivity(), "Something went south!", Toast.LENGTH_SHORT).show();
                    mSwipeRefreshLayout.setRefreshing(false);
                }

                @Override
                public void onNext(AppInfo appInfo) {
                    mAddedApps.add(appInfo);
                    mAdapter.addApplication(mAddedApps.size() - 1,  appInfo);
                }
            });
}

As a result, we have this:

Scan

There is also a second variant of scan() that takes an initial value to be used with the first emitted value. The method signature looks like this: scan(R, Func2). As a graphical example, it acts like this:

Scan
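A plain-Java model may help to see what the seeded variant produces. It is only a sketch on lists, with hypothetical class and parameter names, and it reflects the documented behavior that the initial value is emitted first, followed by one accumulated value per source item:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

final class ScanSketch {

    // Models scan(initialValue, accumulator): the seed is emitted first,
    // then one accumulated value for every source item.
    static <T, R> List<R> scan(List<T> source, R initialValue, BiFunction<R, T, R> accumulator) {
        List<R> out = new ArrayList<>();
        R acc = initialValue;
        out.add(acc);
        for (T item : source) {
            acc = accumulator.apply(acc, item);
            out.add(acc);
        }
        return out;
    }
}
```

For the values 1 to 5, a seed of 10, and the same summing function used earlier, the model yields 10, 11, 13, 16, 20, 25. Because the seed may have a different type than the items, this variant can also accumulate into something other than the item type.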

GroupBy

From the outset of the first example, with our installed apps list, the list has been sorted alphabetically. However, what if we want to sort the apps by their last update date? RxJava provides a useful function to group elements from a list according to a specific criterion: groupBy(). As a graphical example, the next figure shows how groupBy() can group emitted values by their shape:

GroupBy

This function transforms the source Observable into a new Observable, which emits Observables. Each one of these new Observables emits the items of a specific group.

To create a grouped list of our installed apps, we introduce a new element in our loadList() function:

Observable<GroupedObservable<String, AppInfo>> groupedItems = Observable.from(apps)
        .groupBy(new Func1<AppInfo, String>() {
            @Override
            public String call(AppInfo appInfo) {
                SimpleDateFormat formatter = new SimpleDateFormat("MM/yyyy");
                return formatter.format(new Date(appInfo.getLastUpdateTime()));
            }
        });

Now we have a new Observable, groupedItems, which will emit a sequence of GroupedObservable items. GroupedObservable is a particular Observable that comes with a grouping key. In our example, the key is a String representing the last update date in the Month/Year format.

At this point, we have a few Observables emitting AppInfo items, with which we need to populate our list. We want to preserve the alphabetical sorting order and the grouping order as well. We will create a new Observable that will concatenate all the others, and we will subscribe to it as usual:

Observable
        .concat(groupedItems)
        .subscribe(new Observer<AppInfo>() {
            @Override
            public void onCompleted() {
                mSwipeRefreshLayout.setRefreshing(false);
            }

            @Override
            public void onError(Throwable e) {
                Toast.makeText(getActivity(), "Something went south!", Toast.LENGTH_SHORT).show();
                mSwipeRefreshLayout.setRefreshing(false);
            }

            @Override
            public void onNext(AppInfo appInfo) {
                mAddedApps.add(appInfo);
                mAdapter.addApplication(mAddedApps.size() - 1,  appInfo);
            }
        });

Our loadList() function is complete and, as a result, we have:

GroupBy
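The resulting order can be reasoned about with a plain-Java model of grouping followed by concatenation. This sketch works on lists and maps rather than on Observables, and its class and method names are hypothetical. It assumes that groups are visited in the order in which their keys first appear:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class GroupBySketch {

    // Models groupBy(): items are split into groups by key. The LinkedHashMap keeps
    // groups in order of first appearance; each group keeps its items in source order.
    static <K, T> Map<K, List<T>> groupBy(List<T> source, Function<T, K> keySelector) {
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T item : source) {
            groups.computeIfAbsent(keySelector.apply(item), k -> new ArrayList<>()).add(item);
        }
        return groups;
    }

    // Models concat() over the groups: one group after the other, never interleaved.
    static <K, T> List<T> concatGroups(Map<K, List<T>> groups) {
        List<T> out = new ArrayList<>();
        for (List<T> group : groups.values()) {
            out.addAll(group);
        }
        return out;
    }
}
```

If the source is already sorted alphabetically, every group stays sorted alphabetically too, which is why the final list is ordered by group first and by name within each group.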

Buffer

RxJava's buffer() function transforms the source Observable into a new Observable, which emits lists of values instead of single items:

Buffer

The previous figure shows how buffer() takes an integer count as a parameter to specify how many items should be included in each emitted list. There are a few variations of the buffer() function. One of these lets you specify a skip value: a new buffer is started every skip items and is filled with up to count elements, as shown in the next figure:

Buffer
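The interplay of count and skip is easy to get wrong, so here is a plain-Java model of it on a list. It is a sketch with hypothetical names, not RxJava code, and it assumes that a partially filled buffer is still emitted when the source ends:

```java
import java.util.ArrayList;
import java.util.List;

final class BufferSketch {

    // Models buffer(count, skip): a new buffer starts at every skip-th item and holds
    // up to count items. Plain buffer(count) corresponds to skip == count.
    static <T> List<List<T>> buffer(List<T> source, int count, int skip) {
        if (count <= 0 || skip <= 0) {
            throw new IllegalArgumentException("count and skip must be positive");
        }
        List<List<T>> out = new ArrayList<>();
        for (int start = 0; start < source.size(); start += skip) {
            int end = Math.min(start + count, source.size());
            out.add(new ArrayList<>(source.subList(start, end)));
        }
        return out;
    }
}
```

For the items 1 to 5, count 2 and skip 2 give [1, 2], [3, 4], [5]. A skip smaller than count produces overlapping buffers (count 3, skip 2 gives [1, 2, 3], [3, 4, 5], [5]), while a skip larger than count drops items (count 2, skip 3 gives [1, 2], [4, 5]).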

Playing with time, buffer() can even take a timespan value and create an Observable that emits a list each time that timespan elapses:

Buffer

Window

RxJava's window() function is similar to buffer(), but it emits Observables instead of lists. The next figure shows how window() will buffer three items and emit them as a new Observable:

Window

Each Observable emits a subset of the items from the source Observable according to count, and then terminates with a standard onCompleted() call. Like buffer(), window() has a skip variation, as shown in the next figure:

Window

Cast

RxJava's cast() function is the final operator of this chapter. It is a specialized version of the map() operator: it transforms each item from the source Observable into a new type by casting it to a different Class:

Cast
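The relationship between cast() and map() can be sketched in plain Java. The model below works on a list instead of an Observable and uses hypothetical names; it simply maps every item through Class.cast():

```java
import java.util.ArrayList;
import java.util.List;

final class CastSketch {

    // Models cast(Class): a map() whose function is klass.cast(item).
    // An item of the wrong type makes Class.cast() throw a ClassCastException.
    static <R> List<R> cast(List<?> source, Class<R> klass) {
        List<R> out = new ArrayList<>();
        for (Object item : source) {
            out.add(klass.cast(item));
        }
        return out;
    }
}
```

Calling CastSketch.cast(values, Integer.class) on a list of Objects that are all Integers returns a List<Integer>. A single String in that list makes the whole call fail, which suggests checking the types first when the source is not fully under our control.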