Flowable

We may call Flowable a backpressured version of Observable. The essential difference between the two is that Flowable takes backpressure into account, whereas Observable does not. Operators on Flowable use a default buffer size of 128 elements, so when the consumer is slow, the emitted items wait in that buffer.
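
As a quick check of the 128-element default mentioned above, the following minimal sketch prints the value reported by Flowable.bufferSize(). It assumes RxJava 2.x (the io.reactivex package) is on the classpath; the helper name defaultBufferSize is ours, introduced only for illustration.

```kotlin
import io.reactivex.Flowable

// Hypothetical helper (not part of RxJava): returns the default
// buffer size that Flowable operators use.
fun defaultBufferSize(): Int = Flowable.bufferSize()

fun main() {
    // Typically 128, unless the default has been overridden.
    println("Default Flowable buffer size: ${defaultBufferSize()}")
}
```

In RxJava 2.x this default can be overridden with the rx2.buffer-size system property, so the printed number may differ in a customized setup.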

Note that Flowable was added in ReactiveX 2.x (RxKotlin 2.x); earlier versions do not include it. Instead, in those earlier versions, Observable was retrofitted to support backpressure, which caused many unexpected MissingBackpressureException errors.
Here is the release note if you are interested:
https://github.com/ReactiveX/RxJava/wiki/What%27s-different-in-2.0#observable-and-flowable

We have had a long discussion so far; let's now try our hand at some code. First, we will try an example with Observable, and then we will do the same with Flowable to see and understand the difference:

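    // Assumes RxJava 2.x and kotlinx.coroutines are on the classpath
    import io.reactivex.Observable
    import io.reactivex.schedulers.Schedulers
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.runBlocking

    fun main(args: Array<String>) {
        Observable.range(1, 1000) // (1) emit the numbers 1 to 1000
            .map { MyItem3(it) } // (2) wrap each Int in a MyItem3
            .observeOn(Schedulers.computation())
            .subscribe({ // (3) consume on the computation scheduler
                print("Received $it;\t")
                runBlocking { delay(50) } // (4) simulate slow processing
            }, { it.printStackTrace() })
        runBlocking { delay(60000) } // (5) keep the program alive
    }

    data class MyItem3(val id: Int) {
        init {
            print("MyItem Created $id;\t")
        }
    }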

This is a simple program. On comment (1), we used the Observable.range() operator, which emits the numbers from 1 to 1000. On comment (2), we used the map operator to create a MyItem3 object from each Int. On comment (3), we subscribed to the Observable. On comment (4), we ran a blocking delay to simulate long-running subscriber code. On comment (5), we again ran a blocking delay, this time to wait for the consumer to finish processing all the items before the program stops execution.

The whole output would take a lot of space, so we show only parts of it as screenshots here:

If you take a closer look at the output (screenshots), you will notice that the Observable (producer) continued to emit items even though the Observer (consumer) could not keep pace with it. By the time the Observable (producer) had finished emitting all the items, the Observer (consumer) had processed only the very first item (item 1). As mentioned earlier, this can lead to a lot of problems, including an OutOfMemoryError. Now, let's replace Observable with Flowable in this code:

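    // Assumes RxJava 2.x and kotlinx.coroutines are on the classpath
    import io.reactivex.Flowable
    import io.reactivex.schedulers.Schedulers
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.runBlocking

    fun main(args: Array<String>) {
        Flowable.range(1, 1000) // (1) emit the numbers 1 to 1000
            .map { MyItem4(it) } // (2) wrap each Int in a MyItem4
            .observeOn(Schedulers.io())
            .subscribe({ // (3) consume on the io scheduler
                println("Received $it")
                runBlocking { delay(50) } // (4) simulate slow processing
            }, { it.printStackTrace() })
        runBlocking { delay(60000) } // (5) keep the program alive
    }

    data class MyItem4(val id: Int) {
        init {
            println("MyItem Created $id")
        }
    }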

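The code is essentially the same as the previous one. The key difference is that we wrote Flowable.range() instead of Observable.range(); this version also uses Schedulers.io() and println, which do not change the backpressure behavior. Now, let's see the output and note the difference: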

Have you noticed the difference? Instead of emitting all the items at once, Flowable emitted a few items in a chunk, waited for the consumer to catch up, then continued, and completed in an interleaved manner. This alone avoids a lot of problems.
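
To make the chunked behavior more concrete, here is a minimal, single-threaded sketch of the request mechanism that backpressure relies on. It is not the code behind the screenshots: the helper requestInBatches and its parameters are hypothetical names used for illustration, and it assumes RxJava 2.x (io.reactivex) together with the Reactive Streams Subscriber interface it depends on. The subscriber asks for a small batch, and the source emits no more than what was requested.

```kotlin
import io.reactivex.Flowable
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

// Hypothetical helper: subscribes to Flowable.range(1, total), requests
// items `batch` at a time, and returns a log of requests and signals.
fun requestInBatches(total: Int, batch: Long): List<String> {
    val log = mutableListOf<String>()
    Flowable.range(1, total)
        .doOnRequest { n -> log.add("request($n)") } // record each demand signal
        .subscribe(object : Subscriber<Int> {
            private lateinit var subscription: Subscription
            private var receivedInBatch = 0L

            override fun onSubscribe(s: Subscription) {
                subscription = s
                s.request(batch) // ask for the first batch only
            }

            override fun onNext(item: Int) {
                log.add("onNext($item)")
                if (++receivedInBatch == batch) {
                    receivedInBatch = 0
                    subscription.request(batch) // batch consumed, ask for more
                }
            }

            override fun onError(t: Throwable) {
                log.add("onError")
            }

            override fun onComplete() {
                log.add("onComplete")
            }
        })
    return log
}

fun main() {
    requestInBatches(5, 2).forEach(::println)
}
```

With total = 5 and batch = 2, the log should begin with a request entry and show at most two onNext entries between consecutive requests. This mirrors, on a much smaller scale, how observeOn requests items from Flowable.range() in chunks rather than letting the source run ahead.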
