Composing operators with transformer

So, you have learned how to create custom operators, but think of a situation when you want to create a new operator by combining multiple operators. For instance, I often wanted to combine the functionality of the subscribeOn and observeOn operators so that all the computations can be pushed to computation threads, and, when the results are ready, we can receive them on the main thread.

Yes, it's possible to get the benefits of both operators by adding both operators one after the other to the chain, as shown here:

    fun main(args: Array<String>) { 
      Observable.range(1,10) 
        .map { 
           println("map - ${Thread.currentThread().name} $it") 
           it 
         } 
         .subscribeOn(Schedulers.computation()) 
         .observeOn(Schedulers.io()) 
         .subscribe { 
            println("onNext - ${Thread.currentThread().name} $it") 
         } 
 
        runBlocking { delay(100) } 
     } 

Though you're already aware of the output, the following is the screenshot if you need a refresher:

Now, say we have this combination of the subscribeOn and observeOn operator throughout our project, so we want a shortcut. We want to create our own operator where we would pass the two Scheduler's where we want subscribeOn and observeOn, and everything should work perfectly.

RxKotlin provides the Transformer interfaces (ObservableTransformer and FlowableTransformer are two Transformer interfaces) for that purpose. Just like the operator interfaces, it has only one method—apply. The only difference is that here, instead of Observers, you have the Observable. So, instead of operating on individual emits and their items, here, you work directly on the source.

Here is the signature of the ObservableTransformer interface:

    interface ObservableTransformer<Upstream, Downstream> { 
       /** 
       * Applies a function to the upstream Observable 
and returns an ObservableSource with * optionally different element type. * @param upstream the upstream Observable instance * @return the transformed ObservableSource instance */ @NonNull fun apply(@NonNull upstream: Observable<Upstream>):
ObservableSource<Downstream> }

The interface signature is almost the same. Unlike the apply method of ObservableOperator, here, the apply method receives Upstream Observable and should return the Observable that should be passed to the Downstream.

So, back to our topic, the following code block should fulfill our requirements:

    fun main(args: Array<String>) { 
      Observable.range(1,10) 
        .map { 
           println("map - ${Thread.currentThread().name} $it") 
           it 
         } 
         .compose(SchedulerManager(Schedulers.computation(), 
             Schedulers.io())) 
             .subscribe { 
               println("onNext - ${Thread.currentThread().name} $it") 
             } 
 
            runBlocking { delay(100) } 
    } 
 
    class SchedulerManager<T>(val subscribeScheduler:Scheduler,
val observeScheduler:Scheduler):ObservableTransformer<T,T> { override fun apply(upstream: Observable<T>):
ObservableSource<T> { return upstream.subscribeOn(subscribeScheduler) .observeOn(observeScheduler) } }

In the preceding code, we created a class for our requirement—SchedulerManager—that would take two Scheduler as parameters. The first one is to be passed to the subscribeOn operator and the second one is for the observeOn operator.

Inside the apply method, we returned the Observable Upstream, after applying two operators to it.

We are omitting the screenshot of the output, as it is the same as the previous one.

Like the lift operator, the compose operator can also be implemented using a lambda. Let's have another example where we will transform an Observable<Int> to an Observable<List>. Here is the code:

    fun main(args: Array<String>) { 
      Observable.range(1,10) 
        .compose<List<Int>> { 
           upstream: Observable<Int> -> 
           upstream.toList().toObservable() 
         } 
         .first(listOf()) 
         .subscribeBy { 
            println(it) 
         } 
    } 

In the preceding code, we used upstream.toList().toObservable() as the Observable$toList() operator converts an Observable<T> to Single<List<T>>, so we need the toObservable() operator to convert it back to Observable.

Here is the screenshot of the output:

Composing multiple operators to create a new one is also super easy in RxKotlin; just add a bit extension function to it to see how things become more delightful.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.135.216.75