Creating your own operators

So far, we have used lots of operators, but are we sure they will meet all our needs? Or, can we always find a fitting operator for each requirement we face? No, that's not possible. Sometimes, we may have to create our own operators for our own needs, but how?

RxKotlin is always there to make your life easier. It has an operator just for this purpose—the lift operator. The lift operator receives an instance of ObservableOperator; so, to create your own operator, you have to implement that interface.

In my opinion, the best way to learn something is by doing it. What about creating a custom operator that would add a sequential number to every emission? Let's create it as per the following list of requirements:

  • The operator should emit a pair, with an added sequential number as the first element. The second element of the pair should be the actual emission.
  • The operator should be generic and should work with any type of Observable.
  • As with other operators, the operator should work concurrently with other operators.

The preceding points are our basic requirements; and, as per the preceding requirement, we must use AtomicInteger for the counter (which will count the emissions, and we will pass that count as a sequential number) so that the operator will work seamlessly with any Scheduler.

Every custom operator should implement the ObservableOperator interface, which looks like this:

    interface ObservableOperator<Downstream, Upstream> { 
      /** 
      * Applies a function to the child Observer and returns a new
parent Observer. * @param observer the child Observer instance * @return the parent Observer instance * @throws Exception on failure */ @NonNull @Throws(Exception::class) fun apply(@NonNull observer: Observer<in Downstream>):
Observer<in Upstream>; }

Downstream and Upstream are two generic types here. Downstream specifies the type that will be passed to the Downstream of the operator, and Upstream specifies the type that the operator will receive from upstream.

The apply function has a parameter called the Observer that should be used to pass the emission to the Downstream, and the function should return another Observer that will be used to listen to the upstream emissions.

Enough theory. The following is the definition of our AddSerialNumber operator. Take a careful look at it here:

    class AddSerialNumber<T> : ObservableOperator<Pair<Int,T>,T> { 
      val counter:AtomicInteger = AtomicInteger() 
 
      override fun apply(observer: Observer<in Pair<Int, T>>):
Observer<in T> { return object : Observer<T> { override fun onComplete() { observer.onComplete() } override fun onSubscribe(d: Disposable) { observer.onSubscribe(d) } override fun onError(e: Throwable) { observer.onError(e) } override fun onNext(t: T) { observer.onNext(Pair(counter.incrementAndGet(),t)) } } } }

Let's start describing this from the very first feature—the definition of the AddSerialNumber class. This implements the ObservableOperator interface. As per our requirement, we kept the class generic, that is, we specified the Upstream type to be generic T.

We used an AtomicInteger as a val property of the class, which should be initialized within the init block (as we are declaring and defining the property within the class, it would be automatically initialized within init while creating instances of the class). That AtomicInteger, counter should increment on each emission and should return the emitted value as the serial number of the emission.

Inside the apply method, I created and returned an Observer instance, which would be used to listen to the upstream as described earlier. Basically, every operator passes an Observer to upstream by which it should receive the events.

Inside that observer, whenever we receive any event, we echoed that to the Observer downstream (where it is received as a parameter).

Inside the onNext event of the Upstream Observer, we incremented the counter, added it as the first element to a Pair instance, added the item we received (as a parameter in onNext) as the second value, and, finally, passed it to the onNextobserver.onNext(Pair(counter.incrementAndGet(),t)) downstream.

So, what now? We created a class that can be used as an operator, but how do we use it? It's easy, take a look at this piece of code:

    fun main(args: Array<String>) { 
      Observable.range(10,20) 
       .lift(AddSerialNumber<Int>()) 
        .subscribeBy ( 
           onNext = { 
             println("Next $it") 
           }, 
           onError = { 
             it.printStackTrace() 
           }, 
           onComplete = { 
             println("Completed") 
           } 
       ) 
    } 

You just have to create an instance of your operator and pass it to the lift operator; that's all you need, we have now created our first operator.

Look at the following output:

We have created our first operator, and, frankly, that was super easy. Yes, it seemed a bit confusing at the start, but as we moved forward, it became easier.

As you may have noticed, the ObservableOperator interface has only one method, so we can obviously replace the class declaration and everything with just a lambda, as shown here:

    fun main(args: Array<String>) { 
      listOf("Reactive","Programming","in","Kotlin",
"by Rivu Chakraborty","Packt") .toObservable() .lift<Pair<Int,String>> { observer -> val counter = AtomicInteger() object :Observer<String> { override fun onSubscribe(d: Disposable) { observer.onSubscribe(d) }
override fun onNext(t: String) { observer.onNext(Pair(counter.incrementAndGet(), t)) } override fun onComplete() { observer.onComplete() } override fun onError(e: Throwable) { observer.onError(e) } } } .subscribeBy ( onNext = { println("Next $it") }, onError = { it.printStackTrace() }, onComplete = { println("Completed") } ) }

In this example, we used a list of String to create Observable instead of an Int range.

The following is the output:

The program is almost similar to the previous one, except that we used a lambda and used Pair<Int,String> as the type of downstream Observer.

As we have gained our grip in creating our custom operators, let's move forward by learning how to create transformers—no, not the autobot like the movie series; they are just RxKotlin transformers. What are they? Let's see.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.149.253.210