Creating Flowable from Observable

The Observable.toFlowable() operator provides you with another way to implement BackpressureStrategy into non-backpressured source. This operator turns any Observable into a Flowable, so let's get our hands dirty, and, first, let's try converting an Observable into Flowable with the buffering strategy, then we will try out a few other strategies in the same example to understand it better. Please refer to the following code:

    fun main(args: Array<String>) { 
      val source = Observable.range(1, 1000)//(1) 
      source.toFlowable(BackpressureStrategy.BUFFER)//(2) 
        .map { MyItem7(it) } 
        .observeOn(Schedulers.io()) 
        .subscribe{//(3) 
          print("Rec. $it;	") 
          runBlocking { delay(1000) } 
        } 
        runBlocking { delay(100000) } 
    } 
 
    data class MyItem7 (val id:Int) { 
      init { 
        print("MyItem init $id") 
      } 
   } 

So, on comment (1), we created an Observable with the Observable.range() method. On comment (2), we converted it to Flowable with BackpressureStrategy.BUFFER. Then, we subscribed to it with a lambda as the Subscriber. Let's see some portions of the output as a screenshot (as the complete output will be too long to paste here):

So, as expected, the downstream here processes all the emissions, as the BackpressureStrategy.BUFFER buffers all the emissions until the downstream consumes.

So, now, let's try with BackpressureStrategy.ERROR and check what happens:

    fun main(args: Array<String>) { 
      val source = Observable.range(1, 1000) 
      source.toFlowable(BackpressureStrategy.ERROR) 
        .map { MyItem8(it) } 
        .observeOn(Schedulers.io()) 
        .subscribe{ 
           println(it) 
           runBlocking { delay(600) } 
        } 
        runBlocking { delay(700000) } 
      } 
 
      data class MyItem8 (val id:Int) { 
      init { 
        println("MyItem Created $id") 
      } 
    } 

The following is the output:

It showed an error as the downstream couldn't keep up with the upstream, as we described it earlier.

What would happen if we use the BackpressureStrategy.DROP option? Let's check:

    fun main(args: Array<String>) { 
      val source = Observable.range(1, 1000) 
      source.toFlowable(BackpressureStrategy.DROP) 
        .map { MyItem9(it) } 
        .observeOn(Schedulers.computation()) 
        .subscribe{ 
           println(it) 
           runBlocking { delay(1000) } 
        } 
        runBlocking { delay(700000) } 
     } 
 
     data class MyItem9 (val id:Int) { 
     init { 
        println("MyItem Created $id") 
     } 
    } 

Everything is the same as in the previous example, except, here, we used the BackpressureStrategy.DROP option. Let's check the output:

So, as we can see in the preceding output, BackpressureStrategy.DROP stopped Flowable from emitting after 128, as the downstream couldn't keep up with, just as we described earlier.

Now, as we have gained some grip on the options available in BackpressureStrategy, let's focus on the BackpressureStrategy.MISSING option and how to use them with the onBackpressureXXX() operators.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.15.226.120