Take operators (take, takeLast, takeWhile, and takeUntil)

The take operators work in exactly the opposite way than the skip operators. Let's take an example of them one by one and understand how they work:

    fun main(args: Array<String>) { 
      val observable1 = Observable.range(1,20) 
      observable1 
        .take(5)//(1) 
        .subscribe(object:Observer<Int> { 
           override fun onError(e: Throwable) { 
              println("Error $e") 
          } 
 
          override fun onComplete() { 
              println("Complete") 
          } 
 
          override fun onNext(t: Int) { 
              println("Received $t") 
          } 
 
          override fun onSubscribe(d: Disposable) { 
              println("starting skip(count)") 
          } 
 
       }) 
 
      val observable2 = Observable.interval(100,TimeUnit.MILLISECONDS) 
      observable2 
         .take(400,TimeUnit.MILLISECONDS)//(2) 
         .subscribe( 
            object:Observer<Long> { 
               override fun onError(e: Throwable) { 
                  println("Error $e") 
               } 
 
               override fun onComplete() { 
                  println("Complete") 
               } 
 
               override fun onNext(t: Long) { 
                  println("Received $t") 
               } 
 
               override fun onSubscribe(d: Disposable) { 
                  println("starting skip(time)") 
               } 
 
             } 
           ) 
 
           runBlocking { 
             delay(1000) 
           } 
 
     }

This program is almost like the program with skip. The difference is that here, we used take instead of skip. Let's check the difference to understand better:

The output shows it clearly. In the exact opposite way than the skip operator, the take operator passes the specified emissions to downstream, discarding the remaining ones. Most importantly, it also sends onComplete notifications to downstream on its own, as soon as it completes passing all the specified emissions.

Let's test it with takeLast operator:

    fun main(args: Array<String>) { 
      val observable = Observable.range(1,20) 
      observable 
        .takeLast(5)//(1) 
        .subscribe(object: Observer<Int> { 
           override fun onError(e: Throwable) { 
             println("Error $e") 
           } 
 
           override fun onComplete() { 
             println("Complete") 
           } 
 
           override fun onNext(t: Int) { 
             println("Received $t") 
           } 
 
           override fun onSubscribe(d: Disposable) { 
             println("starting skipLast(count)") 
           } 
 
        }) 
    } 

And, here is the output; it prints the last 5 numbers in the emission:

Now take a look at the takeWhile:

    fun main(args: Array<String>) { 
      val observable = Observable.range(1,20) 
      observable 
        .takeWhile{item->item<10}//(1) 
        .subscribe(object: Observer<Int> { 
            override fun onError(e: Throwable) { 
              println("Error $e") 
            } 
 
            override fun onComplete() { 
              println("Complete") 
            } 
 
override fun onNext(t: Int) { println("Received $t") } override fun onSubscribe(d: Disposable) { println("starting skipWhile") } }) }

The output is the exact opposite of skipWhile; instead of skipping the first 10 numbers, it prints them and discards the remaining ones:

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.138.37.191