Understanding Schedulers

You will rarely use schedulers in isolation the way we are about to in this section. You are far more likely to use them with observeOn() and subscribeOn(). But here is how they work on their own, outside of an Rx context.

A Scheduler is RxJava's abstraction for pooling threads and scheduling tasks to be executed by them. These tasks may be executed immediately, after a delay, or repeatedly at a fixed period, depending on which of its execution methods is called. These execution methods are scheduleDirect() and schedulePeriodicallyDirect(), which have a few overloads. Below, we use the computation Scheduler to execute an immediate task, a delayed task, and a repeated task:

import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class Launcher {

    public static void main(String[] args) {

        Scheduler scheduler = Schedulers.computation();

        //run task now
        scheduler.scheduleDirect(() -> System.out.println("Now!"));

        //delay task by 1 second
        scheduler.scheduleDirect(() ->
                System.out.println("Delayed!"), 1, TimeUnit.SECONDS);

        //repeat task every second
        scheduler.schedulePeriodicallyDirect(() ->
                System.out.println("Repeat!"), 0, 1, TimeUnit.SECONDS);

        //keep alive for 5 seconds
        sleep(5000);
    }

    public static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Your output will likely be the following:

Now!
Repeat!
Delayed!
Repeat!
Repeat!
Repeat!
Repeat!
Repeat!

scheduleDirect() executes a one-time task and has an overload that accepts a time delay. schedulePeriodicallyDirect() repeats its task indefinitely. Interestingly, all of these methods return a Disposable, which lets you cancel a task that is executing or waiting to execute.
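
To make the cancellation point concrete, here is a minimal sketch that keeps the Disposable returned by schedulePeriodicallyDirect() and disposes it after a short while. The class name CancelDemo, the counter, and the timings are illustrative assumptions and not part of the original example:

```java
import io.reactivex.Scheduler;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names and timings are illustrative only
public class CancelDemo {

    // Starts a periodic task, cancels it after runMillis, and returns
    // how many further runs were observed after the cancellation
    public static int runsAfterDispose(long runMillis) {
        Scheduler scheduler = Schedulers.computation();
        AtomicInteger runs = new AtomicInteger();

        //keep the Disposable so the task can be cancelled later
        Disposable task = scheduler.schedulePeriodicallyDirect(
                runs::incrementAndGet, 0, 100, TimeUnit.MILLISECONDS);

        sleep(runMillis);
        task.dispose();
        System.out.println("Disposed? " + task.isDisposed());

        //allow a run that was already in flight to finish
        sleep(50);
        int countAtDispose = runs.get();

        //several periods pass; a cancelled task should not run again
        sleep(500);
        return runs.get() - countAtDispose;
    }

    public static void main(String[] args) {
        System.out.println("Runs after dispose: " + runsAfterDispose(350));
    }

    public static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
```

Without the call to dispose(), the periodic task would keep firing for as long as the application stays alive, so holding on to the Disposable is the only handle you have for stopping it.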

These methods automatically pass tasks to a Worker, an abstraction that wraps a single thread and sequentially does the work given to it. You can call the Scheduler's createWorker() method to get a Worker explicitly and delegate tasks to it directly. Its schedule() and schedulePeriodically() methods operate just like the Scheduler's scheduleDirect() and schedulePeriodicallyDirect(), respectively (and also return disposables), but the tasks are executed by that specific Worker. When you are done with a Worker, you should dispose of it so that it can be discarded or returned to the Scheduler. Here is an equivalent of our earlier example using a Worker:

import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class Launcher {

    public static void main(String[] args) {

        Scheduler scheduler = Schedulers.computation();
        Scheduler.Worker worker = scheduler.createWorker();

        //run task now
        worker.schedule(() -> System.out.println("Now!"));

        //delay task by 1 second
        worker.schedule(() -> System.out.println("Delayed!"), 1,
                TimeUnit.SECONDS);

        //repeat task every second
        worker.schedulePeriodically(() ->
                System.out.println("Repeat!"), 0, 1, TimeUnit.SECONDS);

        //keep alive for 5 seconds, then dispose Worker
        sleep(5000);
        worker.dispose();
    }

    public static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

This is the output you may get:

Now!
Repeat!
Repeat!
Delayed!
Repeat!
Repeat!
Repeat!
Repeat!
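
The sequential behavior of a Worker can be observed with a small sketch like the one below. It hands several undelayed tasks to a single Worker and records the order in which they run, along with the thread that ran them. The class name WorkerOrderDemo and the logging format are assumptions made for illustration; on the computation Scheduler you would expect the tasks to run in submission order, but the exact thread names depend on the RxJava version:

```java
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; names are illustrative only
public class WorkerOrderDemo {

    // Schedules taskCount undelayed tasks on one Worker and returns a log
    // of the order in which they ran and the thread that ran each one
    public static List<String> runOnOneWorker(int taskCount) {
        Scheduler.Worker worker = Schedulers.computation().createWorker();
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(taskCount);

        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            worker.schedule(() -> {
                log.add("task " + id + " on "
                        + Thread.currentThread().getName());
                done.countDown();
            });
        }

        try {
            //wait until every task has reported in
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            //release the Worker once we are finished with it
            worker.dispose();
        }
        return log;
    }

    public static void main(String[] args) {
        runOnOneWorker(5).forEach(System.out::println);
    }
}
```

Because one Worker processes its tasks one at a time, the tasks never overlap with each other, which is the property RxJava operators rely on when they hand emissions to a Worker.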

Of course, every Scheduler is implemented differently. A Scheduler may use one thread or several threads. It may cache and reuse threads, or not reuse them at all. It may use an Android thread or a JavaFX thread (as we have seen with RxAndroid and RxJavaFX in this book). But that is essentially how schedulers work, and you can perhaps see why they are useful in implementing RxJava operators.
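
One rough way to see these differences is to run the same task on several schedulers and print the thread that executed it. The sketch below assumes RxJava 2's Schedulers.io(), Schedulers.single(), Schedulers.newThread(), and Schedulers.from() factories, which are not discussed in this section; the class name SchedulerThreadsDemo and its helper method are hypothetical:

```java
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; names are illustrative only
public class SchedulerThreadsDemo {

    // Runs one task on the given Scheduler and returns the name of the
    // thread that executed it (empty if the task did not run in time)
    public static String threadNameOf(Scheduler scheduler) {
        AtomicReference<String> name = new AtomicReference<>("");
        CountDownLatch done = new CountDownLatch(1);

        scheduler.scheduleDirect(() -> {
            name.set(Thread.currentThread().getName());
            done.countDown();
        });

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return name.get();
    }

    public static void main(String[] args) {
        //an ExecutorService we own, wrapped as a Scheduler
        ExecutorService executor = Executors.newSingleThreadExecutor();

        Map<String, Scheduler> schedulers = new LinkedHashMap<>();
        schedulers.put("computation", Schedulers.computation());
        schedulers.put("io", Schedulers.io());
        schedulers.put("single", Schedulers.single());
        schedulers.put("newThread", Schedulers.newThread());
        schedulers.put("from(executor)", Schedulers.from(executor));

        schedulers.forEach((label, scheduler) ->
                System.out.println(label + " -> " + threadNameOf(scheduler)));

        //our own executor is not managed by RxJava, so shut it down
        executor.shutdown();
    }
}
```

The thread names that are printed are an implementation detail and may vary between RxJava versions, but they give a quick indication of which pool or thread each Scheduler draws on.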
