Flowable and Subscriber

Instead of Observer, Flowable uses Subscriber, which is backpressure compatible. If you subscribe with lambda expressions, however, you will not notice any difference. So, why use Subscriber instead of Observer? Because Subscriber supports backpressure along with some extra operations. For instance, it can tell the upstream how many items it wishes to receive. In fact, when you implement Subscriber yourself, you must specify how many items you want to receive (request) from the upstream; if you don't, you will not receive any emissions.
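
To make that last point concrete, here is a minimal sketch of a Subscriber that never calls request(). It assumes RxJava 2 on the classpath (the io.reactivex package), and the received counter is our own name, used only for illustration. Since Flowable.range emits only in response to demand, onNext should never fire and the counter should stay at zero:

    import io.reactivex.Flowable
    import org.reactivestreams.Subscriber
    import org.reactivestreams.Subscription

    fun main(args: Array<String>) {
      var received = 0 // hypothetical counter, for illustration only

      Flowable.range(1, 5)
        .subscribe(object : Subscriber<Int> {
          override fun onSubscribe(subscription: Subscription) {
            // Deliberately no subscription.request(...) call here
          }

          override fun onNext(item: Int) {
            received++
          }

          override fun onError(e: Throwable) {
            e.printStackTrace()
          }

          override fun onComplete() {
            println("Done!")
          }
        })

      println("Items received: $received")
    }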

As we already mentioned, using a lambda in place of a Subscriber is similar to using one in place of an Observer; the lambda-based implementation automatically requests an unbounded number of emissions from the upstream. In our last code, we didn't specify how many emissions we wanted, but an unbounded number was requested internally, and that's why we received all the items emitted.
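
One way to observe this unbounded request is to log the demand with the doOnRequest operator. The following is only a sketch, again assuming RxJava 2; with the lambda overload of subscribe, we would expect the logged amount to equal Long.MAX_VALUE:

    import io.reactivex.Flowable

    fun main(args: Array<String>) {
      Flowable.range(1, 3)
        // Log the demand that reaches this point in the chain
        .doOnRequest { n -> println("Requested $n, unbounded: ${n == Long.MAX_VALUE}") }
        // Lambda overload: we never call request() ourselves
        .subscribe { println("Received $it") }
    }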

So, let's try rewriting our earlier lambda-based program with a Subscriber instance:

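    // Imports assume RxJava 2 and kotlinx.coroutines 1.x; adjust the packages for other versions
    import io.reactivex.Flowable
    import io.reactivex.schedulers.Schedulers
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.runBlocking
    import org.reactivestreams.Subscriber
    import org.reactivestreams.Subscription

    fun main(args: Array<String>) {
      Flowable.range(1, 1000)//(1)
        .map { MyItem5(it) }//(2)
        .observeOn(Schedulers.io())
        .subscribe(object : Subscriber<MyItem5> {//(3)
          override fun onSubscribe(subscription: Subscription) {
            subscription.request(Long.MAX_VALUE)//(4)
          }

          override fun onNext(s: MyItem5) {
            runBlocking { delay(50) } // simulate a slow consumer
            println("Subscriber received $s")
          }

          override fun onError(e: Throwable) {
            e.printStackTrace()
          }

          override fun onComplete() {
            println("Done!")
          }
        })
      runBlocking { delay(60000) } // keep main alive while the io thread works
    }

    data class MyItem5(val id: Int) {
      init {
        println("MyItem Created $id")
      }
    }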

The output of the preceding program will be the same as that of the earlier lambda-based one, so we are skipping the output here. Instead, let's understand the code. The program is almost identical to the earlier one until comment (3), where we created an instance of Subscriber. The methods of Subscriber mirror those of Observer, except that onSubscribe receives a Subscription; as mentioned earlier, inside the onSubscribe method you have to request the number of emissions that you want initially. We did that on comment (4); as we want to receive all emissions, we requested Long.MAX_VALUE.

So, how does the request method work? The request() method tells the upstream how many emissions the Subscriber is prepared to receive, counting from the moment the method is called. Once that many items have been delivered, the Subscriber receives nothing further until you request more. Requests are cumulative: each call adds to the outstanding demand.
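
The following sketch shows this demand accounting in isolation. It assumes RxJava 2 and uses no scheduler, so everything runs on the calling thread; the outer subscription variable is our own device for requesting from outside the Subscriber. We would expect items 1 and 2 to arrive during subscribe, and items 3 to 5 only after the second request:

    import io.reactivex.Flowable
    import org.reactivestreams.Subscriber
    import org.reactivestreams.Subscription

    fun main(args: Array<String>) {
      // Kept outside the Subscriber so that main can request more later
      var subscription: Subscription? = null

      Flowable.range(1, 10)
        .subscribe(object : Subscriber<Int> {
          override fun onSubscribe(s: Subscription) {
            subscription = s
            s.request(2) // initial demand: two items
          }

          override fun onNext(item: Int) {
            println("Received $item")
          }

          override fun onError(e: Throwable) {
            e.printStackTrace()
          }

          override fun onComplete() {
            println("Done!")
          }
        })

      println("Initial demand used up")
      subscription?.request(3) // ask for three more items
      println("Second demand used up")
    }

Because only five of the ten items are ever requested, onComplete should not be called in this run.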

So, let's modify the MyItem5 program to understand the request method better:

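    // Imports assume RxJava 2 and kotlinx.coroutines 1.x; adjust the packages for other versions
    import io.reactivex.Flowable
    import io.reactivex.schedulers.Schedulers
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.runBlocking
    import org.reactivestreams.Subscriber
    import org.reactivestreams.Subscription

    fun main(args: Array<String>) {
      Flowable.range(1, 15)
        .map { MyItem6(it) }
        .observeOn(Schedulers.io())
        .subscribe(object : Subscriber<MyItem6> {
          lateinit var subscription: Subscription//(1)

          override fun onSubscribe(subscription: Subscription) {
            this.subscription = subscription
            subscription.request(5)//(2)
          }

          override fun onNext(s: MyItem6) {
            runBlocking { delay(50) } // simulate a slow consumer
            println("Subscriber received $s")
            if (s.id == 5) {//(3)
              println("Requesting two more")
              subscription.request(2)//(4)
            }
          }

          override fun onError(e: Throwable) {
            e.printStackTrace()
          }

          override fun onComplete() {
            println("Done!")
          }
        })
      runBlocking { delay(10000) } // keep main alive while the io thread works
    }

    data class MyItem6(val id: Int) {
      init {
        println("MyItem Created $id")
      }
    }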

So, what are the tweaks we made in this program? Let's go through them. On comment (1), we declared a lateinit variable of type Subscription, and we initialized it inside the onSubscribe method, just before comment (2). On comment (2), we requested 5 items with subscription.request(5). Then, inside onNext, on comment (3), we checked whether the received item is the 5th one (as we are using a range, the 5th item's value will be 5); if it is, we request 2 more. So, the Subscriber should print only seven items rather than the whole 1-15 range. Let's check the following output:

So, although Flowable emitted all the items in the range, none of them were passed to the Subscriber after the seventh.

Note that the request() method does not go all the way upstream; it only conveys the demand to the nearest preceding operator, which, in turn, decides whether and how to relay that information further upstream.
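
A sketch of how this can be observed, assuming RxJava 2: we place one doOnRequest on each side of observeOn and request only 5 items downstream. With default settings, we would expect observeOn to be asked for 5, while range is asked for observeOn's own buffer size (128 by default, as reported by Flowable.bufferSize()), which is also why all 15 items get created even though only a few are delivered:

    import io.reactivex.Flowable
    import io.reactivex.schedulers.Schedulers
    import org.reactivestreams.Subscriber
    import org.reactivestreams.Subscription

    fun main(args: Array<String>) {
      Flowable.range(1, 15)
        // Demand that actually reaches range
        .doOnRequest { n -> println("range was asked for $n") }
        .observeOn(Schedulers.io())
        // Demand that our Subscriber sends to observeOn
        .doOnRequest { n -> println("observeOn was asked for $n") }
        .subscribe(object : Subscriber<Int> {
          override fun onSubscribe(s: Subscription) {
            s.request(5) // we only ever ask for five items
          }

          override fun onNext(item: Int) {
            println("Received $item")
          }

          override fun onError(e: Throwable) {
            e.printStackTrace()
          }

          override fun onComplete() {
            println("Done!")
          }
        })

      Thread.sleep(1000) // give the io thread time to deliver the items
    }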

So, we now have some understanding of Flowable and Subscriber. It's time to explore them in depth. We will start by creating a Flowable instance from scratch.
