flatMap()

One of the most powerful and critical operators in RxJava is flatMap(). If you have to invest time in understanding any RxJava operator, this is the one. It is an operator that performs a dynamic Observable.merge() by taking each emission and mapping it to an Observable. Then, it merges the emissions from the resulting Observables into a single stream.

The simplest application of flatMap() is to map one emission to many emissions. Say, we want to emit the characters from each string coming from Observable<String>. We can use flatMap() to specify a Function<T,Observable<R>> lambda that maps each string to an Observable<String>which will emit the letters. Note that the mapped Observable<R> can emit any type R, different from the source T emissions.  In this example, it just happened to be String, like the source:

import io.reactivex.Observable;

public class Launcher {
public static void main(String[] args) {

Observable<String> source =
Observable.just("Alpha", "Beta", "Gamma", "Delta",
"Epsilon");

source.flatMap(s -> Observable.fromArray(s.split("")))
.subscribe(System.out::println);
}
}

The output of the preceding code is as follows:

    A
l
p
h
a
B
e
t
a
G
a
m
m
...

We have taken those five string emissions and mapped them (through flatMap()) to emit the letters from each one. We did this by calling each string's split() method, and we passed it an empty String argument "", which will separate on every character. This returns an array String[] containing all the characters, which we pass to Observable.fromArray() to emit each one. The flatMap() expects each emission to yield an Observable, and it will merge all the resulting Observables and emit their values in a single stream.

Here is another example: let's take a sequence of String values (each a concatenated series of values separated by "/"), use  flatMap() on them, and filter for only numeric values before converting them into Integer emissions:

import io.reactivex.Observable;

public class Launcher {
public static void main(String[] args) {

Observable<String> source =
Observable.just("521934/2342/FOXTROT", "21962/12112/78886
/TANGO"
,
"283242/4542/WHISKEY/2348562");

source.flatMap(s -> Observable.fromArray(s.split("/")))
.filter(s -> s.matches("[0-9]+")) //use regex to filter
integers
.map(Integer::valueOf)
.subscribe(System.out::println);
}
}

The output of the preceding code is as follows:

    521934
2342
21962
12112
78886
283242
4542
2348562

We broke up each String  by the / character, which yielded an array. We turned that into an Observable and used flatMap() on it to emit each String. We filtered only for String values that are numeric using a regular expression [0-9]+ (eliminating FOXTROT, TANGO, and WHISKEY) and then turned each emission into an Integer.

Just like Observable.merge(), you can also map emissions to infinite Observables and merge them. For instance, we can emit simple Integer values from Observable<Integer> but use flatMap() on them to drive an Observable.interval(), where each one serves as the period argument. In the following code snippet, we emit the values 2, 3, 10, and 7, which will yield interval Observables that emit at 2 seconds, 3 seconds, 10 seconds, and 7 seconds, respectively. These four Observables will be merged into a single stream:

import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;

public class Launcher {
public static void main(String[] args) {

Observable<Integer> intervalArguments =
Observable.just(2, 3, 10, 7);

intervalArguments.flatMap(i ->
Observable.interval(i, TimeUnit.SECONDS)
.map(i2 -> i + "s interval: " + ((i + 1) * i) + " seconds
elapsed"
)
).subscribe(System.out::println);

sleep(12000);
}
public static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

The output of the preceding code is as follows:

    2s interval: 2 seconds elapsed
3s interval: 3 seconds elapsed
2s interval: 4 seconds elapsed
2s interval: 6 seconds elapsed
3s interval: 6 seconds elapsed
7s interval: 7 seconds elapsed
2s interval: 8 seconds elapsed
3s interval: 9 seconds elapsed
2s interval: 10 seconds elapsed
10s interval: 10 seconds elapsed
2s interval: 12 seconds elapsed
3s interval: 12 seconds elapsed

The Observable.merge() operator will accept a fixed number of Observable sources. But flatMap() will dynamically keep adding new Observable sources for each emission that comes in. This means that you can keep merging new incoming Observables over time.

Another quick note about flatMap() is it can be used in many clever ways. To this day, I keep finding new ways to use it. But another way you can get creative is to evaluate each emission within flatMap() and figure out what kind of Observable you want to return. For example, if my previous example emitted an emission of 0 to flatMap(), this will break the resulting Observable.interval(). But I can use an if statement to check whether it is 0 and return Observable.empty() instead, as used in the following code snippet:

Observable<Integer> secondIntervals =
Observable.just(2, 0, 3, 10, 7);

secondIntervals.flatMap(i -> {
if (i == 0)
return Observable.empty();
else
return Observable.interval(i, TimeUnit.SECONDS)
.map(l -> i + "s interval: " + ((l + 1) * i) + " seconds
elapsed"
);
}).subscribe(System.out::println);

Of course, this is probably too clever as you can just put filter() before flatMap() and filter out emissions that are equal to 0. But the point is that you can evaluate an emission in flatMap() and determine what kind of Observable you want to return.

The flatMap() is also a great way to take a hot Observable UI event stream (such as JavaFX or Android button clicks) and flatMap() each of those events to an entire process within flatMap(). The failure and error recovery can be handled entirely within that flatMap(), so each instance of the process does not disrupt future button clicks.

If you do not want rapid button clicks to produce several redundant instances of a process, you can disable the button using doOnNext() or leverage switchMap() to kill previous processes, which we will discuss in Chapter 7, Switching, Throttling, Windowing, and Buffering.

Note that there are many flavors and variants of flatMap(), accepting a number of overloads that we will not get into deeply for the sake of brevity. We can pass a second combiner argument, which is a BiFunction<T,U,R> lambda, to associate the originally emitted T value with each flat-mapped U value and turn both into an R value. In our earlier example of emitting letters from each string, we can associate each letter with the original string emission it was mapped from:

import io.reactivex.Observable;

public class Launcher {
public static void main(String[] args) {
Observable<String> source =
Observable.just("Alpha", "Beta", "Gamma", "Delta",
"Epsilon");

source.flatMap(s -> Observable.fromArray(s.split("")), (s,r) ->
s + "-" + r)
.subscribe(System.out::println);
}
}

The output of the preceding code is as follows:

    Alpha-A
Alpha-l
Alpha-p
Alpha-h
Alpha-a
Beta-B
Beta-e
Beta-t
Beta-a
Gamma-G
...

We can also use flatMapIterable() to map each T emission into an Iterable<R> instead of an Observable<R>. It will then emit all the R values for each Iterable<R>, saving us the step and overhead of converting it into an Observable. There are also flatMap() variants that map to Singles (flatMapSingle()), Maybes (flatMapMaybe()), and Completables (flatMapCompletable()). A lot of these overloads also apply to concatMap(), which we will cover next.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.133.108.68