Skipping emissions (skip, skipLast, skipUntil, and skipWhile)

There may be a requirement where you would like to skip some emissions at the beginning or skip emissions until a particular condition is met. You may even have to wait for another producer before taking emissions and skip all remaining ones.

These operators are designed keeping the exact scenario in mind. They help you skip emissions in various ways.

RxKotlin provides us with many variations and overloads of the skip operator; we will discuss the most important ones among them:

  • skip
  • skipLast
  • skipWhile
  • skipUntil

We will take a look at all of the preceding listed operators one by one.

Let's start with skip:

    fun main(args: Array<String>) { 
      val observable1 = Observable.range(1,20) 
      observable1 
      .skip(5)//(1) 
      .subscribe(object:Observer<Int> { 
         override fun onError(e: Throwable) { 
            println("Error $e") 
         } 
 
         override fun onComplete() { 
            println("Complete") 
         } 
 
         override fun onNext(t: Int) { 
            println("Received $t") 
         } 
 
override fun onSubscribe(d: Disposable) { println("starting skip(count)") } }) val observable2 = Observable.interval(100,TimeUnit.MILLISECONDS) observable2 .skip(400,TimeUnit.MILLISECONDS)//(2) .subscribe( object:Observer<Long> { override fun onError(e: Throwable) { println("Error $e") } override fun onComplete() { println("Complete") }
override fun onNext(t: Long) { println("Received $t") } override fun onSubscribe(d: Disposable) { println("starting skip(time)") } } ) runBlocking { delay(1000) } }

The skip operator has two important overloads: skip(count:Long) and skip(time:Long, unit:TimeUnit); the first overload works on count, discarding the first n number of emissions, while the second overload works on time, discarding all the emissions that came in the specified time duration.

In this program, on comment (1), we used the skip(count) operator to skip the first 5 emissions. On comment (2), we used the skip(time,unit) operator to skip all emissions in the first 400 milliseconds (4 seconds) of the subscription.

Here is the output:

Now, let's take a look at how the skipLast operator works:

    fun main(args: Array<String>) { 
      val observable = Observable.range(1,20) 
      observable 
       .skipLast(5)//(1) 
       .subscribe(object: Observer<Int> { 
         override fun onError(e: Throwable) { 
           println("Error $e") 
         } 
 
         override fun onComplete() { 
           println("Complete") 
         } 
 
         override fun onNext(t: Int) { 
            println("Received $t") 
         } 
 
         override fun onSubscribe(d: Disposable) { 
            println("starting skipLast(count)") 
         } 
 
       }) 
    } 

The skipLast operator has many overloads like the skip operator. The only difference is that this operator discards emissions from last. In this program, we used the skipLast(count) operator to skip the last 5 emissions on comment (1).

Here is the output:

Unlike skip and skipLast, both of which skip emissions on the basis of count or time, skipWhile skips them on the base of a predicate (logical expression). You've to pass a predicate to the skipWhile operator, just like the filter operator. It will keep skipping emissions while the predicate evaluates to true. It will start passing all emissions downstream as soon as the predicate returns false. Let's take a look at the following piece of code:

    fun main(args: Array<String>) { 
      val observable = Observable.range(1,20) 
      observable 
       .skipWhile {item->item<10}//(1) 
       .subscribe(object: Observer<Int> { 
          override fun onError(e: Throwable) { 
            println("Error $e") 
          } 
 
          override fun onComplete() { 
            println("Complete") 
          } 
 
          override fun onNext(t: Int) { 
            println("Received $t") 
          } 
 
          override fun onSubscribe(d: Disposable) { 
             println("starting skipWhile") 
          } 
 
        }) 
    } 

The output is as follows:

Note that, unlike filter, the skipWhile operator will execute the predicate until it returns false and pass all the emissions thereafter. If you want the predicate, check on all the emissions; you should rather consider the filter operator.

Think of a situation where you're working with two producers, producer1 and producer2, and want to start processing emissions from producer1 as soon as producer2 starts emitting. In this scenario, skipUntil can help you out. Let's look at this example:

    fun main(args: Array<String>) { 
      val observable1 = Observable.interval(100, TimeUnit.MILLISECONDS) 
      val observable2 =
Observable.timer(500,TimeUnit.MILLISECONDS)//(1) observable1 .skipUntil(observable2)//(2) .subscribe( object: Observer<Long> { override fun onError(e: Throwable) { println("Error $e") } override fun onComplete() { println("Complete") } override fun onNext(t: Long) { println("Received $t") }
override fun onSubscribe(d: Disposable) { println("starting skip(time)") } } ) runBlocking { delay(1500) } }

We will explain the code, but take a look at the output first:

On comment (1), we created an Observable instance (observable2) with Observable.timer, which should trigger emission after 500 milliseconds. On comment (2), we used that Observable instance (observable2) as the parameter to the skipUntil operator, which will make it discard all the emissions of observable1 until observable2 emits.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.144.35.122