Chapter 4. Transforming, Filtering, and Accumulating Your Data

Now that we have the means for creating Observable instances from a wide variety of source data, it's time to build programming logic around these instances. We will present the basic reactive operators that we'll use to achieve step-by-step computations (the reactive way of handling data).

We will begin with transformations, using the famous flatMap() and map() operators, as well as some less common transforming operators. After that, we'll learn how to filter our data using the filter() operator, skipping elements, and receiving only the elements at a given position in time. The chapter will also cover accumulating data with the scan() operator. Most of these operators will be presented using marble diagrams.

This chapter covers the following topics:

  • Introduction to marble diagrams and transformations with mapping
  • Filtering your data
  • Accumulating values using the scan operator

Observable transformations

We've used the map() operator in some of the previous examples. The higher-order functions that transform incoming values into something else are called transformations. The higher-order functions that can be invoked on an Observable instance, producing a new Observable instance from it, are called operators. The transforming operators transform the elements emitted from the source Observable instance in some way.

In order to understand how the different operators work, we will be using pictures called marble diagrams. For example, this one describes the map operator:

[Marble diagram: the map() operator]

The rectangle in the center of the diagram represents the operator (function). It transforms its input (circles) into something else (triangles). The arrow above the rectangle represents the source Observable instance, the colored circles on it represent OnNext notifications emitted in time, and the vertical line at the end is the OnCompleted notification. The arrow below the rectangle is the output of the Observable instance with its transformed elements.

So, the map() operator does exactly this: it transforms every 'next' value from the source to something else, defined via the function passed to it. Here is a little example:

Observable<String> mapped = Observable
  .just(2, 3, 5, 8)
  .map(v -> v * 3)
  .map(v -> (v % 2 == 0) ? "even" : "odd");
subscribePrint(mapped, "map");

The first map() operator multiplies every number emitted from the source by three. The second map() operator transforms every multiplied number into a string: 'even' if the number is even and 'odd' otherwise.
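To check which labels this chain produces without an RxJava setup, here is a minimal sketch that replays it with the JDK's java.util.stream.Stream. This is an analogy, not RxJava: a Stream pulls its values synchronously, but its map() applies the function to each element just as the map() operator does. The class name MapChainSketch is made up for this illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// java.util.stream analogy (not RxJava) of the two chained map() calls.
class MapChainSketch {
    static List<String> labels(Integer... values) {
        return Stream.of(values)
            .map(v -> v * 3)                           // 2, 3, 5, 8 -> 6, 9, 15, 24
            .map(v -> (v % 2 == 0) ? "even" : "odd")   // parity label for each product
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(labels(2, 3, 5, 8)); // [even, odd, odd, even]
    }
}
```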

Using the map() operator, we can transform each emitted value into a new value. There are more powerful transforming operators that look similar to the map() operator, but have their own usage and purpose. Let's look at them.

Transformations with the various flatMap operators

The flatMap operator is just like the map() operator, but with two differences:

  • Instead of receiving a function that transforms a value into a value of an arbitrary type, the flatMap operator receives a function that always transforms a value into an Observable instance.
  • It merges the values emitted by those resulting Observable instances. This means that, instead of emitting the Observable instances as values, it emits their notifications.

Here is the marble diagram for it:

[Marble diagram: the flatMap operator]

As we can see, each value from the source Observable instance is turned into an Observable instance, and in the end, all the values of these derivative Observables are emitted by the resulting Observable instance. Note that the resulting Observable instance may emit the values of the derivative Observable instances in an interleaved fashion and even out of order.
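The difference from map() can be sketched with the JDK's java.util.stream.Stream, which has both map() and flatMap(). This is an analogy under the assumption of a synchronous, finite source: a Stream never interleaves, so the out-of-order emission shown in the marble diagram cannot appear here. The class name and the v -> (v, v * 10) mapping are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// java.util.stream analogy (not RxJava) of map() versus flatMap().
class FlatMapVsMapSketch {
    // map(): each value becomes a whole sequence, so the result is nested.
    static List<List<Integer>> mapped(List<Integer> source) {
        return source.stream()
            .map(v -> Arrays.asList(v, v * 10))
            .collect(Collectors.toList());
    }

    // flatMap(): each value becomes a sequence, and the elements of those
    // sequences end up directly in one flat result.
    static List<Integer> flatMapped(List<Integer> source) {
        return source.stream()
            .flatMap(v -> Stream.of(v, v * 10))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3);
        System.out.println(mapped(source));     // [[1, 10], [2, 20], [3, 30]]
        System.out.println(flatMapped(source)); // [1, 10, 2, 20, 3, 30]
    }
}
```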

The flatMap operator is very useful for forking logic. For example, if an Observable instance represents a file system folder and emits files from it, we can turn each file object into an Observable instance using the flatMap operator and apply some operations to these file observables. The result will be the merged output of these operations. Here is an example of reading some files from a folder and dumping them into the standard output:

Observable<Path> listFolder(Path dir, String glob) { // (1)
  return Observable.<Path>create(subscriber -> {
    try {
      DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob);
      subscriber.add(Subscriptions.create(() -> {
        try {
          stream.close();
        }
        catch (IOException e) {
          e.printStackTrace();
        }
      }));
      Observable.<Path>from(stream).subscribe(subscriber);
    }
    catch (DirectoryIteratorException ex) {
      subscriber.onError(ex);
    }
    catch (IOException ioe) {
      subscriber.onError(ioe);
    }
  });
}
Observable<String> from(final Path path) { // (2)
  return Observable.<String>create(subscriber -> {
    try {
      BufferedReader reader = Files.newBufferedReader(path);
      subscriber.add(Subscriptions.create(() -> {
        try {
          reader.close();
        }
        catch (IOException e) {
          e.printStackTrace();
        }
      }));
      String line = null;
      while ((line = reader.readLine()) != null && !subscriber.isUnsubscribed()) {
        subscriber.onNext(line);
      }
      if (!subscriber.isUnsubscribed()) {
        subscriber.onCompleted();
      }
    }
    catch (IOException ioe) {
      if (!subscriber.isUnsubscribed()) {
        subscriber.onError(ioe);
      }
    }
  });
}
Observable<String> fsObs = listFolder(
  Paths.get("src", "main", "resources"), "{lorem.txt,letters.txt}"
).flatMap(path -> from(path)); // (3)
subscribePrint(fsObs, "FS"); // (4)

This piece of code introduces two methods for working with folders and files. We will take a short look at them and how we've used them in this flatMap example:

  1. The first method, listFolder(), takes a folder in the form of a Path variable and a glob expression. It returns an Observable instance representing this folder. This Observable instance emits all the files in the folder that match the glob expression, as Path objects.

    The method is implemented using both the Observable.create() and Observable.from() operators. The main idea of this implementation is that if an exception occurs, it should be handled and emitted by the resulting Observable instance as an OnError notification.

    Note the use of the Subscriber.add() method to add a new Subscription instance to the subscriber, created with the Subscriptions.create() method. This method creates a Subscription instance from an action. The action will be executed when the Subscription instance is unsubscribed, which in this case means when the Subscriber instance is unsubscribed. So this is similar to closing the stream in a finally block.

  2. The other method this example introduces is Observable<String> from(Path).

    It reads the file located at the given path line by line and emits the lines as OnNext notifications. Like listFolder(), it uses the Subscriber.add() method with a Subscription instance that closes the stream to the file.

  3. The example creates an Observable instance from a folder using the listFolder() method, which emits the Path instances of two files. Using the flatMap() operator, we create an Observable instance for every file with the from(Path) method, which emits the file content as lines.
  4. The result of the preceding chain is the content of the two files, printed on the standard output. If we used Scheduler instances (see Chapter 6, Using Concurrency and Parallelism with Schedulers) for every file path Observable, the lines of the two files could be interleaved, because the flatMap operator interleaves the notifications of the Observable instances that it merges.
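For comparison, here is a sketch of the same listFolder-then-flatMap pipeline written with plain JDK streams instead of RxJava. It is an analogy, not the book's code: the files are sorted because directory iteration order is unspecified, and the demo() method, the temporary folder, and the file contents are all hypothetical. One design point carries over: Stream.flatMap closes each mapped stream after its contents have been consumed, which plays the role of the Subscription that closes the reader in from(Path).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// java.util.stream analogy (not RxJava) of listFolder(...).flatMap(path -> from(path)).
class FolderLinesSketch {
    // Like listFolder(): the files in dir whose names match glob, sorted
    // because the directory iteration order is unspecified.
    static List<Path> listFolder(Path dir, String glob) {
        List<Path> files = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob)) {
            stream.forEach(files::add); // try-with-resources closes the directory handle
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Collections.sort(files);
        return files;
    }

    // Like from(Path): a lazy stream of the file's lines, backed by an open file.
    static Stream<String> from(Path file) {
        try {
            return Files.lines(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // flatMap closes each Files.lines stream once its lines have been consumed.
    static List<String> allLines(Path dir, String glob) {
        return listFolder(dir, glob).stream()
            .flatMap(FolderLinesSketch::from)
            .collect(Collectors.toList());
    }

    // Hypothetical demo data: two files that match the glob and one that does
    // not (the temporary files are left in place for brevity).
    static List<String> demo() {
        try {
            Path dir = Files.createTempDirectory("flatmap-demo");
            Files.write(dir.resolve("letters.txt"), Arrays.asList("a", "b"));
            Files.write(dir.resolve("lorem.txt"), Arrays.asList("lorem", "ipsum"));
            Files.write(dir.resolve("notes.md"), Arrays.asList("ignored"));
            return allLines(dir, "{lorem.txt,letters.txt}");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [a, b, lorem, ipsum]
    }
}
```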

The flatMap operator has multiple overloads. For example, there is one that takes three functions: one for OnNext, one for OnError, and one for OnCompleted. It transforms the OnError and OnCompleted notifications into Observable instances too. If the source emits an OnError or OnCompleted notification, the Observable instance produced from it is merged into the resulting Observable instance, which then emits an OnCompleted notification. Here is an example:

Observable<Integer> flatMapped = Observable
  .just(-1, 0, 1)
  .map(v -> 2 / v)
  .flatMap(
    v -> Observable.just(v),
    e -> Observable.just(0),
    () -> Observable.just(42)
  );
subscribePrint(flatMapped, "flatMap");

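The output will be -2 (2/-1) and 0 (produced by the OnError function because of the error raised by 2/0). Because of the error, 1 won't be emitted and won't reach the flatMap operator, and since the source never completes normally, the OnCompleted function is not used and 42 is not emitted.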
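The notification handling of this overload can be modeled synchronously with plain Java, which makes the outcome easy to verify. This is a simplified model, not RxJava's implementation: the method flatMapNotifications and its parameters are hypothetical, the upstream map() is represented by a function that may throw, and each notification is mapped to a list of values instead of an Observable instance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Synchronous model (not RxJava) of flatMap(onNext, onError, onCompleted).
class NotificationFlatMapSketch {
    static <T, R> List<R> flatMapNotifications(
            List<T> source,
            Function<T, R> upstreamMap,            // plays the role of map(v -> 2 / v)
            Function<R, List<R>> onNext,
            Function<Throwable, List<R>> onError,
            Supplier<List<R>> onCompleted) {
        List<R> out = new ArrayList<>();
        for (T value : source) {
            R mapped;
            try {
                mapped = upstreamMap.apply(value);
            } catch (RuntimeException e) {
                out.addAll(onError.apply(e));      // the error ends the source here
                return out;                        // so onCompleted is never used
            }
            out.addAll(onNext.apply(mapped));
        }
        out.addAll(onCompleted.get());             // only after normal completion
        return out;
    }

    public static void main(String[] args) {
        Function<Integer, Integer> divide = v -> 2 / v;
        List<Integer> withError = flatMapNotifications(Arrays.asList(-1, 0, 1), divide,
            v -> Arrays.asList(v), e -> Arrays.asList(0), () -> Arrays.asList(42));
        List<Integer> withoutError = flatMapNotifications(Arrays.asList(1, 2), divide,
            v -> Arrays.asList(v), e -> Arrays.asList(0), () -> Arrays.asList(42));
        System.out.println(withError);    // [-2, 0]
        System.out.println(withoutError); // [2, 1, 42]
    }
}
```

The second call shows the other branch: when no error occurs, the OnCompleted function contributes 42 at the end.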

Another interesting overload is Observable<R> flatMap(Func1<T, Observable<U>>, Func2<T, U, R>). Here is its marble diagram:

[Marble diagram: flatMap with a result-combining function]

This one combines each item from the source Observable instance with the items emitted by the Observable instance derived from it, calling a user-supplied function with each pair of original and derived items. The resulting Observable instance emits the results of this function. Here is an example:

Observable<Integer> flatMapped = Observable
  .just(5, 432)
  .flatMap(
    v -> Observable.range(v, 2),
    (x, y) -> x + y);
subscribePrint(flatMapped, "flatMap");

The output is:

flatMap : 10
flatMap : 11
flatMap : 864
flatMap : 865
flatMap ended!

This is because the first element emitted by the source Observable instance is 5, which the flatMap operator turns into an Observable instance using the range() operator; that Observable instance emits 5 and 6. But this flatMap operator doesn't stop here: for every item emitted by the range Observable instance, it applies the second function, passing the original item (5) as the first parameter and the range-emitted item as the second. So we get 5 + 5 and then 5 + 6. The same is applied to the second item emitted by the source Observable instance, 432: it is turned into 432 + 432 = 864 and 432 + 433 = 865.
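The arithmetic above can be checked with a JDK stream analogy (not RxJava). One detail needs care: Observable.range(start, count) takes a count, while IntStream.range(startInclusive, endExclusive) takes an exclusive end, so range(v, 2) corresponds to IntStream.range(x, x + 2). The class name and the count parameter are assumptions of this sketch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// java.util.stream analogy (not RxJava) of
// flatMap(v -> Observable.range(v, 2), (x, y) -> x + y).
class FlatMapCombinerSketch {
    static List<Integer> combine(List<Integer> source, int count) {
        return source.stream()
            .flatMap(x -> IntStream.range(x, x + count) // count values starting at x
                .boxed()
                .map(y -> x + y))                       // the source item x is still in scope
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(combine(Arrays.asList(5, 432), 2)); // [10, 11, 864, 865]
    }
}
```

Here the nested lambda simply closes over x; in RxJava, a nested map() inside the flatMap function achieves the same effect, and the two-function overload is a more direct way to express it.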

This overload is useful when all of the derivative items need access to their source item, and it usually saves us from using some kind of tuple or pair class, saving on memory and library dependencies. In the earlier example with files, we could prepend the name of the file to each of the output lines:

CreateObservable.listFolder(
  Paths.get("src", "main", "resources"),
  "{lorem.txt,letters.txt}"
).flatMap(
  path -> CreateObservable.from(path),
  (path, line) -> path.getFileName() + " : " + line
);

The flatMapIterable operator doesn't take a lambda that maps an arbitrary value to an Observable instance. Instead, the lambda passed to it takes an arbitrary value and returns an Iterable instance. All of these Iterable instances are flattened into the values emitted by the resulting Observable instance. Let's take a look at the following code snippet:

Observable<?> fIterableMapped = Observable
  .just(
    Arrays.asList(2, 4),
    Arrays.asList("two", "four")
  )
  .flatMapIterable(l -> l);

This simple example merges the two lists emitted by the source Observable instance, and the result emits four items: 2, 4, two, and four. It is worth mentioning that invoking flatMapIterable(l -> l) is the same as invoking flatMap(l -> Observable.from(l)).
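The same flattening can be sketched with the JDK's Stream.flatMap, where l -> l.stream() plays the role of l -> Observable.from(l). This is an analogy, not RxJava, and the class name is made up; the element type is widened to Object because the two lists hold different types, much as the snippet above uses Observable<?>.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// java.util.stream analogy (not RxJava) of flatMapIterable(l -> l).
class FlatMapIterableSketch {
    static List<Object> flatten(List<List<?>> lists) {
        return lists.stream()
            .<Object>flatMap(l -> l.stream()) // each list contributes its elements in order
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<?>> lists = Arrays.<List<?>>asList(
            Arrays.asList(2, 4),
            Arrays.asList("two", "four"));
        System.out.println(flatten(lists)); // [2, 4, two, four]
    }
}
```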

Another form of the flatMap operator is the concatMap operator. It behaves just like the original flatMap operator, except that it concatenates rather than merges the resulting Observable instances in order to generate its own sequence. The following marble diagram shows how it works:

[Marble diagram: the concatMap operator]

The items from the different derivative Observables are not interleaved, as they can be with the flatMap operator. A significant difference between the flatMap and concatMap operators is that the flatMap operator subscribes to the inner Observable instances as soon as they are created, so they can emit concurrently, whereas the concatMap operator subscribes to only one of them at a time and moves on to the next only after the current one completes.
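The difference in ordering can be illustrated with a small timeline model in plain Java (not RxJava). The simplifying assumptions are stated in the comments: for the flatMap-like merge, every inner sequence is treated as if subscribed at time 0 on a shared clock, and each inner sequence lists its events in time order. The Event class, the ev() helper, and the sample times are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Timeline model (not RxJava) contrasting merging (flatMap) with concatenation (concatMap).
class MergeVsConcatSketch {
    static final class Event {
        final long time;     // ms on a shared clock, as if every inner started at 0
        final String value;
        Event(long time, String value) { this.time = time; this.value = value; }
    }

    static Event ev(long time, String value) { return new Event(time, value); }

    // flatMap-like: all inner sequences are live at once, so their events come
    // out in time order and can interleave (the sort is stable for ties).
    static List<String> merge(List<List<Event>> inners) {
        List<Event> all = new ArrayList<>();
        inners.forEach(all::addAll);
        all.sort(Comparator.comparingLong((Event e) -> e.time));
        List<String> out = new ArrayList<>();
        all.forEach(e -> out.add(e.value));
        return out;
    }

    // concatMap-like: the next inner sequence starts only after the previous one
    // completes, so the values come out one sequence after another.
    static List<String> concat(List<List<Event>> inners) {
        List<String> out = new ArrayList<>();
        for (List<Event> inner : inners) {
            inner.forEach(e -> out.add(e.value));
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Event>> inners = Arrays.asList(
            Arrays.asList(ev(0, "a1"), ev(20, "a2")),
            Arrays.asList(ev(10, "b1"), ev(30, "b2")));
        System.out.println("flatMap-like:   " + merge(inners));  // [a1, b1, a2, b2]
        System.out.println("concatMap-like: " + concat(inners)); // [a1, a2, b1, b2]
    }
}
```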

The last operator similar to flatMap is switchMap. Its marble diagram looks like this:

[Marble diagram: the switchMap operator]

It operates in a similar fashion to the flatMap operator, except that whenever a new item is emitted by the source Observable instance, it stops mirroring the Observable instance generated from the previously emitted item and begins mirroring only the current one. In other words, it internally unsubscribes from the current derivative Observable instance as soon as the source emits a new item and the next derivative Observable instance is created. Here is an example of this:

Observable<String> obs = Observable
  .interval(40L, TimeUnit.MILLISECONDS)
  .switchMap(v ->
    Observable
      .timer(0L, 10L, TimeUnit.MILLISECONDS)
      .map(u -> "Observable <" + (v + 1) + "> : " + (v + u))
  );
subscribePrint(obs, "switchMap");

The source Observable instance is using the Observable.interval() operator to emit sequential numbers (beginning with zero) every 40 milliseconds. Using the switchMap operator, a new Observable instance emitting another sequence of numbers is created for every number. This secondary sequence of numbers begins from the source number that was passed to the switchMap operator (that's implemented by summing the source number with every emitted number, using the map() operator). So, every 40 milliseconds, a new sequence of numbers is being emitted (each number at 10-millisecond intervals).

The resulting output looks like:

switchMap : Observable <1> : 0
switchMap : Observable <1> : 1
switchMap : Observable <1> : 2
switchMap : Observable <1> : 3
switchMap : Observable <2> : 1
switchMap : Observable <2> : 2
switchMap : Observable <2> : 3
switchMap : Observable <2> : 4
switchMap : Observable <3> : 2
switchMap : Observable <3> : 3
switchMap : Observable <3> : 4
switchMap : Observable <3> : 5
switchMap : Observable <3> : 6
switchMap : Observable <4> : 3
.................
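The shape of this output can be reproduced with an idealized, deterministic model in plain Java (not RxJava). The assumptions: the source emits v = 0, 1, 2 at exactly 40, 80, and 120 milliseconds, each inner sequence ticks exactly every 10 milliseconds, an inner tick that coincides with the next source item is dropped, and the last inner sequence is cut off at a chosen horizon. The simulate() method and its parameters are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Idealized model (not RxJava) of the switchMap example: each source item v
// starts an inner sequence emitting v + u every innerPeriod ms, and that inner
// sequence is dropped as soon as the next source item arrives.
class SwitchMapSketch {
    static List<String> simulate(List<Long> sourceTimes, long innerPeriod, long horizon) {
        List<String> out = new ArrayList<>();
        for (int v = 0; v < sourceTimes.size(); v++) {
            long start = sourceTimes.get(v);
            // The inner sequence is live until the next source item (or the horizon).
            long end = (v + 1 < sourceTimes.size()) ? sourceTimes.get(v + 1) : horizon;
            for (long u = 0; start + u * innerPeriod < end; u++) {
                out.add("Observable <" + (v + 1) + "> : " + (v + u));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // interval(40 ms) emits its first item after 40 ms, then every 40 ms.
        List<Long> sourceTimes = Arrays.asList(40L, 80L, 120L);
        simulate(sourceTimes, 10L, 160L)
            .forEach(line -> System.out.println("switchMap : " + line));
    }
}
```

With perfect timing, every inner sequence gets exactly four values. Real timers drift by a few milliseconds, which is why an inner sequence can occasionally squeeze in a fifth value before the switch, as Observable <3> does in the output above.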

Grouping items

Items can be grouped by a specific property or key.

First, we'll look at the groupBy() operator, a method that divides a source Observable instance into multiple Observable instances. Each of these Observable instances emits some of the source's items depending on a grouping function.

The groupBy() operator returns an Observable instance that emits Observable instances. These Observable instances are special; they are of type GroupedObservable, and you can retrieve their grouping keys using the getKey() method. Once the groupBy() operator is used, the different groups can be handled in a different or a common way.

Note that each of the GroupedObservable instances emitted by the Observable instance that the groupBy() operator creates buffers its items. So, if we ignore any of them (never subscribe to it), its buffer is a potential memory leak.

The marble diagram of the groupBy() operator looks like this:

[Marble diagram: the groupBy() operator]

Here, the form of the items is used as the common trait of the grouping. For a better understanding of the idea of the method, we can look at this example:

List<String> albums = Arrays.asList(
  "The Piper at the Gates of Dawn",
  "A Saucerful of Secrets",
  "More", "Ummagumma", "Atom Heart Mother",
  "Meddle", "Obscured by Clouds",
  "The Dark Side of the Moon",
  "Wish You Were Here", "Animals", "The Wall"
);
Observable
  .from(albums)
  .groupBy(album -> album.split(" ").length)
  .subscribe(obs ->
    subscribePrint(obs, obs.getKey() + " word(s)")
  );

The example emits some of Pink Floyd's album titles and groups them by the number of words they contain. For example, Meddle and More are in the same group, with key 1, and A Saucerful of Secrets and Wish You Were Here are both in the group with key 4. All these groups are represented by GroupedObservable instances, so we can subscribe to them in the subscribe() call of the source Observable instance. The different groups are printed with different labels, depending on their keys. The output of this little program is as follows:

7 word(s) : The Piper at the Gates of Dawn
4 word(s) : A Saucerful of Secrets
1 word(s) : More
1 word(s) : Ummagumma
3 word(s) : Atom Heart Mother
1 word(s) : Meddle
3 word(s) : Obscured by Clouds
6 word(s) : The Dark Side of the Moon
4 word(s) : Wish You Were Here
1 word(s) : Animals
2 word(s) : The Wall

The items are emitted in the same order as in the source, but by different GroupedObservable instances. Also, all the GroupedObservable instances complete after the source completes.
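The grouping itself can be cross-checked with Collectors.groupingBy from the JDK. This is an analogy with one important difference: groupBy() emits every item through its group as soon as it arrives, while the collector waits for the whole source and returns all the groups at once. A LinkedHashMap keeps the groups in order of first appearance; the class and method names are made up for this sketch.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// java.util.stream analogy (not RxJava) of groupBy(album -> album.split(" ").length).
class GroupByWordCountSketch {
    static Map<Integer, List<String>> byWordCount(List<String> albums) {
        return albums.stream().collect(Collectors.groupingBy(
            (String album) -> album.split(" ").length, // the grouping key, as in the example
            LinkedHashMap::new,                        // groups in order of first appearance
            Collectors.toList()));
    }

    public static void main(String[] args) {
        List<String> albums = Arrays.asList(
            "The Piper at the Gates of Dawn", "A Saucerful of Secrets",
            "More", "Ummagumma", "Atom Heart Mother",
            "Meddle", "Obscured by Clouds", "The Dark Side of the Moon",
            "Wish You Were Here", "Animals", "The Wall");
        byWordCount(albums).forEach((key, group) ->
            System.out.println(key + " word(s) : " + group));
    }
}
```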

The groupBy() operator has another overload that takes a second, transforming function that transforms each of the items in a group in some way. Here is an example:

Observable
.from(albums)
.groupBy(
  album -> album.replaceAll("[^mM]", "").length(),
  album -> album.replaceAll("[mM]", "*")
)
.subscribe(
  obs -> subscribePrint(obs, obs.getKey() + " occurrences of 'm'")
);

The album titles are grouped by the number of occurrences of the letter m (in either case) in them. The text is transformed so that all the occurrences of the letter are replaced with *. The output is as follows:

0 occurrences of 'm' : The Piper at the Gates of Dawn
0 occurrences of 'm' : A Saucerful of Secrets
1 occurrences of 'm' : *ore
4 occurrences of 'm' : U**agu**a
2 occurrences of 'm' : Ato* Heart *other
1 occurrences of 'm' : *eddle
0 occurrences of 'm' : Obscured by Clouds
1 occurrences of 'm' : The Dark Side of the *oon
0 occurrences of 'm' : Wish You Were Here
1 occurrences of 'm' : Ani*als
0 occurrences of 'm' : The Wall

Note

The source code demonstrating use of the Observable.groupBy() operator can be found at https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter04/UsingGroupBy.java.
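The two-function overload of groupBy() has a close JDK counterpart as well: Collectors.mapping() plays the role of the second, transforming function. The sketch below is an analogy, not RxJava, and like the previous collector example it produces all the groups only after the source is exhausted; the class and method names are made up.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// java.util.stream analogy (not RxJava) of groupBy(keySelector, valueSelector).
class GroupByLetterMSketch {
    static Map<Integer, List<String>> byLetterM(List<String> albums) {
        return albums.stream().collect(Collectors.groupingBy(
            (String album) -> album.replaceAll("[^mM]", "").length(), // number of 'm' and 'M'
            LinkedHashMap::new,
            Collectors.mapping(
                (String album) -> album.replaceAll("[mM]", "*"),      // the transforming function
                Collectors.toList())));
    }

    public static void main(String[] args) {
        List<String> albums = Arrays.asList("More", "Ummagumma", "Atom Heart Mother", "The Wall");
        byLetterM(albums).forEach((key, group) ->
            System.out.println(key + " occurrences of 'm' : " + group));
    }
}
```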

Additional useful transformation operators

There are a few additional transformations worth mentioning. For example, there is the cast() operator, which is a shortcut for map(v -> someClass.cast(v)):

List<Number> list = Arrays.asList(1, 2, 3);
Observable<Integer> iObs = Observable
  .from(list)
  .cast(Integer.class);

The initial Observable instance here emits values of type Number, but they are actually Integer instances, so we can use the cast() operator to represent them as Integer instances.
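Since cast() is described as a shortcut for map(v -> someClass.cast(v)), the long form can be sketched with a JDK stream (an analogy, not RxJava). The sketch also shows what happens when an element is not actually an Integer: Class.cast() throws a ClassCastException. The class name and the 2.5 sample value are made up.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// java.util.stream analogy (not RxJava) of cast(Integer.class), written as
// the map(v -> someClass.cast(v)) that cast() abbreviates.
class CastSketch {
    static List<Integer> castAll(List<Number> numbers) {
        return numbers.stream()
            .map(Integer.class::cast)   // throws ClassCastException for a non-Integer
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(castAll(Arrays.<Number>asList(1, 2, 3))); // [1, 2, 3]
        try {
            castAll(Arrays.<Number>asList(1, 2.5));
        } catch (ClassCastException e) {
            System.out.println("2.5 is a Double, not an Integer");
        }
    }
}
```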

Another helpful operator is the timestamp() operator. It adds a timestamp to each emitted value by transforming it into an instance of the Timestamped<T> class. This is helpful if, for example, we want to log the output of an Observable, as follows:

List<Number> list = Arrays.asList(1, 2, 3);
Observable<Timestamped<Number>> timestamp = Observable
  .from(list)
  .timestamp();
subscribePrint(timestamp, "Timestamps");

In this example, each number is being timestamped. Again, that can be implemented using the map() operator very easily. The output of the preceding example looks like this:

Timestamps : Timestamped(timestampMillis = 1431184924388, value = 1)
Timestamps : Timestamped(timestampMillis = 1431184924394, value = 2)
Timestamps : Timestamped(timestampMillis = 1431184924394, value = 3)
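As noted above, timestamp() can be approximated with map(). The sketch below does this over a JDK stream rather than an Observable. Stamped is a hypothetical stand-in for RxJava's Timestamped<T>, and System.currentTimeMillis() is assumed as the clock, which matches the millisecond values in the output above but is not taken from RxJava's source.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// A map()-based sketch of what timestamp() adds (not RxJava).
class TimestampSketch {
    // Hypothetical stand-in for Timestamped<T>.
    static final class Stamped<T> {
        final long timestampMillis;
        final T value;

        Stamped(long timestampMillis, T value) {
            this.timestampMillis = timestampMillis;
            this.value = value;
        }

        @Override
        public String toString() {
            return "Stamped(timestampMillis = " + timestampMillis + ", value = " + value + ")";
        }
    }

    static <T> List<Stamped<T>> timestamp(List<T> values) {
        return values.stream()
            .map(v -> new Stamped<T>(System.currentTimeMillis(), v)) // wall-clock time per element
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        timestamp(Arrays.asList(1, 2, 3)).forEach(System.out::println);
    }
}
```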

A similar operator is the timeInterval operator, but it transforms a value into an instance of the TimeInterval<T> class instead. A TimeInterval<T> instance represents an item emitted by an Observable along with the amount of time that elapsed either since the emission of the previous item or (if there was no previous item) since the subscription. This can be used for generating statistics, for example:

Observable<TimeInterval<Long>> timeInterval = Observable
  .timer(0L, 150L, TimeUnit.MILLISECONDS)
  .timeInterval();
subscribePrint(timeInterval, "Time intervals");

This will output something similar to this:

Time intervals : TimeInterval [intervalInMilliseconds=13, value=0]
Time intervals : TimeInterval [intervalInMilliseconds=142, value=1]
Time intervals : TimeInterval [intervalInMilliseconds=149, value=2]
...................................................................

We can see that the different values are emitted roughly 150 milliseconds apart, as they should be.
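The interval values follow from simple subtraction: each one is the time since the previous emission, or since subscription for the first item. The sketch below computes them by hand in plain Java (not RxJava). The emission times 13, 155, and 304 milliseconds after subscription are hypothetical, chosen so that the differences reproduce the three intervals in the sample output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hand computation (not RxJava) of the intervals that timeInterval() reports.
class TimeIntervalSketch {
    static List<Long> intervals(long subscribedAt, List<Long> emittedAt) {
        List<Long> result = new ArrayList<>();
        long previous = subscribedAt;   // the first interval is measured from subscription
        for (long t : emittedAt) {
            result.add(t - previous);
            previous = t;
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(intervals(0L, Arrays.asList(13L, 155L, 304L))); // [13, 142, 149]
    }
}
```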

Both the timeInterval and timestamp operators work on the immediate scheduler (see Chapter 6, Using Concurrency and Parallelism with Schedulers), and both of them keep their time information in milliseconds.
