Accumulating data

The scan(Func2) operator takes a function with two arguments as a parameter. Its result is an Observable instance. The first item, emitted by the result of the scan() method, is the first item of the source Observable instance. The second item emitted is created by applying the function that was passed to the scan() method on the previous item emitted by the result Observable instance and the second item, emitted by the source Observable instance. The third item, emitted by the scan() method result, is created by applying the function, passed to the scan() method to the previous item, emitted by it and the third item emitted by the source Observable instance. This pattern continues in order to create the rest of the sequence emitted by the Observable instance creates by the scan() method. The function passed to the scan() method is called an accumulator.

Let's look at the marble diagram of the scan(Func2) method:

Accumulating data

The items emitted by the scan() method can be generated using an accumulated state. In the diagram, the circle is accumulated in the triangle, and then this triangle-circle is accumulated in the square.

This means that we can emit the sums of a sequence of integers, for example:

Observable<Integer> scan = Observable
  .range(1, 10)
  .scan((p, v) -> p + v);
subscribePrint(scan, "Sum");
subscribePrint(scan.last(), "Final sum");

The first subscription will output all the emissions : 1, 3 (1+2), 6 (3 + 3), 10 (6 + 4) .. 55. But in most cases, we are interested only in the last emitted item—the final sum. We can use an Observable instance that emits only the last element, using the last() filtering operator. It's worth mentioning that there is a reduce(Func2) operator, an alias for the scan(Func2).last().

The scan() operator has one overload which can be used with a seed/initial parameter. In this case, the function passed to the scan(T, Func2) operator is applied to the first item emitted by the source and this seed parameter.

Observable<String> file = CreateObservable.from(
  Paths.get("src", "main", "resources", "letters.txt")
);
scan = file.scan(0, (p, v) -> p + 1);
subscribePrint(scan.last(), "wc -l");

This example counts the number of lines in a file. The file Observable instance emits the lines of the file specified by the given path, one-by-one. We use the scan(T, Func2) operator with a seed value of 0 to count the lines by adding one to the accumulated count on every line.

We will conclude this chapter with an example using many of the operators introduced in it, together. Let's look at it:

Observable<String> file = CreateObservable.from(
  Paths.get("src", "main", "resources", "operators.txt")
);
Observable<String> multy = file
  .flatMap(line -> Observable.from(line.split("\."))) // (1)
  .map(String::trim) // (2)
  .map(sentence -> sentence.split(" ")) // (3)
  .filter(array -> array.length > 0) // (4)
  .map(array -> array[0]) // (5)
  .distinct() // (6)
  .groupBy(word -> word.contains("'")) //(7)
  .flatMap(observable -> observable.getKey() ? observable : // (8)
    observable.map(Introspector::decapitalize))
  .map(String::trim) // (9)
  .filter(word -> !word.isEmpty()) // (10)
  .scan((current, word) -> current + " " + word) // (11)
  .last() // (12)
  .map(sentence -> sentence + "."); // (13)
subscribePrint(multy, "Multiple operators"); // (14)

This piece of code uses lots of operators to filter out and assemble a sentence hidden in a file. The file is represented by an Observable instance, which emits all the lines contained in it one by one.

  1. We don't want to operate only on the different lines; we want to emit all the sentences contained in the file. So, we use the flatMap operator to create an Observable instance which emits the file sentences by sentence (determined by dot).
  2. We trim these sentences using the map() operator. It is possible for them to contain some leading or trailing spaces.
  3. We want to operate on the different words contained in our sentence items, so we turn them into arrays of words, using the map() operator and the String::split parameter.
  4. We don't care about empty sentences (if there are any), so we filter them out using the filter() operator.
  5. We need only the first words from the sentences, so we use the map() operator to get them. The resulting Observable instance emits the first word of every sentence contained in the file.
  6. We don't need duplicated words, so we use the distinct() operator to get rid of them.
  7. Now we want to branch our logic in a way that some of the words are treated differently. So we use the groupBy() operator and a Boolean key to divide our words into two Observable instances. The key is True for the chosen words and False for all the others.
  8. Using the flatMap operator, we join our separated words, but only the chosen ones (with a key of True) are left unchanged. The rest are decapitalized.
  9. We trim all the different words from leading/trailing spaces, using the map() operator.
  10. We use the filter() operator to filter out the empty ones.
  11. Using the scan() operator, we concatenate the words with spaces as separators.
  12. With the last() operator, our resulting Observable instance will emit only the last concatenation, containing all the words.
  13. One last call to the map() operator creates a sentence from our concatenated words by adding a dot.
  14. If we output the single item emitted by this Observable instance, we'll get a sentence composed of the first words of all the sentences contained in the initial file (skipping duplicated words)!

And the output is as follows:

Multiple operators : I'm the one who will become RX.
Multiple operators ended!
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
52.15.214.27