After learning about and using so many various operators, we are ready to write our own operators. The Observable
class has an operator called lift
. It receives an instance of the Operator
interface. This interface is just an empty one that extends the Func1<Subscriber<? super R>, Subscriber<? super T>>
interface. This means that we can pass even lambdas as operators.
The best way of learning how to use the lift
operator is to write an example of it. Let's create an operator that adds a sequential index to every item emitted (of course, this is doable without a dedicated operator). This way, we will be able to produce indexed items. For this purpose, we need a class that stores an item and its index. Let's create a more general class called Pair
:
public class Pair<L, R> { final L left; final R right; public Pair(L left, R right) { this.left = left; this.right = right; } public L getLeft() { return left; } public R getRight() { return right; } @Override public String toString() { return String.format("%s : %s", this.left, this.right); } // hashCode and equals omitted }'
The instances of this class are very simple immutable objects that contain two arbitrary objects. In our case, the left field will be the index of type Long
and the right field will be the emitted item. The Pair
class, as with any immutable class, contains implementations of the hashCode()
and equals()
methods.
Here is the code for the operator:
public class Indexed<T> implements Operator<Pair<Long, T>, T> { private final long initialIndex; public Indexed() { this(0L); } public Indexed(long initial) { this. initialIndex = initial; } @Override public Subscriber<? super T> call(Subscriber<? super Pair<Long, T>> s) { return new Subscriber<T>(s) { private long index = initialIndex; @Override public void onCompleted() { s.onCompleted(); } @Override public void onError(Throwable e) { s.onError(e); } @Override public void onNext(T t) { s.onNext(new Pair<Long, T>(index++, t)); } }; } }
The call()
method of the Operator
interface has one parameter, a Subscriber
instance. This instance will subscribe to the observable that will be returned by the lift()
operator. The method returns a new Subscriber
instance, which will subscribe to the observable upon which the lift()
operator was called. We can change the data of all the notifications in it and that is how we will be writing our own operator's logic.
The Indexed
class has a state—index
. By default, its initial value is 0
, but there is a constructor that can create an Indexed
instance with arbitrary initial value. Our operator delegates the OnError
and OnCompleted
notifications to the subscribers unchanged. The interesting method is onNext()
. It modifies the incoming item by creating a Pair
instance of it and the current value of index
field. After that, the index
is incremented. That way, the next item will use the incremented index
and increment it again.
And now, we have our first operator. Let's write an unit test to showcase its behavior:
@Test
public void testGeneratesSequentialIndexes() {
Observable<Pair<Long, String>> observable = Observable
.just("a", "b", "c", "d", "e")
.lift(new Indexed<String>());
List<Pair<Long, String>> expected = Arrays.asList(
new Pair<Long, String>(0L, "a"),
new Pair<Long, String>(1L, "b"),
new Pair<Long, String>(2L, "c"),
new Pair<Long, String>(3L, "d"),
new Pair<Long, String>(4L, "e")
);
List<Pair<Long, String>> actual = observable
.toList()
.toBlocking().
single();
assertEquals(expected, actual);
// Assert that it is the same result for a second subscribtion.
TestSubscriber<Pair<Long, String>> testSubscriber = new TestSubscriber<Pair<Long, String>>();
observable.subscribe(testSubscriber);
testSubscriber.assertReceivedOnNext(expected);
}
The test emits the letters from 'a'
to 'e'
and uses the lift()
operator to insert our Indexed
operator implementation into the observable chain. We expect a list of five Pair
instances of sequential numbers starting from zero—the indexes and the letters. We use the toList().toBlocking().single()
technique to retrieve the actual list of emitted items and to assert that they are equal to the expected emissions. Because Pair
instances have the hashCode()
and equals()
methods defined, we can compare Pair
instances, so the test passes. If we subscribe for the second time, the Indexed
operator should provide indexing from the initial index, 0
. Using a TestSubscriber
instance, we do that and assert that the letters are indexed, starting with 0
.
The code for the Indexed
operator can be found at https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter08/Lift.java and the unit test testing its behavior at https://github.com/meddle0x53/learning-rxjava/blob/master/src/test/java/com/packtpub/reactive/chapter08/IndexedTest.java.
Using the lift()
operator and different Operator
implementations, we can write our own operators, which operate on every single item of the emitted sequence. But in most cases, we will be able to implement our logic without creating new operators. For example, the indexed behavior can be implemented in many different ways, one of which is by zipping with Observable.range
method, like this:
Observable<Pair<Long, String>> indexed = Observable.zip( Observable.just("a", "b", "c", "d", "e"), Observable.range(0, 100), (s, i) -> new Pair<Long, String>((long) i, s) ); subscribePrint(indexed, "Indexed, no lift");
Implementing a new operator has many traps, such as chaining the subscriptions, supporting backpressure, and reusing variables. If possible, we should try to compose the existing operators, which are written by experienced RxJava contributors. So, in some cases, an operator that transforms the Observable
itself is a better idea, for example, applying multiple operators on it as one. For this, we can use the composing operator, compose()
.
3.144.17.128