Operator onBackpressureBuffer()

This operator serves the same purpose as BackpressureStrategy.BUFFER, except that here you get some extra configuration options, such as the buffer size, whether the buffer is bounded or unbounded, and more. You may also omit the configuration to use the default behavior.
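As a rough sketch of what those configuration options look like, the snippet below calls three onBackpressureBuffer overloads as they exist in RxJava 2.x (package io.reactivex); check the signatures against the version you actually use. The function names and the overflow message are hypothetical and chosen only for illustration. Flowable.range honors backpressure by itself, so none of these buffers overflows here; the point is only to show the call shapes.

```kotlin
import io.reactivex.BackpressureOverflowStrategy
import io.reactivex.Flowable
import io.reactivex.functions.Action

// Default configuration: no arguments, unbounded buffer.
fun defaultBuffer(): List<Int> =
    Flowable.range(1, 5)
        .onBackpressureBuffer()
        .toList()
        .blockingGet()

// Bounded buffer: an overflow is signalled as an error to the subscriber.
fun boundedBuffer(): List<Int> =
    Flowable.range(1, 5)
        .onBackpressureBuffer(20)
        .toList()
        .blockingGet()

// Bounded buffer with an overflow callback and an explicit overflow strategy:
// DROP_OLDEST discards the oldest buffered item instead of failing.
fun dropOldestBuffer(): List<Int> =
    Flowable.range(1, 5)
        .onBackpressureBuffer(
            20L,
            Action { println("Buffer overflowed") }, // hypothetical message
            BackpressureOverflowStrategy.DROP_OLDEST
        )
        .toList()
        .blockingGet()

fun main() {
    println(defaultBuffer())
    println(boundedBuffer())
    println(dropOldestBuffer())
}
```

Besides DROP_OLDEST, BackpressureOverflowStrategy also offers DROP_LATEST and ERROR, so the choice depends on whether losing old items, losing new items, or failing is the least harmful outcome for your stream.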

So, let's look at some examples:

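    // Imports assume RxJava 2 and a current kotlinx.coroutines release
    import io.reactivex.BackpressureStrategy
    import io.reactivex.Observable
    import io.reactivex.schedulers.Schedulers
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.runBlocking

    fun main(args: Array<String>) {
        val source = Observable.range(1, 1000)
        source.toFlowable(BackpressureStrategy.MISSING) // (1)
            .onBackpressureBuffer() // (2)
            .map { MyItem11(it) }
            .observeOn(Schedulers.io())
            .subscribe {
                println(it)
                runBlocking { delay(1000) } // simulate a slow consumer
            }
        runBlocking { delay(600000) } // keep the main thread alive
    }

    data class MyItem11(val id: Int) {
        init {
            println("MyItem Created $id")
        }
    }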

Again, we reuse the previous program with a few tweaks. At comment (1), we create the Flowable instance with the BackpressureStrategy.MISSING option. At comment (2), we use onBackpressureBuffer to deal with backpressure. The output is similar to that of the BackpressureStrategy.BUFFER example, so we omit it here.

You can specify the buffer size by passing it as an argument to onBackpressureBuffer(). So, let's replace the onBackpressureBuffer() call with onBackpressureBuffer(20). The following is the output:

Yes, that change resulted in an error: the buffer is full. We defined the buffer size as 20, but the Flowable needed far more room than that. Implementing the onError method would not enlarge the buffer, but it would let the subscriber handle this error instead of leaving it unhandled.
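To illustrate the onError remark, here is a minimal sketch that passes a second lambda to subscribe so the overflow is delivered to the subscriber. It assumes RxJava 2.x, where a bounded onBackpressureBuffer signals a MissingBackpressureException on overflow. The function name runWithSmallBuffer is hypothetical, and Thread.sleep stands in for the coroutine delay used earlier so the sketch needs no coroutine dependency.

```kotlin
import io.reactivex.BackpressureStrategy
import io.reactivex.Observable
import io.reactivex.exceptions.MissingBackpressureException
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference

// Runs the bounded-buffer pipeline and returns the error seen by onError, if any.
fun runWithSmallBuffer(): Throwable? {
    val failure = AtomicReference<Throwable?>()
    val done = CountDownLatch(1)

    Observable.range(1, 1000)
        .toFlowable(BackpressureStrategy.MISSING)
        .onBackpressureBuffer(20) // far too small for 1000 fast emissions
        .observeOn(Schedulers.io())
        .subscribe(
            { item ->
                println("Received $item")
                Thread.sleep(100) // simulate a slow consumer
            },
            { error ->
                // onError handler: the overflow arrives here instead of going unhandled
                failure.set(error)
                done.countDown()
            }
        )

    // Wait for the error signal, with a timeout as a safety net
    done.await(10, TimeUnit.SECONDS)
    return failure.get()
}

fun main() {
    val error = runWithSmallBuffer()
    if (error is MissingBackpressureException) {
        println("Handled in onError: ${error.message}")
    }
}
```

How many items are printed before the error arrives depends on thread timing, so treat the handler, not the item count, as the point of the example.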
