Now that we have the means for creating Observable instances from a wide variety of source data, it's time to build programming logic around these instances. We will present the basic reactive operators that we'll use to achieve step-by-step computations (the reactive way of handling data).
We will begin with transformations, using the famous flatMap() and map() operators, as well as some less common transforming operators. After that, we'll learn how to filter our data using the filter() operator, skip elements, and receive only the elements at a given position in time. The chapter will also cover accumulating data with the scan operator. Most of these operators will be presented using marble diagrams.
This chapter covers the following topics:
Transformations with map(), flatMap() and related operators
Filtering data
Accumulating data with the scan operator
We've used the map() operator in some of the previous examples. Higher-order functions that transform the incoming values into something else are called transformations. Higher-order functions that can be invoked on an Observable instance, producing a new Observable instance from it, are called operators. The transforming operators transform the elements emitted from the source Observable instance in some way.
In order to understand how the different operators work, we will use pictures called marble diagrams. For example, this one describes the map operator:
The rectangle in the center of the diagram represents the operator (function). It transforms its input (circles) into something else (triangles). The arrow above the rectangle represents the source Observable instance, the colored circles on it represent OnNext notifications emitted in time, and the vertical line at the end is the OnCompleted notification. The arrow below the rectangle represents the output Observable instance with its transformed elements.
So, the map() operator does exactly this: it transforms every OnNext value from the source into something else, as defined by the function passed to it. Here is a small example:
Observable<String> mapped = Observable
    .just(2, 3, 5, 8)
    .map(v -> v * 3)
    .map(v -> (v % 2 == 0) ? "even" : "odd");
subscribePrint(mapped, "map");
The first map() operator multiplies every number emitted from the source by three. The second map() operator transforms every product into a string: 'even' if the number is even and 'odd' otherwise.
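Because this chain is synchronous and finite, its result is easy to check by hand. The sketch below reproduces it with java.util.stream instead of RxJava, an assumption made only to keep it dependency-free; MapSketch and evenOdd are hypothetical names. The RxJava version should emit the same four strings as OnNext notifications.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java analogy (no RxJava): Stream.map() applies a function to every element,
// much as Observable.map() applies it to every OnNext value.
class MapSketch {
    static List<String> evenOdd(Integer... values) {
        return Arrays.stream(values)
                .map(v -> v * 3)                         // 2, 3, 5, 8 -> 6, 9, 15, 24
                .map(v -> (v % 2 == 0) ? "even" : "odd") // parity of each product
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(evenOdd(2, 3, 5, 8)); // [even, odd, odd, even]
    }
}
```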
Using the map() operator, we can transform each emitted value into a new value. There are more powerful transforming operators that look similar to the map() operator but have their own usage and purpose. Let's look at them.
The flatMap operator is just like the map() operator, but with two differences:
1. The function passed as the flatMap operator's argument always transforms a value or a sequence of values into the form of an Observable instance.
2. The flatMap operator merges the emissions of these resulting Observable instances. This means that instead of emitting the Observable instances as values, it emits their notifications.
Here is the marble diagram for it:
As we can see, each value from the source Observable instance is turned into an Observable instance, and in the end, all the values of these derivative Observables are emitted by the resulting Observable instance. Note that the resulting Observable instance may emit the values of the derivative Observable instances in an interleaved fashion, and even out of order.
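The flattening half of this behavior can be sketched with Stream.flatMap() from the JDK. This is a swapped-in technique, not RxJava, and FlatMapSketch is a hypothetical name. A sequential Java stream drains each derived sequence before moving to the next one, so this sketch cannot show the interleaving described above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-Java analogy (no RxJava): every element becomes a small derived stream,
// and flatMap() flattens all derived streams into a single sequence.
class FlatMapSketch {
    static List<Integer> withTens(List<Integer> source) {
        return source.stream()
                .flatMap(v -> Stream.of(v, v * 10)) // each value -> a two-element derived sequence
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(withTens(Arrays.asList(1, 2, 3))); // [1, 10, 2, 20, 3, 30]
    }
}
```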
The flatMap operator is very useful for forking logic. For example, if an Observable instance represents a file system folder and emits the files in it, we can turn each file object into an Observable instance using the flatMap operator and apply some operations to these file Observables. The result will be the merged output of these operations. Here is an example of reading some files from a folder and dumping their contents to the standard output:
// Imports assumed for RxJava 1.x and Java 8
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.DirectoryIteratorException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import rx.Observable;
import rx.subscriptions.Subscriptions;
Observable<Path> listFolder(Path dir, String glob) { // (1)
    return Observable.<Path>create(subscriber -> {
        try {
            DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob);
            // Close the directory stream when the subscriber is unsubscribed
            subscriber.add(Subscriptions.create(() -> {
                try {
                    stream.close();
                }
                catch (IOException e) {
                    e.printStackTrace();
                }
            }));
            // DirectoryStream is an Iterable, so Observable.from() can emit its entries
            Observable.<Path>from(stream).subscribe(subscriber);
        }
        catch (DirectoryIteratorException ex) {
            subscriber.onError(ex);
        }
        catch (IOException ioe) {
            subscriber.onError(ioe);
        }
    });
}
Observable<String> from(final Path path) { // (2)
    return Observable.<String>create(subscriber -> {
        try {
            BufferedReader reader = Files.newBufferedReader(path);
            // Close the reader when the subscriber is unsubscribed
            subscriber.add(Subscriptions.create(() -> {
                try {
                    reader.close();
                }
                catch (IOException e) {
                    e.printStackTrace();
                }
            }));
            String line = null;
            // Emit lines until the file ends or nobody is listening any more
            while ((line = reader.readLine()) != null && !subscriber.isUnsubscribed()) {
                subscriber.onNext(line);
            }
            if (!subscriber.isUnsubscribed()) {
                subscriber.onCompleted();
            }
        }
        catch (IOException ioe) {
            if (!subscriber.isUnsubscribed()) {
                subscriber.onError(ioe);
            }
        }
    });
}
Observable<String> fsObs = listFolder(
    Paths.get("src", "main", "resources"), "{lorem.txt,letters.txt}"
).flatMap(path -> from(path)); // (3)
subscribePrint(fsObs, "FS"); // (4)
This piece of code introduces two methods for working with folders and files. We will take a short look at them and at how we've used them in this flatMap example:
(1) The first method, listFolder(), takes a folder in the form of a Path variable and a glob expression. It returns an Observable instance representing this folder. This Observable instance emits all the files in the folder that match the glob expression, as Path objects. The method is implemented using both the Observable.create() and Observable.from() operators. The main idea of this implementation is that if an exception occurs, it should be handled and emitted by the resulting Observable instance.
Note the use of the Subscriber.add() method to add a new Subscription instance to the subscriber, created using the Subscriptions.create() method. This method creates a Subscription instance using an action. This action will be executed when the Subscription instance is unsubscribed, which in this case means when the Subscriber instance is unsubscribed. So this is similar to closing the stream in a finally block.
(2) The second method, Observable<String> from(Path), reads the file located at the given path line by line and emits the lines as OnNext() notifications. The method uses Subscriber.add() with a Subscription instance to close the reader of the file.
(3) The example creates an Observable instance from a folder, using the listFolder() method, which emits two Path objects pointing to files. Using the flatMap() operator, we create an Observable instance for every file with the from(Path) method, which emits the file content as lines.
(4) The result of this chain is the content of the two files, printed to the standard output. If we used Scheduler instances (see Chapter 6, Using Concurrency and Parallelism with Schedulers) for every file path Observable, the content would be scrambled, because the flatMap operator interleaves the notifications of the Observable instances that it merges.
The source code introducing the Observable<String> from(final Path path) method can be found at https://github.com/meddle0x53/learning-rxjava/blob/724eadf5b0db988b185f8d86006d772286037625/src/main/java/com/packtpub/reactive/common/CreateObservable.java#L61.
The source code containing the Observable<Path> listFolder(Path dir, String glob) method can be viewed/downloaded at https://github.com/meddle0x53/learning-rxjava/blob/724eadf5b0db988b185f8d86006d772286037625/src/main/java/com/packtpub/reactive/common/CreateObservable.java#L128.
The example using the flatMap operator can be viewed/downloaded at https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter04/FlatMapAndFiles.java.
The flatMap operator has multiple overloads. For example, there is one that takes three functions: one for OnNext, one for OnError, and one for OnCompleted. It transforms errors and completion events into Observable instances too. If there is an OnError or OnCompleted event, the Observable instance produced from it is merged into the resulting Observable instance, followed by an OnCompleted notification. Here is an example:
Observable<Integer> flatMapped = Observable
    .just(-1, 0, 1)
    .map(v -> 2 / v)
    .flatMap(
        v -> Observable.just(v),
        e -> Observable.just(0),
        () -> Observable.just(42)
    );
subscribePrint(flatMapped, "flatMap");
The output of this will be -2 (2/-1) and 0 (because of the error raised by 2/0). Because of the error, 1 won't be emitted and won't reach the flatMap operator. For the same reason, the OnCompleted function is never called, so 42 is not emitted either.
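To make the control flow of this overload explicit, here is a hypothetical plain-Java model of it. It assumes that a thrown RuntimeException stands in for OnError and that reaching the end of the input array stands in for OnCompleted. ThreeWayFlatMapSketch, run and demo are illustrative names, not RxJava API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.function.IntUnaryOperator;
import java.util.function.Supplier;

// Hypothetical plain-Java model of flatMap(onNext, onError, onCompleted).
// The "source" is `mapper` applied to each input in order; an exception thrown
// by it plays the role of OnError and terminates the sequence.
class ThreeWayFlatMapSketch {
    static List<Integer> run(int[] inputs,
                             IntUnaryOperator mapper,
                             Function<Integer, List<Integer>> onNext,
                             Function<RuntimeException, List<Integer>> onError,
                             Supplier<List<Integer>> onCompleted) {
        List<Integer> out = new ArrayList<>();
        for (int input : inputs) {
            int value;
            try {
                value = mapper.applyAsInt(input); // like .map(v -> 2 / v)
            } catch (RuntimeException e) {
                out.addAll(onError.apply(e));     // the error is mapped to values...
                return out;                       // ...and nothing else is emitted
            }
            out.addAll(onNext.apply(value));
        }
        out.addAll(onCompleted.get());            // reached only if no error occurred
        return out;
    }

    static List<Integer> demo(int... inputs) {
        return run(inputs,
                v -> 2 / v,
                v -> Collections.singletonList(v),
                e -> Collections.singletonList(0),
                () -> Collections.singletonList(42));
    }

    public static void main(String[] args) {
        System.out.println(demo(-1, 0, 1)); // [-2, 0]
        System.out.println(demo(1, 2));     // [2, 1, 42]
    }
}
```

The second call shows the other path: without an error, the OnCompleted function contributes 42 at the end.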
Another interesting overload is Observable<R> flatMap(Func1<T, Observable<U>>, Func2<T, U, R>). Here is its marble diagram:
This one combines items from the source Observable instance with the Observable instance triggered by those source items, and calls a user-supplied function with each pair of original and derived items. The Observable instance will then emit the results of this function. Here is an example:
Observable<Integer> flatMapped = Observable
    .just(5, 432)
    .flatMap(
        v -> Observable.range(v, 2),
        (x, y) -> x + y);
subscribePrint(flatMapped, "flatMap");
The output is:
flatMap : 10
flatMap : 11
flatMap : 864
flatMap : 865
flatMap ended!
This is because the first element emitted by the source Observable instance is 5, and the flatMap operator turns it into an Observable instance using the range() operator, which emits 5 and 6. But this flatMap operator doesn't stop here: for every item emitted by this range Observable instance, it applies the second function, with the original item (5) as the first parameter and the range-emitted item as the second. So we get 5 + 5 = 10 and then 5 + 6 = 11. The same is applied to the second item emitted by the source Observable instance, 432: it is turned into 432 + 432 = 864 and 432 + 433 = 865.
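The arithmetic above can be reproduced with a JDK stream, assuming IntStream.range(x, x + 2) as a stand-in for Observable.range(x, 2) (both yield x and x + 1). ResultSelectorSketch is a hypothetical name; the inner mapToObj call plays the role of the result selector.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Plain-Java analogy (no RxJava) of flatMap(collectionSelector, resultSelector).
class ResultSelectorSketch {
    static List<Integer> combine(List<Integer> source) {
        return source.stream()
                .flatMap(x -> IntStream.range(x, x + 2)   // derived items: x, x + 1
                        .mapToObj(y -> x + y))             // result selector: (x, y) -> x + y
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(combine(Arrays.asList(5, 432))); // [10, 11, 864, 865]
    }
}
```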
This overload is useful when all of the derivative items need access to their source item; it usually saves us from using some kind of tuple or pair class, saving on memory and library dependencies. In the earlier example with files, we could prepend the name of the file to each of the output lines:
CreateObservable.listFolder(
    Paths.get("src", "main", "resources"),
    "{lorem.txt,letters.txt}"
).flatMap(
    path -> CreateObservable.from(path),
    (path, line) -> path.getFileName() + " : " + line
);
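A synchronous sketch of the same idea with plain java.nio.file calls follows. It is an analogy under stated assumptions, not the book's CreateObservable code: it reads whole files eagerly, sorts the file list because directory order is unspecified, and uses the hypothetical names FolderLinesSketch and prefixedLines.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Synchronous plain-Java analogy of listFolder(...).flatMap(from, resultSelector):
// list the files matching a glob, expand each file into its lines, and prefix
// every line with the name of the file it came from.
class FolderLinesSketch {
    static List<String> prefixedLines(Path dir, String glob) {
        List<Path> files = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob)) {
            stream.forEach(files::add); // try-with-resources closes the directory stream
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Collections.sort(files);        // directory order is unspecified; sort for determinism
        return files.stream()
                .flatMap(path -> readLines(path).stream()
                        .map(line -> path.getFileName() + " : " + line)) // the "result selector"
                .collect(Collectors.toList());
    }

    private static List<String> readLines(Path path) {
        try {
            return Files.readAllLines(path);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("folder-lines");
        Files.write(dir.resolve("a.txt"), Collections.singletonList("first"));
        Files.write(dir.resolve("b.txt"), Collections.singletonList("second"));
        prefixedLines(dir, "*.txt").forEach(System.out::println);
    }
}
```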
The flatMapIterable operator doesn't take a lambda that maps an arbitrary value to an Observable instance. Instead, the lambda passed to it maps an arbitrary value to an Iterable instance. All of these Iterable instances are flattened into values emitted by the resulting Observable instance. Let's take a look at the following code snippet:
Observable<?> fIterableMapped = Observable
    .just(
        Arrays.asList(2, 4),
        Arrays.asList("two", "four")
    )
    .flatMapIterable(l -> l);
This simple example merges the two lists emitted by the source Observable instance, and the result emits the four items. It is worth mentioning that invoking flatMapIterable(list -> list) is the same as invoking flatMap(list -> Observable.from(list)).
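Stripped of the reactive machinery, flatMapIterable(l -> l) comes down to a nested loop over the emitted Iterable instances. The sketch below (hypothetical class FlattenIterablesSketch, plain Java, no RxJava) shows that flattening for the two lists above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java analogy of flatMapIterable(l -> l): every Iterable is unrolled
// and its elements are emitted one by one, in order.
class FlattenIterablesSketch {
    static List<Object> flatten(List<? extends Iterable<?>> iterables) {
        List<Object> out = new ArrayList<>();
        for (Iterable<?> iterable : iterables) {
            for (Object item : iterable) {
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(2, 4);
        List<String> words = Arrays.asList("two", "four");
        List<List<?>> lists = Arrays.asList(numbers, words);
        System.out.println(flatten(lists)); // [2, 4, two, four]
    }
}
```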
Another form of the flatMap operator is the concatMap operator. It behaves just like the original flatMap operator, except that it concatenates rather than merges the resulting Observable instances in order to generate its own sequence. The following marble diagram shows how it works:
Unlike with the flatMap operator, the items from the different derivative Observables are not interleaved. A significant difference between the flatMap and concatMap operators is that the flatMap operator subscribes to the inner Observable instances concurrently, as soon as they are created, whereas the concatMap operator subscribes to only one of the Observable instances at a time, moving on to the next one when the current one completes.
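The ordering difference can be illustrated with a small, hypothetical timing model instead of real schedulers. The model uses two inner sequences whose items carry emission times in milliseconds, and it assumes both inner sequences are created right at the start. MergeVsConcatSketch, Event and ev are illustrative names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical timing model (not RxJava): each inner sequence is a list of
// timestamped emissions, in ms since both inner sequences were created.
class MergeVsConcatSketch {
    static final class Event {
        final long time;
        final String value;
        Event(long time, String value) { this.time = time; this.value = value; }
    }

    static Event ev(long time, String value) { return new Event(time, value); }

    // flatMap-like: all inner sequences are subscribed at once, so the output
    // is ordered by emission time and the sequences interleave.
    static List<String> merge(List<List<Event>> inners) {
        List<Event> all = new ArrayList<>();
        inners.forEach(all::addAll);
        all.sort(Comparator.comparingLong((Event e) -> e.time)); // stable: ties keep inner order
        return all.stream().map(e -> e.value).collect(Collectors.toList());
    }

    // concatMap-like: the next inner sequence is subscribed only after the previous
    // one completes, so each sequence's items form one contiguous block.
    static List<String> concat(List<List<Event>> inners) {
        List<String> out = new ArrayList<>();
        for (List<Event> inner : inners) {
            for (Event e : inner) {
                out.add(e.value);
            }
        }
        return out;
    }

    static List<List<Event>> sample() {
        return Arrays.asList(
                Arrays.asList(ev(0, "a1"), ev(20, "a2"), ev(40, "a3")),
                Arrays.asList(ev(10, "b1"), ev(30, "b2")));
    }

    public static void main(String[] args) {
        System.out.println(merge(sample()));  // [a1, b1, a2, b2, a3]
        System.out.println(concat(sample())); // [a1, a2, a3, b1, b2]
    }
}
```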
The last operator similar to flatMap is switchMap. Its marble diagram looks like this:
It operates in a similar fashion to the flatMap operator, except that whenever a new item is emitted by the source Observable instance, it stops mirroring the Observable instance generated from the previously emitted item and begins mirroring only the current one. In other words, it internally unsubscribes from the current derivative Observable instance as soon as the source emits its next item. Here is an example of this:
Observable<String> obs = Observable
    .interval(40L, TimeUnit.MILLISECONDS)
    .switchMap(v -> Observable
        .timer(0L, 10L, TimeUnit.MILLISECONDS)
        .map(u -> "Observable <" + (v + 1) + "> : " + (v + u))
    );
subscribePrint(obs, "switchMap");
The source Observable instance uses the Observable.interval() operator to emit sequential numbers (beginning with zero) every 40 milliseconds. Using the switchMap operator, a new Observable instance emitting another sequence of numbers is created for every number. This secondary sequence begins from the source number that was passed to the switchMap operator (this is implemented by adding the source number to every emitted number, using the map() operator). So, every 40 milliseconds, a new sequence of numbers starts being emitted (one number every 10 milliseconds), and the previous sequence is dropped.
The resulting output looks like this:
switchMap : Observable <1> : 0
switchMap : Observable <1> : 1
switchMap : Observable <1> : 2
switchMap : Observable <1> : 3
switchMap : Observable <2> : 1
switchMap : Observable <2> : 2
switchMap : Observable <2> : 3
switchMap : Observable <2> : 4
switchMap : Observable <3> : 2
switchMap : Observable <3> : 3
switchMap : Observable <3> : 4
switchMap : Observable <3> : 5
switchMap : Observable <3> : 6
switchMap : Observable <4> : 3
.................
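The structure of this output can be predicted with an idealized timing model that ignores scheduler jitter. The sketch below is plain Java, and SwitchMapTimelineSketch is a hypothetical name. It assumes the first interval() tick arrives at 40 ms and that an inner item due exactly at a switch moment is dropped. Under those assumptions each inner sequence emits four items, so the five items shown above for Observable <3> most likely come from real timing delays that this model does not capture.

```java
import java.util.ArrayList;
import java.util.List;

// Idealized, hypothetical timing model of the switchMap example (no RxJava, no real clock):
// source item v arrives at outerPeriod * (v + 1) ms, like interval(40, MILLISECONDS);
// its inner sequence emits u = 0, 1, 2, ... every innerPeriod ms from that moment,
// producing v + u, until the next source item arrives and the inner sequence is dropped.
class SwitchMapTimelineSketch {
    static List<String> simulate(int sourceItems, long outerPeriod, long innerPeriod) {
        List<String> out = new ArrayList<>();
        for (int v = 0; v < sourceItems; v++) {
            long start = outerPeriod * (v + 1);    // inner sequence for v is subscribed here
            long switchAt = outerPeriod * (v + 2); // next source item: unsubscribe from this inner
            for (long u = 0; start + u * innerPeriod < switchAt; u++) {
                out.add("Observable <" + (v + 1) + "> : " + (v + u));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The last simulated inner sequence is also cut at the next tick, as if another item came.
        simulate(3, 40, 10).forEach(System.out::println);
    }
}
```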
The source code for all the mapping examples can be downloaded/viewed at https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter04/MappingExamples.java.
Items can be grouped by a specific property or key.
First, we'll look at the groupBy() operator, a method that divides a source Observable instance into multiple Observable instances. Each of these Observable instances emits some of the source's items, depending on a grouping function.
The groupBy() operator returns an Observable instance that emits Observable instances. These Observable instances are special; they are of type GroupedObservable, and you can retrieve their grouping keys using the getKey() method. Once the groupBy() operator is used, the different groups can be handled in different ways or in a common way.
Note that when the groupBy() operator creates an Observable that emits GroupedObservable instances, each of them buffers its items. So, if we ignore any of them, its buffer presents a potential memory leak.
The marble diagram of the groupBy() operator looks like this:
Here, the shape of the items is used as the common trait of the grouping. For a better understanding of the idea of the method, we can look at this example:
List<String> albums = Arrays.asList(
"The Piper at the Gates of Dawn",
"A Saucerful of Secrets",
"More", "Ummagumma", "Atom Heart Mother",
"Meddle", "Obscured by Clouds",
"The Dark Side of the Moon",
"Wish You Were Here", "Animals", "The Wall"
);
Observable
.from(albums)
.groupBy(album -> album.split(" ").length)
.subscribe(obs ->
subscribePrint(obs, obs.getKey() + " word(s)")
);
The example emits some of Pink Floyd's album titles and groups them by the number of words they contain. For example, Meddle and More are in the same group with key 1, and A Saucerful of Secrets and Wish You Were Here are both in the group with key 4. All these groups are represented by GroupedObservable instances, so we can subscribe to them in the subscribe() call of the source Observable instance. The different groups are printed with different labels, depending on their keys. The output of this little program is as follows:
7 word(s) : The Piper at the Gates of Dawn
4 word(s) : A Saucerful of Secrets
1 word(s) : More
1 word(s) : Ummagumma
3 word(s) : Atom Heart Mother
1 word(s) : Meddle
3 word(s) : Obscured by Clouds
6 word(s) : The Dark Side of the Moon
4 word(s) : Wish You Were Here
1 word(s) : Animals
2 word(s) : The Wall
The items are emitted in the same order as in the source, but by different GroupedObservable instances. Also, all the GroupedObservable instances complete after the source completes.
The groupBy() operator has another overload that takes a second function, which transforms each of the items in a group in some way. Here is an example:
Observable
    .from(albums)
    .groupBy(
        album -> album.replaceAll("[^mM]", "").length(),
        album -> album.replaceAll("[mM]", "*")
    )
    .subscribe(
        obs -> subscribePrint(obs, obs.getKey() + " occurrences of 'm'")
    );
The album titles are grouped by the number of occurrences of the letter m (in either case) in them. The text is transformed so that all the occurrences of the letter are replaced with *. The output is as follows:
0 occurrences of 'm' : The Piper at the Gates of Dawn
0 occurrences of 'm' : A Saucerful of Secrets
1 occurrences of 'm' : *ore
4 occurrences of 'm' : U**agu**a
2 occurrences of 'm' : Ato* Heart *other
1 occurrences of 'm' : *eddle
0 occurrences of 'm' : Obscured by Clouds
1 occurrences of 'm' : The Dark Side of the *oon
0 occurrences of 'm' : Wish You Were Here
1 occurrences of 'm' : Ani*als
0 occurrences of 'm' : The Wall
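Both groupBy() examples can be mirrored with Collectors.groupingBy from the JDK, assuming a LinkedHashMap to keep the groups in first-seen order. This is a swapped-in, eager technique: it collects the whole input into a Map, while groupBy() emits each item through its GroupedObservable as soon as it arrives. GroupingSketch and its methods are hypothetical names.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java analogy (no RxJava) of the two groupBy() examples.
class GroupingSketch {
    static final List<String> ALBUMS = Arrays.asList(
            "The Piper at the Gates of Dawn", "A Saucerful of Secrets",
            "More", "Ummagumma", "Atom Heart Mother",
            "Meddle", "Obscured by Clouds", "The Dark Side of the Moon",
            "Wish You Were Here", "Animals", "The Wall");

    // Key selector only: group titles by their word count.
    static Map<Integer, List<String>> byWordCount(List<String> albums) {
        return albums.stream().collect(Collectors.groupingBy(
                album -> album.split(" ").length,
                LinkedHashMap::new,
                Collectors.toList()));
    }

    // Key selector plus element transformation: group by the number of m/M letters
    // and replace those letters with '*'.
    static Map<Integer, List<String>> byLetterM(List<String> albums) {
        return albums.stream().collect(Collectors.groupingBy(
                album -> album.replaceAll("[^mM]", "").length(),
                LinkedHashMap::new,
                Collectors.mapping(album -> album.replaceAll("[mM]", "*"), Collectors.toList())));
    }

    public static void main(String[] args) {
        System.out.println(byWordCount(ALBUMS));
        System.out.println(byLetterM(ALBUMS));
    }
}
```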
The source code demonstrating the use of the Observable.groupBy() operator can be found at https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter04/UsingGroupBy.java.
There are a few additional transformations worth mentioning. For example, there is the cast() operator, which is a shortcut for map(v -> someClass.cast(v)).
List<Number> list = Arrays.asList(1, 2, 3);
Observable<Integer> iObs = Observable
.from(list)
.cast(Integer.class);
The initial Observable instance here emits values of type Number, but they are actually Integer instances, so we can use the cast() operator to represent them as Integer instances.
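As a plain-Java analogy (no RxJava), cast(Integer.class) corresponds to mapping each element with Integer.class::cast. The hypothetical CastSketch below also shows the failure mode: a value that is not an Integer raises ClassCastException, which RxJava would deliver as an OnError notification instead of throwing it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java analogy: cast(Integer.class) behaves like map(Integer.class::cast).
class CastSketch {
    static List<Integer> castAll(List<Number> numbers) {
        return numbers.stream()
                .map(Integer.class::cast) // throws ClassCastException for non-Integer values
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> ints = castAll(Arrays.asList(1, 2, 3));
        System.out.println(ints.get(0) + ints.get(1) + ints.get(2)); // 6
    }
}
```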
Another helpful operator is the timestamp() operator. It adds a timestamp to each emitted value by transforming it into an instance of the Timestamped<T> class. This is helpful if, for example, we want to log the output of an Observable, as follows:
List<Number> list = Arrays.asList(1, 2, 3);
Observable<Timestamped<Number>> timestamp = Observable
.from(list)
.timestamp();
subscribePrint(timestamp, "Timestamps");
In this example, each number is timestamped. Again, this could easily be implemented using the map() operator. The output of the preceding example looks like this:
Timestamps : Timestamped(timestampMillis = 1431184924388, value = 1)
Timestamps : Timestamped(timestampMillis = 1431184924394, value = 2)
Timestamps : Timestamped(timestampMillis = 1431184924394, value = 3)
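The remark that timestamp() could be written with map() can be sketched as follows. Stamped<T> is a hypothetical stand-in for RxJava's Timestamped<T>, and the clock is passed in as a LongSupplier, an assumption that makes the sketch testable; in real use it would be System::currentTimeMillis.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

// Hypothetical Stamped<T> standing in for Timestamped<T>:
// timestamping is just a map() from each value to (current time, value).
class TimestampSketch {
    static final class Stamped<T> {
        final long timestampMillis;
        final T value;

        Stamped(long timestampMillis, T value) {
            this.timestampMillis = timestampMillis;
            this.value = value;
        }

        @Override
        public String toString() {
            return "Stamped(timestampMillis = " + timestampMillis + ", value = " + value + ")";
        }
    }

    static <T> List<Stamped<T>> stamp(List<T> values, LongSupplier clock) {
        return values.stream()
                .map(v -> new Stamped<T>(clock.getAsLong(), v)) // read the clock once per item
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        stamp(Arrays.asList(1, 2, 3), System::currentTimeMillis).forEach(System.out::println);
    }
}
```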
A similar operator is the timeInterval operator, but it transforms a value into an instance of the TimeInterval<T> class instead. A TimeInterval<T> instance represents an item emitted by an Observable along with the amount of time that elapsed either since the emission of the previous item or (if there was no previous item) since the subscription. This can be used for generating statistics, for example:
Observable<TimeInterval<Long>> timeInterval = Observable
.timer(0L, 150L, TimeUnit.MILLISECONDS)
.timeInterval();
subscribePrint(timeInterval, "Time intervals");
This will output something similar to this:
Time intervals : TimeInterval [intervalInMilliseconds=13, value=0]
Time intervals : TimeInterval [intervalInMilliseconds=142, value=1]
Time intervals : TimeInterval [intervalInMilliseconds=149, value=2]
...................................................................
We can see that the values are emitted roughly every 150 milliseconds, as they should be.
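The numbers reported by timeInterval() are plain differences between consecutive times. The hypothetical helper below computes them from a subscription time and a list of emission times. The sample times are invented so that the result matches the intervals printed above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not RxJava): for each emission, the time since the previous
// emission, or since the subscription for the first item, all in milliseconds.
class TimeIntervalSketch {
    static List<Long> intervals(long subscribedAt, List<Long> emittedAt) {
        List<Long> out = new ArrayList<>();
        long previous = subscribedAt;
        for (long t : emittedAt) {
            out.add(t - previous);
            previous = t;
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(intervals(1000L, Arrays.asList(1013L, 1155L, 1304L))); // [13, 142, 149]
    }
}
```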
Both the timeInterval and timestamp operators work on the immediate scheduler (see Chapter 6, Using Concurrency and Parallelism with Schedulers), and both of them keep their time information in milliseconds.
The source code for the preceding examples can be found at https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter04/VariousTransformationsDemonstration.java.