The compose() operator has one parameter of type Transformer. The Transformer interface, like the Operator one, is an empty interface that extends Func1 (this approach hides the type complexities that are involved by using Func1). The difference is that it extends the Func1<Observable<T>, Observable<R>> interface, so that it transforms an Observable and not a Subscriber. This means that, instead of operating on each individual item emitted by the source observable, it operates directly on the source.
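The shape of this idea can be sketched without RxJava. The following is only an analogy built on java.util.stream.Stream; the names SequenceTransformer, compose, and LENGTHS_OF_LONG_WORDS are hypothetical stand-ins invented for this illustration and are not part of the RxJava API. It shows a function that receives the whole sequence and returns a new one, so a chain of steps can be packaged and reused as a single unit.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TransformerShape {
    // Hypothetical stand-in for Transformer<T, R>: an empty interface that
    // only fixes the type parameters of a sequence-to-sequence function.
    public interface SequenceTransformer<T, R> extends Function<Stream<T>, Stream<R>> {}

    // Hypothetical stand-in for compose(): the entire source is handed
    // to the transformer, not its individual items.
    public static <T, R> Stream<R> compose(Stream<T> source, SequenceTransformer<T, R> transformer) {
        return transformer.apply(source);
    }

    // A reusable chain of steps packaged as one transformer.
    public static final SequenceTransformer<String, Integer> LENGTHS_OF_LONG_WORDS =
        words -> words.filter(w -> w.length() > 3).map(String::length);

    public static void main(String[] args) {
        List<Integer> result = compose(Stream.of("One", "Three", "Five"), LENGTHS_OF_LONG_WORDS)
            .collect(Collectors.toList());
        System.out.println(result); // prints [5, 4]
    }
}
```

The empty interface plays the same role here as in the text: callers write SequenceTransformer<String, Integer> instead of spelling out the nested generic function type.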
We can illustrate the use of this operator and the Transformer interface through an example. First, we will create a Transformer implementation:
public class OddFilter<T> implements Transformer<T, T> {
  @Override
  public Observable<T> call(Observable<T> observable) {
    return observable
      .lift(new Indexed<T>(1L))
      .filter(pair -> pair.getLeft() % 2 == 1)
      .map(pair -> pair.getRight());
  }
}
The idea of this implementation is to filter the emissions of an observable based on the order in which they arrive. It operates on the whole sequence, using our Indexed operator to add an index, starting from 1, to every item. After that, it keeps only the Pair instances that have odd indexes and retrieves the original items from them. That way, only the members of the emitted sequence that are placed at odd positions reach the subscribers.
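To make the three steps concrete, here is a minimal sketch that mirrors them on a plain List, using only the standard library. It is not the RxJava implementation: the class OddPositionsSketch and the method oddPositions are hypothetical names, and java.util.AbstractMap.SimpleEntry is assumed here as a substitute for the Pair type.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class OddPositionsSketch {
    // Mirrors the index, filter, and unwrap steps of OddFilter on a List.
    public static <T> List<T> oddPositions(List<T> items) {
        return IntStream.range(0, items.size())
            // Step 1: pair every item with a 1-based index (the role of Indexed).
            .mapToObj(i -> new SimpleEntry<Long, T>((long) (i + 1), items.get(i)))
            // Step 2: keep only the pairs whose index is odd.
            .filter(pair -> pair.getKey() % 2 == 1)
            // Step 3: drop the index and recover the original item.
            .map(pair -> pair.getValue())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> source =
            Arrays.asList("One", "Two", "Three", "Four", "Five", "June", "July");
        System.out.println(oddPositions(source)); // prints [One, Three, Five, July]
    }
}
```

The items at positions 1, 3, 5, and 7 survive, which is the same result the transformer is expected to produce for a seven-item sequence.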
Again, let's write a unit test to ensure that the new OddFilter transformer behaves correctly:
@Test
public void testFiltersOddOfTheSequence() {
  Observable<String> tested = Observable
    .just("One", "Two", "Three", "Four", "Five", "June", "July")
    .compose(new OddFilter<String>());
  List<String> expected =
    Arrays.asList("One", "Three", "Five", "July");
  List<String> actual = tested
    .toList()
    .toBlocking()
    .single();
  assertEquals(expected, actual);
}
As you can see, an instance of our OddFilter class is passed to the compose() operator, and that way, it is applied to the observable that was created by the just() factory method. The observable emits seven strings. If the OddFilter implementation works correctly, it should filter out the strings emitted at even positions, leaving only those emitted at odd positions.
The source code of the OddFilter class can be found at https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter08/Compose.java. The unit test testing it can be viewed/downloaded at https://github.com/meddle0x53/learning-rxjava/blob/master/src/test/java/com/packtpub/reactive/chapter08/IndexedTest.java.
More about implementing custom operators can be found here: https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators. If you use RxJava in dynamic languages such as Groovy, you'll be able to extend the Observable class with new methods, or you can use RxJava with Xtend, a flexible dialect of Java. Refer to http://mnmlst-dvlpr.blogspot.de/2014/07/rxjava-and-xtend.html.