The buffer() operator

Unlike the onBackPressureBuffer() operator, which buffers emissions until the consumer consumes, the buffer() operator will gather emissions as a batch and will emit them as a list or any other collection type.

So, let's look at this example:

    fun main(args: Array<String>) { 
      val flowable = Flowable.range(1,111)//(1) 
      flowable.buffer(10)//(2) 
        .subscribe { println(it) } 
    } 

On comment (1), we created a Flowable instance with the Flowable.range() method, which emits integers from 1 to 111. On comment (2), we used the buffer operator with 10 as the buffer size, so the buffer operator gathers 10 items from the Flowable and emits them as a list.

The following is the output, which satisfies the understanding:

The buffer operator has quite good configuration options, such as the skip parameter.

It accepts a second integer parameter as the skip count. It works in a really interesting way. If the value of the skip parameter is exactly the same as the count parameter, then it will do nothing. Otherwise, it will first calculate the positive difference between the count and skip parameters as actual_numbers_to_skip, and, then, if the value of the skip parameter is greater than the value of the count parameter, it will skip the actual_numbers_to_skip items after the last item of each emission. Otherwise, if the value of the count parameter is greater than the value of the skip parameter, you'll get rolling buffers, that is, instead of skipping the items, it will skip the counts from the previous emissions.

Confused? Let's look at this example to clear things up:

    fun main(args: Array<String>) { 
      val flowable = Flowable.range(1,111) 
      flowable.buffer(10,15)//(1) 
       .subscribe { println("Subscription 1 $it") } 
 
      flowable.buffer(15,7)//(2) 
       .subscribe { println("Subscription 2 $it") } 
   } 

On comment (1), we used buffer with count 10, skip 15, for the first subscription. On comment (2), we used it as count 15, skip 8, for the second subscription. The following is the output:

For the first subscription, it skipped 5 items after each subscription (15-10). However, for the second one, it repeated items from the 8th item in each emission (15-7).

If the preceding uses of the buffer operator were not enough for you, then let me tell you the buffer operator also lets you do time-based buffering. Put simply, it can gather emissions from a source and emit them at a time interval. Interesting right? Let's explore it:

    fun main(args: Array<String>) { 
      val flowable = Flowable.interval(100, TimeUnit.MILLISECONDS)//(1) 
      flowable.buffer(1,TimeUnit.SECONDS)//(2) 
       .subscribe { println(it) } 
 
      runBlocking { delay(5, TimeUnit.SECONDS) }//(3) 
   } 

To understand things better, we used Flowable.interval in this example to create a Flowable instance on comment (1). On comment (2), we used the buffer(timespan:Long, unit:TimeUnit) overload to instruct the operator to buffer all emissions for a second and emit them as a list.

This is the output:

As you can see in the example, each of the emissions contains 10 items as Flowable.interval() is emitting one each 100 milliseconds and buffer is gathering emissions within a second timeframe (1 second = 1000 milliseconds, emission with a 100 milliseconds interval would result in 10 emissions in one second).

Another exciting feature of the buffer operator is that it can take another producer as the boundary, that is, the buffer operator will gather all the emissions of the source producer between two emissions of the boundary producer, and will emit the list on each boundary producer's emission.

Here is an example:

    fun main(args: Array<String>) { 
      val boundaryFlowable = Flowable.interval(350, TimeUnit.MILLISECONDS) 
 
      val flowable = Flowable.interval(100, TimeUnit.MILLISECONDS)//(1) 
      flowable.buffer(boundaryFlowable)//(2) 
       .subscribe { println(it) } 
 
      runBlocking { delay(5, TimeUnit.SECONDS) }//(3) 
     
    } 

And the following is the output:

The buffer operator emits a gathered list whenever boundaryFlowable emits.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.217.107.229