Creating custom operators with lift

After learning about and using so many various operators, we are ready to write our own operators. The Observable class has an operator called lift. It receives an instance of the Operator interface. This interface is just an empty one that extends the Func1<Subscriber<? super R>, Subscriber<? super T>> interface. This means that we can pass even lambdas as operators.

The best way of learning how to use the lift operator is to write an example of it. Let's create an operator that adds a sequential index to every item emitted (of course, this is doable without a dedicated operator). This way, we will be able to produce indexed items. For this purpose, we need a class that stores an item and its index. Let's create a more general class called Pair:

public class Pair<L, R> {
  final L left;
  final R right;
  
public Pair(L left, R right) {
    this.left = left;
    this.right = right;
  }

  public L getLeft() {
    return left;
  }
  
public R getRight() {
    return right;
  }

  @Override
  public String toString() {
    return String.format("%s : %s", this.left, this.right);
  }
  
// hashCode and equals omitted

}'

The instances of this class are very simple immutable objects that contain two arbitrary objects. In our case, the left field will be the index of type Long and the right field will be the emitted item. The Pair class, as with any immutable class, contains implementations of the hashCode() and equals() methods.

Here is the code for the operator:

public class Indexed<T> implements Operator<Pair<Long, T>, T> {
  private final long initialIndex;
  public Indexed() {
    this(0L);
  }
  public Indexed(long initial) {
    this. initialIndex = initial;
  }
  @Override
  public Subscriber<? super T> call(Subscriber<? super Pair<Long, T>> s) {
    return new Subscriber<T>(s) {
      private long index = initialIndex;
      @Override
      public void onCompleted() {
        s.onCompleted();
      }
      @Override
      public void onError(Throwable e) {
        s.onError(e);
      }
      @Override
      public void onNext(T t) {
        s.onNext(new Pair<Long, T>(index++, t));
      }
    };
  }
}

The call() method of the Operator interface has one parameter, a Subscriber instance. This instance will subscribe to the observable that will be returned by the lift() operator. The method returns a new Subscriber instance, which will subscribe to the observable upon which the lift() operator was called. We can change the data of all the notifications in it and that is how we will be writing our own operator's logic.

The Indexed class has a state—index. By default, its initial value is 0, but there is a constructor that can create an Indexed instance with arbitrary initial value. Our operator delegates the OnError and OnCompleted notifications to the subscribers unchanged. The interesting method is onNext(). It modifies the incoming item by creating a Pair instance of it and the current value of index field. After that, the index is incremented. That way, the next item will use the incremented index and increment it again.

And now, we have our first operator. Let's write an unit test to showcase its behavior:

@Test
public void testGeneratesSequentialIndexes() {
  Observable<Pair<Long, String>> observable = Observable
    .just("a", "b", "c", "d", "e")
    .lift(new Indexed<String>());
  List<Pair<Long, String>> expected = Arrays.asList(
    new Pair<Long, String>(0L, "a"),
    new Pair<Long, String>(1L, "b"),
    new Pair<Long, String>(2L, "c"),
    new Pair<Long, String>(3L, "d"),
    new Pair<Long, String>(4L, "e")
  );
  List<Pair<Long, String>> actual = observable
    .toList()
    .toBlocking().
    single();
  assertEquals(expected, actual);
  // Assert that it is the same result for a second subscribtion.
  TestSubscriber<Pair<Long, String>> testSubscriber = new TestSubscriber<Pair<Long, String>>();
  observable.subscribe(testSubscriber);
  testSubscriber.assertReceivedOnNext(expected);
}

The test emits the letters from 'a' to 'e' and uses the lift() operator to insert our Indexed operator implementation into the observable chain. We expect a list of five Pair instances of sequential numbers starting from zero—the indexes and the letters. We use the toList().toBlocking().single() technique to retrieve the actual list of emitted items and to assert that they are equal to the expected emissions. Because Pair instances have the hashCode() and equals() methods defined, we can compare Pair instances, so the test passes. If we subscribe for the second time, the Indexed operator should provide indexing from the initial index, 0. Using a TestSubscriber instance, we do that and assert that the letters are indexed, starting with 0.

Using the lift() operator and different Operator implementations, we can write our own operators, which operate on every single item of the emitted sequence. But in most cases, we will be able to implement our logic without creating new operators. For example, the indexed behavior can be implemented in many different ways, one of which is by zipping with Observable.range method, like this:

Observable<Pair<Long, String>> indexed = Observable.zip(
  Observable.just("a", "b", "c", "d", "e"),
  Observable.range(0, 100),
  (s, i) -> new Pair<Long, String>((long) i, s)
);
subscribePrint(indexed, "Indexed, no lift");

Implementing a new operator has many traps, such as chaining the subscriptions, supporting backpressure, and reusing variables. If possible, we should try to compose the existing operators, which are written by experienced RxJava contributors. So, in some cases, an operator that transforms the Observable itself is a better idea, for example, applying multiple operators on it as one. For this, we can use the composing operator, compose().

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.144.17.128