Types of scheduler

As an abstraction layer for thread pool management, the scheduler API provides you with some pre-composed schedulers. It also allows you to create a new, user-defined scheduler. Let's take a look at the available scheduler types:

  • Schedulers.io()
  • Schedulers.computation()
  • Schedulers.newThread()
  • Schedulers.single()
  • Schedulers.trampoline()
  • Schedulers.from()
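
To make the last entry concrete, here is a minimal sketch of a user-defined scheduler built with Schedulers.from(), which wraps an Executor that you supply. The helper name threadsUsedByCustomScheduler and the pool size of 2 are illustrative assumptions, and the imports assume RxJava 2 (RxJava 3 moves these classes under io.reactivex.rxjava3).

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.Executors

// Hypothetical helper (not from the text): emits three items on a scheduler
// backed by our own executor and returns the name of the thread that
// handled each item.
fun threadsUsedByCustomScheduler(): List<String> {
    val executor = Executors.newFixedThreadPool(2) // pool size is an arbitrary choice
    try {
        return Observable.range(1, 3)
            .subscribeOn(Schedulers.from(executor)) // subscribe on a pool thread
            .map { Thread.currentThread().name }    // runs on the emitting thread
            .toList()
            .blockingGet()                          // wait on the caller for the result
    } finally {
        executor.shutdown() // we own the executor, so we shut it down ourselves
    }
}

fun main() {
    // Prints three thread names taken from the executor's pool, not "main".
    println(threadsUsedByCustomScheduler())
}
```

Because the executor is created by the caller, shutting it down is also the caller's job; the built-in schedulers in the list manage their own threads.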

We will look into their definitions and their prescribed use-cases, but first, let's get started with some code.

We will start with a usual example without a scheduler, and then we will implement a scheduler in the same example to observe the difference, as follows:

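    // Imports assume RxJava 2 and kotlinx.coroutines 1.x; adjust the packages for other versions.
    import io.reactivex.Observable
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.runBlocking

    fun main(args: Array<String>) {
      Observable.range(1, 10)
        .subscribe {
          runBlocking { delay(200) } // simulate a long-running task
          println("Observable1 Item Received $it")
        }

      Observable.range(21, 10)
        .subscribe {
          runBlocking { delay(100) } // simulate a shorter task
          println("Observable2 Item Received $it")
        }
    }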

In this program, we used two Observables and put a delay inside each subscription to simulate long-running tasks.

The output shows the expected result: the Observers run one after another, so all ten items of Observable1 are printed before the first item of Observable2.

The total execution time of this program would be around 3,000 milliseconds (10 × 200 ms for the first Observable plus 10 × 100 ms for the second, as the delay is performed before printing), while the thread pool was sitting idle the whole time. Using a scheduler, this time can be significantly reduced. Let's get it done:

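    // Imports assume RxJava 2 and kotlinx.coroutines 1.x; adjust the packages for other versions.
    import io.reactivex.Observable
    import io.reactivex.schedulers.Schedulers
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.runBlocking

    fun main(args: Array<String>) {
      Observable.range(1, 10)
        .subscribeOn(Schedulers.computation())//(1)
        .subscribe {
          runBlocking { delay(200) }
          println("Observable1 Item Received $it")
        }

      Observable.range(21, 10)
        .subscribeOn(Schedulers.computation())//(2)
        .subscribe {
          runBlocking { delay(100) }
          println("Observable2 Item Received $it")
        }
      runBlocking { delay(2100) }//(3)
    }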

This program adds three calls to the previous one: subscribeOn(Schedulers.computation()) at comments (1) and (2), and runBlocking { delay(2100) } at comment (3). We will inspect the significance of those lines after taking a look at the output:

As the output shows, the two Observables in this example emit concurrently. The subscribeOn(Schedulers.computation()) call makes each downstream subscribe to its Observable on a separate background thread, which is what allows them to run concurrently. You should already be familiar with runBlocking { delay(2100) } at comment (3); because all the operations now run on other threads, we block the main thread to keep the program alive until they finish. Notice, however, the duration of the delay we passed: it's only 2,100 milliseconds, and the output confirms that both subscriptions processed all their emissions. So, compared with the sequential version, we saved roughly 1,000 milliseconds right away.
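
One way to check both claims (background threads and the shorter running time) is to record the thread names and measure the elapsed time directly. The following is a minimal sketch under a few assumptions: the helper name runBothOnComputation is hypothetical, Thread.sleep stands in for runBlocking { delay(...) } to avoid the coroutines dependency, a CountDownLatch replaces the fixed 2,100 ms wait, and the imports assume RxJava 2.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import java.util.Collections
import java.util.concurrent.CountDownLatch
import kotlin.system.measureTimeMillis

// Hypothetical helper (not from the text): runs both streams on the
// computation scheduler and returns the elapsed time in milliseconds
// together with the names of the threads that processed the items.
fun runBothOnComputation(): Pair<Long, Set<String>> {
    val threadNames = Collections.synchronizedSet(mutableSetOf<String>())
    val done = CountDownLatch(2) // one count per stream

    fun start(first: Int, delayMillis: Long) {
        Observable.range(first, 10)
            .subscribeOn(Schedulers.computation())
            .doFinally { done.countDown() } // signal when this stream has finished
            .subscribe {
                Thread.sleep(delayMillis)   // simulate a long-running task
                threadNames.add(Thread.currentThread().name)
            }
    }

    val elapsed = measureTimeMillis {
        start(1, 200)
        start(21, 100)
        done.await() // block the calling thread until both streams are done
    }
    return elapsed to threadNames.toSet()
}

fun main() {
    val (elapsed, names) = runBothOnComputation()
    // On a machine with at least two cores, elapsed should be close to
    // 2,000 ms (the slower stream) rather than 3,000 ms (the sum of both).
    println("Elapsed: $elapsed ms on threads $names")
}
```

Waiting on a latch means the main thread blocks only as long as the work actually takes, so there is no need to guess a safe delay. Note that the computation scheduler sizes its pool by the number of available processors, so on a single-core machine the two streams may still run one after the other.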

Let's now continue our discussion of the different types of schedulers available; we will then dive into the different ways to use them.
