Composing multiple operators with the Observable.compose operator

The compose() operator has one parameter of type Transformer. The Transformer interface, like the Operator one, is an empty interface that extends Func1 (this approach hides the type complexities that are involved by using Func1). The difference is that it extends the Func1<Observable<T>, Observable<R>> method, so that it transforms an Observable and not a Subscriber. This means that, instead of operating on each individual item emitted by the source observable, it operates directly on the source.

We can illustrate the use of this operator and the Transformer interface through an example. First, we will create a Transformer implementation:

public class OddFilter<T> implements Transformer<T, T> {
  @Override
  public Observable<T> call(Observable<T> observable) {
    return observable
      .lift(new Indexed<T>(1L))
      .filter(pair -> pair.getLeft() % 2 == 1)
      .map(pair -> pair.getRight());
  }
}

The idea of this implementation is to filter the emissions of an observable, depending on the order in which they are incoming. It operates on the whole sequence, using our Indexed operator to add an index to every item. After that, it filters the Pair instances that have odd indexes and retrieves the original items from the filtered Pair instances. That way, only the members of the emitted sequence that are placed at odd positions reach the subscribers.

Again let's write a unit test, ensuring that the new OddFilter transformer behaves in the right way:

@Test
public void testFiltersOddOfTheSequence() {
  Observable<String> tested = Observable
    .just("One", "Two", "Three", "Four", "Five", "June", "July")
    .compose(new OddFilter<String>());
  List<String> expected =
    Arrays.asList("One", "Three", "Five", "July");
  List<String> actual = tested
    .toList()
    .toBlocking()
    .single();
  assertEquals(expected, actual);
}

As you can see, an instance of our OddFilter class is passed to the compose() operator, and that way, it is applied to the observable that was created by the range() factory method. The observable emits seven strings. If the OddFilter implementation works right, it should filter out the strings emitted at odd positions.

More about implementing custom operators can be found here: https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators. If you use RxJava in dynamic languages such Groovy, you'll be able to extend the Observable class with new methods, or you can use RxJava with Xtend, a flexible dialect of Java. Refer to http://mnmlst-dvlpr.blogspot.de/2014/07/rxjava-and-xtend.html.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.129.210.102