Chapter 6. Using Concurrency and Parallelism with Schedulers

Modern processors have multiple cores, which lets many time-consuming operations run simultaneously and finish sooner. The Java concurrency API (which includes threads and much more) makes it possible to do just that.

RxJava's Observable chains seem a good match for threads. It would be great if we could subscribe to our source, do all the transforming, combining, and filtering in the background and, when everything is done, have the result passed to the main thread. Yes, this sounds wonderful, but RxJava is single-threaded by default. This means that, in most cases, when the subscribe method is called on an Observable instance, the current thread blocks until everything is emitted. (This is not true for the Observable instances created by the interval or timer factory methods, for example.) This is a good thing because working with threads is not so easy. They are powerful, but they need to be synchronized with each other; for example, when one depends on the result of another.

One of the hardest things to manage in a multi-threaded environment is the shared data between the threads. One thread could read from a data source while another is modifying it, which leads to different versions of the same data being used by the different threads. If an Observable chain is constructed the right way, there is no shared state. This means that synchronization is not so complex.

In this chapter, we will talk about executing things in parallel and look at what concurrency means. Additionally, we'll learn some techniques for handling the situation when too many items are emitted by our Observable instances (which is not so rare in a multi-threaded environment). The topics covered in this chapter are as follows:

  • Using Scheduler instances to achieve concurrency
  • Buffering, throttling, and debouncing with Observable instances

RxJava's schedulers

Schedulers are RxJava's way of achieving concurrency. They are in charge of creating and managing the threads for us (internally relying on Java's thread pool facilities). We won't be dealing with Java's concurrency API and its quirks and complexities. We've been using the schedulers all along, implicitly with timers and intervals, but the time has come to master them.

Let's recall the Observable.interval factory method, which we introduced back in Chapter 3, Creating and Connecting Observables, Observers, and Subjects. As we saw before, RxJava is single-threaded by default, so in most cases, calling the subscribe method on the Observable instance will block the current thread. But that is not the case with the interval Observable instances. If we look at the JavaDoc of the Observable<Long> interval(long interval, TimeUnit unit) method, we'll see that it says that the Observable instance created by it operates on something called 'the computation Scheduler'.

In order to inspect the behavior of the interval method (as well as other things in this chapter) we will need a powerful debugging utility. That's why the first thing we'll be doing in this chapter is implementing it.

Debugging Observables and their schedulers

In the previous chapter, we introduced the doOnNext() operator, which can be used for logging the emitted items directly from within the Observable chain. We mentioned that there are doOnError() and doOnCompleted() operators too. But there is one that combines all three of them: the doOnEach() operator. We can log everything from it because it receives all the notifications emitted, regardless of their type. We can put it halfway through the chain of operators and use it to log, say, the state there. It takes a Notification -> void function.

Here is the source of a higher-order debug function. It returns a lambda that logs the emissions of an Observable instance, labeled with the passed description:

<T> Action1<Notification<? super T>> debug(
  String description, String offset
) {
  AtomicReference<String> nextOffset = new AtomicReference<String>(">");
  return (Notification<? super T> notification) -> {
    switch (notification.getKind()) {
    case OnNext:
      System.out.println(
        Thread.currentThread().getName() +
        "|" + description + ": " + offset +
        nextOffset.get() + notification.getValue()
      );
      break;
    case OnError:
      System.err.println(
        Thread.currentThread().getName() +
        "|" + description + ": " + offset +
        nextOffset.get() + " X " + notification.getThrowable()
      );
      break;
    case OnCompleted:
      System.out.println(
        Thread.currentThread().getName() +
        "|" + description + ": " + offset +
        nextOffset.get() + "|"
      );
      break;
    default:
      break;
    }
    nextOffset.getAndUpdate(p -> "-" + p);
  };
}

Depending on the passed description and offset, the returned function logs each notification. The important thing, however, is that it logs the current active thread's name before everything else. <value> marks the OnNext notifications, X the OnError notifications, and | the OnCompleted notifications. The nextOffset variable grows by one dash with every notification, so the arrows show the order of the notifications in time.

Here is an example of using this new method:

Observable
  .range(5, 5)
  .doOnEach(debug("Test", ""))
  .subscribe();

This example will generate five sequential numbers, beginning with the number five. We pass a call to our debug(String, String) method to the doOnEach() operator to log everything after the call of the range() method. With a subscribe call without parameters, this little chain will be triggered. The output is as follows:

main|Test: >5
main|Test: ->6
main|Test: -->7
main|Test: --->8
main|Test: ---->9
main|Test: ----->|

The first thing logged is the name of the current thread (the main one). Then we have the description of the Observable instance passed to the debug() method and, after that, a colon and dashes forming arrows that represent the time. Finally, we have the symbol for the type of the notification: the value itself for values and | for completed.

Let's define one overload to the debug() helper method so that we don't need to pass a second parameter to it with an additional offset, if it is not needed:

<T> Action1<Notification<? super T>> debug(String description) {
  return debug(description, "");
}

Now we are ready to debug what's happening with the Observable instances, created by the interval method!

The interval Observable and its default scheduler

Let's examine the following example:

Observable
  .interval(500L, TimeUnit.MILLISECONDS)
  .take(5)
  .doOnEach(debug("Default interval"))
  .subscribe();

This creates an interval Observable instance, emitting every half second. We use the take() method to get only the first five notifications and then complete. We use our debug() helper method to log the values emitted by the Observable instance created by the interval method, and the call to subscribe() triggers the logic. The output should look like this:

RxComputationThreadPool-1|Default interval: >0
RxComputationThreadPool-1|Default interval: ->1
RxComputationThreadPool-1|Default interval: -->2
RxComputationThreadPool-1|Default interval: --->3
RxComputationThreadPool-1|Default interval: ---->4

Everything should be familiar here, except the thread that the Observable instance executes on! This thread is not the main one. Judging by its name (RxComputationThreadPool-1), it seems to come from an RxJava-managed pool of reusable Thread instances.

If you recall, the Observable.interval factory method had the following overload:

Observable<Long> interval(long, TimeUnit, Scheduler)

This means that we can specify a scheduler on which it will operate. As mentioned previously, the overload with only two parameters operates on the computation scheduler. So, now let's try passing another scheduler and see what's going to happen:

Observable
  .interval(500L, TimeUnit.MILLISECONDS, Schedulers.immediate())
  .take(5)
  .doOnEach(debug("Immediate interval"))
  .subscribe();

This is the same as before, but with one little difference. We pass a scheduler called immediate. The idea is to execute the work immediately on the currently running thread. The result is as follows:

main|Immediate interval: >0
main|Immediate interval: ->1
main|Immediate interval: -->2
main|Immediate interval: --->3
main|Immediate interval: ---->4

By specifying this scheduler, we made the interval Observable instance run on the current, main thread.

With the help of the schedulers, we can instruct our operators to run on a particular thread or to use a particular pool of threads.

Everything we just covered leads us to the conclusion that the schedulers spawn new threads, or reuse already spawned ones, and that the operations making up the Observable chain execute on these threads. Thus, we can achieve concurrency (operators making progress at the same time) by using only schedulers.

In order to have multi-threaded logic, we'll have to learn just these two things:

  • The types of schedulers we can choose from
  • How to use these schedulers with an arbitrary Observable chain of operations

Types of schedulers

There are several types of schedulers, each dedicated to certain kinds of actions. In order to learn more about them, let's take a look at the Scheduler class.

It turns out that the class is quite simple. It has only two methods, as follows:

  • long now()
  • abstract Worker createWorker()

The first one returns the current time in milliseconds, and the second creates a Worker instance. These Worker instances are used for executing actions on a single thread or event loop (depending on the implementation). Scheduling actions for execution is done using the Worker's schedule* methods. The Worker class implements the Subscription interface, so it has an unsubscribe() method. Unsubscribing the Worker unschedules all outstanding work and allows a resource cleanup.
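To make the Worker contract more concrete, here is a minimal plain-Java sketch. It does not use RxJava and is not how the library implements its workers; the class name SimpleWorker and its methods are hypothetical and only mirror the names used above. It assumes a worker backed by a single-threaded executor, so scheduled actions run one after another on the same thread, and unsubscribe() drops outstanding work and releases the thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a Worker: one thread, actions run in FIFO order.
class SimpleWorker {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();

  // Queue an action; it runs after all previously scheduled actions.
  void schedule(Runnable action) {
    executor.execute(action);
  }

  // Drop any queued actions and release the underlying thread.
  void unsubscribe() {
    executor.shutdownNow();
  }

  boolean isUnsubscribed() {
    return executor.isShutdown();
  }

  // Schedules a few actions and returns the order in which they ran.
  static List<String> demo() {
    List<String> log = new CopyOnWriteArrayList<>();
    CountDownLatch done = new CountDownLatch(1);
    SimpleWorker worker = new SimpleWorker();
    worker.schedule(() -> log.add("first"));
    worker.schedule(() -> log.add("second"));
    worker.schedule(done::countDown); // last action signals completion
    try {
      done.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      worker.unsubscribe(); // always clean up the worker
    }
    return log;
  }
}
```

Because the worker owns a single thread, the actions can never overlap, which is the property the real Worker instances rely on as well.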

We can use the workers to perform scheduling outside the Observable context. For every Scheduler type, we can do the following:

scheduler.createWorker().schedule(Action0);

This will schedule the passed action and execute it. In most cases, this method shouldn't be used directly for scheduling work; we just pick the right scheduler and let the operators schedule actions on it instead. However, in order to understand what the schedulers do, we can use the method to inspect the various types available.

Let's define a testing method:

void schedule(Scheduler scheduler, int numberOfSubTasks, boolean onTheSameWorker) {
  List<Integer> list = new ArrayList<>(0);
  AtomicInteger current = new AtomicInteger(0);
  Random random = new Random();
  Worker worker = scheduler.createWorker();
  Action0 addWork = () -> {
    synchronized (current) {
      System.out.println("  Add : " + Thread.currentThread().getName() + " " + current.get());
      list.add(random.nextInt(current.get()));
      System.out.println("  End add : " + Thread.currentThread().getName() + " " + current.get());
    }
  };
  Action0 removeWork = () -> {
    synchronized (current) {
      if (!list.isEmpty()) {
        System.out.println("  Remove : " + Thread.currentThread().getName());
        list.remove(0);
        System.out.println("  End remove : " + Thread.currentThread().getName());
      }
    }
  };
  Action0 work = () -> {
    System.out.println(Thread.currentThread().getName());
    for (int i = 1; i <= numberOfSubTasks; i++) {
      current.set(i);
      System.out.println("Begin add!");
      if (onTheSameWorker) {
        worker.schedule(addWork);
      }
      else {
        scheduler.createWorker().schedule(addWork);
      }
      System.out.println("End add!");
    }
    while (!list.isEmpty()) {
      System.out.println("Begin remove!");
      if (onTheSameWorker) {
        worker.schedule(removeWork);
      }
      else {
        scheduler.createWorker().schedule(removeWork);
      }
      System.out.println("End remove!");
    }
  };
  worker.schedule(work);
}

The method uses the passed Scheduler instance to do some work. There is an option to specify whether it should use the same Worker instance for every task, or spawn a new one for every sub-task. Basically, the dummy work consists of filling up a list with random numbers and then removing these numbers one by one. Every add operation and remove operation is scheduled as a sub-task via a worker created by the passed Scheduler instance. Before and after every sub-task, the current thread and some additional information are logged.

Tip

In a real-world scenario, once all the work has been done, we should always invoke the worker.unsubscribe() method.

Let's turn to the predefined Scheduler instances. They can be retrieved via a set of static methods contained in the Schedulers class. We will be using the testing method defined previously to inspect their behavior in order to learn their differences and usefulness.

The Schedulers.immediate scheduler

The Schedulers.immediate scheduler executes work here and now. When an action is passed to its worker's schedule(Action0) method, it is just called. Let's suppose we run our test method with it, like this:

schedule(Schedulers.immediate(), 2, false);
schedule(Schedulers.immediate(), 2, true);

In both the cases, the result will look like this:

main
Begin add!
  Add : main 1
  End add : main 1
End add!
Begin add!
  Add : main 2
  End add : main 2
End add!
Begin remove!
  Remove : main
  End remove : main
End remove!
Begin remove!
  Remove : main
  End remove : main
End remove!

In other words, everything is executed on the caller thread (the main one) and nothing runs in parallel.

This scheduler can be used to execute methods, such as interval() and timer(), in the foreground.

The Schedulers.trampoline scheduler

The scheduler retrieved by the Schedulers.trampoline method enqueues sub-tasks on the current thread. The enqueued work is executed after the work currently in progress completes. Say we were to run this:

schedule(Schedulers.trampoline(), 2, false);
schedule(Schedulers.trampoline(), 2, true);

In the first case, the result will be the same as with the immediate scheduler, because all the tasks are executed in their own Worker instances and, therefore, there is only one task to be enqueued for execution in every worker. But when we use the same Worker instance for scheduling every sub-task, we get this:

main
Begin add!
End add!
Begin add!
End add!
  Add : main 2
  End add : main 2
  Add : main 2
  End add : main 2

In other words, it will first execute the entire main action and after that, the sub-tasks; thus, the List instance will be filled in (the sub-tasks were enqueued) but never emptied. That's because, while executing the main task, the List instance was still empty and the while loop was not triggered.

Note

The trampoline scheduler is useful for avoiding a StackOverflowError exception while running many tasks recursively. For example, let's assume a task completes and then calls itself to perform some new work. In the case of a single-threaded environment, this would lead to stack overflow due to the recursion; however, if we use the trampoline scheduler, it will serialize all scheduled activities and the stack depth will remain normal. However, the trampoline scheduler is usually slower than the immediate one. So, using the correct one depends on the use case.
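The queueing behavior described above can be sketched in a few lines of plain Java. This is only an illustration of the trampolining idea under simplifying assumptions (a single thread, no delays, no unsubscription), not RxJava's implementation; the Trampoline class and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical single-threaded trampoline: nested schedule() calls are
// queued instead of being run recursively, so the call stack stays shallow.
class Trampoline {
  private final Deque<Runnable> queue = new ArrayDeque<>();
  private boolean draining = false;

  void schedule(Runnable action) {
    queue.addLast(action);
    if (draining) {
      return; // an outer call is already draining; it will run this action later
    }
    draining = true;
    try {
      Runnable next;
      while ((next = queue.pollFirst()) != null) {
        next.run();
      }
    } finally {
      draining = false;
    }
  }

  // The inner task is enqueued and runs only after the outer task finishes.
  static List<String> demo() {
    List<String> log = new ArrayList<>();
    Trampoline trampoline = new Trampoline();
    trampoline.schedule(() -> {
      log.add("outer begin");
      trampoline.schedule(() -> log.add("inner"));
      log.add("outer end");
    });
    return log;
  }

  // A task that reschedules itself `limit` times without deepening the stack.
  static int countTo(int limit) {
    Trampoline trampoline = new Trampoline();
    int[] counter = {0};
    Runnable[] step = new Runnable[1];
    step[0] = () -> {
      if (++counter[0] < limit) {
        trampoline.schedule(step[0]);
      }
    };
    trampoline.schedule(step[0]);
    return counter[0];
  }
}
```

Calling Trampoline.demo() yields the entries "outer begin", "outer end", "inner", in that order, which matches the "main action first, sub-tasks afterwards" pattern seen in the output above. Trampoline.countTo(1000000) completes normally, whereas the same self-rescheduling written as direct recursion would risk a StackOverflowError.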

The Schedulers.newThread scheduler

This scheduler creates a new Thread instance (a single-threaded ScheduledThreadPoolExecutor instance to be precise) for every new Worker instance. Additionally, each worker enqueues the actions it receives through its schedule() method, much like the trampoline scheduler does. Let's look at the following code:

schedule(Schedulers.newThread(), 2, true);

It will have the same behavior as the trampoline but it will run in a new thread:

RxNewThreadScheduler-1
Begin add!
End add!
Begin add!
End add!
  Add : RxNewThreadScheduler-1 2
  End add : RxNewThreadScheduler-1 2
  Add : RxNewThreadScheduler-1 2
  End add : RxNewThreadScheduler-1 2

Suppose that, instead, we call the testing method like this:

schedule(Schedulers.newThread(), 2, false);

This call will spawn a new Thread instance for every sub-task, which will produce output similar to this:

RxNewThreadScheduler-1
Begin add!
End add!
Begin add!
  Add : RxNewThreadScheduler-2 1
  End add : RxNewThreadScheduler-2 2
End add!
Begin remove!
  Add : RxNewThreadScheduler-3 2
  End add : RxNewThreadScheduler-3 2
End remove!
Begin remove!
End remove!
Begin remove!
  Remove : RxNewThreadScheduler-5
  End remove : RxNewThreadScheduler-5
  Remove : RxNewThreadScheduler-4
  End remove : RxNewThreadScheduler-4
End remove!

By using the new thread Scheduler instance, you can execute background tasks.

Note

A very important requirement here is that its workers need to be unsubscribed to avoid leaking threads and OS resources. Note that it is expensive to create new threads each time, so in most cases, the computation and the IO Scheduler instances should be used.

The Schedulers.computation scheduler

The computation scheduler is very similar to the new thread one, but it takes into account the number of processors/cores of the machine it runs on, and uses a thread pool that reuses a limited number of threads. Every new Worker instance schedules sequential actions on one of these Thread instances. If the thread is idle at that moment, the actions are executed right away; if it is busy, they are enqueued to execute on it later.

If we use the same Worker instance, we'll just enqueue all the actions on its thread and the result will be the same as scheduling with one Worker instance, using the new thread Scheduler instance.

My machine has four cores. Say I call the testing method on it like this:

schedule(Schedulers.computation(), 5, false);

I'd get output similar to this:

RxComputationThreadPool-1
Begin add!
  Add : RxComputationThreadPool-2 1
  End add : RxComputationThreadPool-2 1
End add!
Begin add!
End add!
Begin add!
  Add : RxComputationThreadPool-3 3
  End add : RxComputationThreadPool-3 3
End add!
Begin add!
  Add : RxComputationThreadPool-4 4
End add!
Begin add!
  End add : RxComputationThreadPool-4 4
End add!
Begin remove!
End remove!
Begin remove!
  Add : RxComputationThreadPool-2 5
  End add : RxComputationThreadPool-2 5
End remove!
Begin remove!
End remove!
Begin remove!
End remove!
Begin remove!
End remove!
Begin remove!
End remove!
Begin remove!
End remove!
Begin remove!
End remove!
Begin remove!
  Remove : RxComputationThreadPool-3
End remove!
Begin remove!
  End remove : RxComputationThreadPool-3
  Remove : RxComputationThreadPool-2
End remove!
Begin remove!
  End remove : RxComputationThreadPool-2
End remove!
Begin remove!
  Remove : RxComputationThreadPool-2
End remove!
Begin remove!
End remove!
Begin remove!
End remove!
Begin remove!
End remove!
Begin remove!
  End remove : RxComputationThreadPool-2
End remove!
  Remove : RxComputationThreadPool-2
Begin remove!
  End remove : RxComputationThreadPool-2
End remove!
  Add : RxComputationThreadPool-1 5
  End add : RxComputationThreadPool-1 5
  Remove : RxComputationThreadPool-1
  End remove : RxComputationThreadPool-1

Everything is executed using only four Thread instances from a pool (note that there is a way to limit the number of Thread instances to be less than the available processor count).

The computation Scheduler instance is the right choice for doing background work such as computations or processing, hence its name. You can use it for everything that should run in the background and is not an IO-related or blocking operation.
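The idea of a pool bounded by the core count can be sketched with plain java.util.concurrent classes. This is only an analogy, not RxJava's actual implementation; the BoundedPoolDemo class and its run method are hypothetical names. However many tasks are submitted, the number of threads doing the work never exceeds the number of available processors.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class BoundedPoolDemo {
  // Runs `tasks` small jobs on a pool sized to the core count and
  // returns the names of the threads that did the work.
  static Set<String> run(int tasks) {
    int cores = Runtime.getRuntime().availableProcessors();
    ExecutorService pool = Executors.newFixedThreadPool(cores);
    Set<String> threadNames = ConcurrentHashMap.newKeySet();
    for (int i = 0; i < tasks; i++) {
      pool.execute(() -> threadNames.add(Thread.currentThread().getName()));
    }
    pool.shutdown(); // accept no new tasks, let the queued ones finish
    try {
      pool.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return threadNames;
  }
}
```

For example, BoundedPoolDemo.run(100) returns at most as many distinct thread names as the machine has cores, even though a hundred tasks were executed.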

The Schedulers.io scheduler

The Input-Output (IO) scheduler uses a ScheduledExecutorService instance to retrieve the threads from a thread pool for its workers. Unused threads are cached and reused on demand. It can spawn an arbitrary number of threads if it is necessary.

Again, if we run our example with only one Worker instance, the actions will be enqueued on its thread, and it will behave like the computation and new thread schedulers.

Say we run it with multiple Worker instances, like this:

schedule(Schedulers.io(), 2, false);

It would produce Thread instances on demand from its pool. The result looks like this:

RxCachedThreadScheduler-1
Begin add!
End add!
Begin add!
  Add : RxCachedThreadScheduler-2 2
  End add : RxCachedThreadScheduler-2 2
End add!
Begin remove!
  Add : RxCachedThreadScheduler-3 2
  End add : RxCachedThreadScheduler-3 2
End remove!
Begin remove!
  Remove : RxCachedThreadScheduler-4
  End remove : RxCachedThreadScheduler-4
End remove!
Begin remove!
End remove!
Begin remove!
  Remove : RxCachedThreadScheduler-6
  End remove : RxCachedThreadScheduler-6
End remove!

The IO scheduler is reserved for blocking IO operations. Use it for requests to servers, reading from files and sockets, and other similar blocking tasks. Note that its thread pool is unbounded; if its workers are not unsubscribed, the pool will grow indefinitely.
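Why an unbounded, caching pool suits blocking work can be illustrated with a plain-Java sketch. It uses Executors.newCachedThreadPool() as a stand-in and makes no claim about how the IO scheduler is built internally; the CachedPoolDemo class is a hypothetical name. Every task blocks until all the others have started, so the pool has to provide one thread per task.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CachedPoolDemo {
  // Submits tasks that all block at the same time and returns
  // how many distinct threads the pool had to create for them.
  static int threadsUsed(int blockingTasks) {
    ExecutorService pool = Executors.newCachedThreadPool();
    Set<String> threadNames = ConcurrentHashMap.newKeySet();
    CountDownLatch allStarted = new CountDownLatch(blockingTasks);
    for (int i = 0; i < blockingTasks; i++) {
      pool.execute(() -> {
        threadNames.add(Thread.currentThread().getName());
        allStarted.countDown();
        try {
          // Simulated blocking call: wait until every task is running.
          allStarted.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }
    pool.shutdown();
    try {
      pool.awaitTermination(15, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return threadNames.size();
  }
}
```

With a fixed pool smaller than the number of tasks, the same code would stall until the timeout, because the queued tasks could never start while the running ones are waiting for them. That is the kind of starvation we avoid by keeping blocking work off the computation scheduler.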

The Schedulers.from(Executor) method

This can be used to create a custom Scheduler instance. If none of the predefined schedulers work for you, use this method, passing it a java.util.concurrent.Executor instance, to implement the behavior you need.
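As a sketch of the kind of Executor one might hand to Schedulers.from(Executor), the following plain-Java code builds a small pool with recognizable, daemon threads. The class CustomExecutorDemo, the pool size of two, and the "MyScheduler" prefix are all assumptions made for illustration; the RxJava call itself appears only in a comment, so the block runs without the library.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class CustomExecutorDemo {
  // Builds a two-thread pool whose daemon threads carry a recognizable name.
  static ExecutorService namedPool(String prefix) {
    AtomicInteger counter = new AtomicInteger(1);
    ThreadFactory factory = runnable -> {
      Thread thread = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
      thread.setDaemon(true); // do not keep the JVM alive for idle pool threads
      return thread;
    };
    return Executors.newFixedThreadPool(2, factory);
  }

  // Runs one task on the pool and reports which thread executed it.
  static String runOnce() {
    ExecutorService pool = namedPool("MyScheduler");
    // With RxJava on the classpath, the pool could be wrapped like this:
    // Scheduler custom = Schedulers.from(pool);
    try {
      Callable<String> task = () -> Thread.currentThread().getName();
      Future<String> name = pool.submit(task);
      return name.get();
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
  }
}
```

Naming the threads this way makes them easy to spot in the output of the debug() helper, next to the RxComputationThreadPool and RxCachedThreadScheduler names seen earlier.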

Now that we've learned how and when the predefined Scheduler instances should be used, it is time to see how to integrate them with our Observable sequence.

Combining Observables and schedulers

In order to execute our observable logic on other threads, we can use the schedulers. There are two special operators, which receive Scheduler as a parameter and produce Observable instances, capable of performing operations on Thread instances different from the current one.

The Observable<T> subscribeOn(Scheduler) method

The subscribeOn() method creates an Observable instance, whose subscribe method causes the subscription to occur on a thread retrieved from the passed scheduler. For example, we have this:

Observable<Integer> range = Observable
  .range(20, 4)
  .doOnEach(debug("Source"));
range.subscribe();

System.out.println("Hey!");

We'll get this output:

main|Source: >20
main|Source: ->21
main|Source: -->22
main|Source: --->23
main|Source: ---->|
Hey!

This is normal; calling the subscribe method executes the observable logic on the main thread, and only after all this is done, we see 'Hey!'.

Let's modify the code to look like this:

CountDownLatch latch = new CountDownLatch(1);
Observable<Integer> range = Observable
  .range(20, 4)
  .doOnEach(debug("Source"))
  .subscribeOn(Schedulers.computation())
  .finallyDo(() -> latch.countDown());
range.subscribe();
System.out.println("Hey!");
latch.await();

The output changes to the following:

Hey!
RxComputationThreadPool-1|Source: >20
RxComputationThreadPool-1|Source: ->21
RxComputationThreadPool-1|Source: -->22
RxComputationThreadPool-1|Source: --->23
RxComputationThreadPool-1|Source: ---->|

This means that the caller thread doesn't block: it prints 'Hey!' first (or in between the numbers), while all the logic of the Observable instance is executed on a computation thread. This way, you can use any scheduler you like to decide where to execute the work.

Here we need to mention something important about the subscribeOn() method. If you call it multiple times throughout the chain like this:

CountDownLatch latch = new CountDownLatch(1);
Observable<Integer> range = Observable
  .range(20, 3)
  .doOnEach(debug("Source"))
  .subscribeOn(Schedulers.computation());
Observable<Character> chars = range
  .map(n -> n + 48)
  .map(n -> Character.toChars(n))
  .subscribeOn(Schedulers.io())
  .map(c -> c[0])
  .subscribeOn(Schedulers.newThread())
  .doOnEach(debug("Chars ", "    "))
  .finallyDo(() -> latch.countDown());
chars.subscribe();
latch.await();

Only the call that is closest to the beginning of the chain matters. Here we subscribe on the computation scheduler first, then on the IO scheduler, and then on the new thread scheduler, but our code will be executed on the computation scheduler because this is specified first in the chain.

RxComputationThreadPool-1|Source: >20
RxComputationThreadPool-1|Chars :     >D
RxComputationThreadPool-1|Source: ->21
RxComputationThreadPool-1|Chars :     ->E
RxComputationThreadPool-1|Source: -->22
RxComputationThreadPool-1|Chars :     -->F
RxComputationThreadPool-1|Source: --->|
RxComputationThreadPool-1|Chars :     --->|

In conclusion, don't specify a scheduler in methods producing Observable instances; leave this choice to the callers of your methods. Alternatively, make your methods receive a Scheduler instance as a parameter; like the Observable.interval method, for example.

Note

The subscribeOn() operator is usable with Observable instances that block the caller thread when one subscribes to them. Using the subscribeOn() method with such sources lets the caller thread progress concurrently with the Observable instance logic.

And what about the other operator that helps us do work on other threads?

The Observable<T> observeOn(Scheduler) operator

The observeOn() operator is similar to the subscribeOn() operator, but instead of executing the entire chain on the passed Scheduler instances, it executes the part of the chain from its place within it, onwards. The easiest way to understand this is through an example. Let's use the previous one, after slightly modifying it:

CountDownLatch latch = new CountDownLatch(1);
Observable<Integer> range = Observable
  .range(20, 3)
  .doOnEach(debug("Source"));
Observable<Character> chars = range
  .map(n -> n + 48)
  .doOnEach(debug("+48 ", "    "))
  .map(n -> Character.toChars(n))
  .map(c -> c[0])
  .observeOn(Schedulers.computation())
  .doOnEach(debug("Chars ", "    "))
  .finallyDo(() -> latch.countDown());
chars.subscribe();
System.out.println("Hey!");
latch.await();

Here, we tell the Observable chain to execute on the main thread after subscribing until it reaches the observeOn() operator. At this point, it is moved onto the computation scheduler. The output of this is something similar to the following:

main|Source: >20
main|+48 :     >68
main|Source: ->21
main|+48 :     ->69
main|Source: -->22
main|+48 :     -->70
RxComputationThreadPool-3|Chars :     >D
RxComputationThreadPool-3|Chars :     ->E
RxComputationThreadPool-3|Chars :     -->F
main|Source: --->|
main|+48 :     --->|
Hey!
RxComputationThreadPool-3|Chars :     --->|

As we can see, the part of the chain before the call to the operator blocks the main thread, preventing 'Hey!' from being printed. Once all the notifications have passed through the observeOn() operator, 'Hey!' is printed and the execution continues on the computation thread.

If we move the observeOn() operator up the Observable chain, a greater part of the logic will be executed using the computation scheduler.

Of course, the observeOn() operator can be used together with the subscribeOn() operator. That way, part of the chain could be executed on one thread and the rest of it on another (in most cases). This is especially useful if you code a client-side application because, normally, these applications run on a single event-queueing thread. You can read from files/servers using the IO scheduler with the subscribeOn()/observeOn() operators and then observe the result on the event thread.

Tip

There is an Android module for RxJava that is not covered by this book, but it is getting quite a lot of attention. You can read more about it here: https://github.com/ReactiveX/RxJava/wiki/The-RxJava-Android-Module.

If you are an Android developer, don't miss it!

There are similar modules for Swing and JavaFX as well.

Let's look at an example using both the subscribeOn() and observeOn() operators:

CountDownLatch latch = new CountDownLatch(1);
Observable<Integer> range = Observable
  .range(20, 3)
  .subscribeOn(Schedulers.newThread())
  .doOnEach(debug("Source"));
Observable<Character> chars = range
  .observeOn(Schedulers.io())
  .map(n -> n + 48)
  .doOnEach(debug("+48 ", "    "))
  .observeOn(Schedulers.computation())
  .map(n -> Character.toChars(n))
  .map(c -> c[0])
  .doOnEach(debug("Chars ", "    "))
  .finallyDo(() -> latch.countDown());
chars.subscribe();
latch.await();

Here, we use one call to the subscribeOn() operator at the beginning of the chain (actually, it doesn't matter where we put it, because it is the sole call to that operator) and two calls to the observeOn() operator. The result of executing this code looks like this:

RxNewThreadScheduler-1|Source: >20
RxNewThreadScheduler-1|Source: ->21
RxNewThreadScheduler-1|Source: -->22
RxNewThreadScheduler-1|Source: --->|
RxCachedThreadScheduler-1|+48 :     >68
RxCachedThreadScheduler-1|+48 :     ->69
RxCachedThreadScheduler-1|+48 :     -->70
RxComputationThreadPool-3|Chars :     >D
RxCachedThreadScheduler-1|+48 :     --->|
RxComputationThreadPool-3|Chars :     ->E
RxComputationThreadPool-3|Chars :     -->F
RxComputationThreadPool-3|Chars :     --->|

We can see that the chain passes through three threads. If we do this with more elements, some of the code will be executed seemingly in parallel. The conclusion is that, using the observeOn() operator, we can change the threads multiple times; using the subscribeOn() operator, we can do this one time—on subscription.

Note

The source for the preceding examples with the observeOn()/subscribeOn() operators can be found at https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter06/SubscribeOnAndObserveOn.java.

With these two operators, we can have Observable instances and multi-threading working together. But being concurrent doesn't really mean that we can do things in parallel. It means that our program has multiple threads, making some progress independently. True parallelism is when our program uses the CPU (cores) of the machine it runs on at their maximum and its threads run literally at the same time.

All of our examples up until now just moved the chain logic onto other threads. Some of them did perform part of their operations in parallel, but a true parallelism example looks different.

Parallelism

We can achieve parallelism only by using the operators that we already know. Think about the flatMap() operator; it creates an Observable instance for each item emitted by the source. If we call the subscribeOn() operator with a Scheduler instance on these Observable instances, each one of them will be scheduled on a new Worker instance, and they'll work in parallel (if the host machine allows that). Here is an example of this:

Observable<Integer> range = Observable
  .range(20, 5)
  .flatMap(n -> Observable
    .range(n, 3)
    .subscribeOn(Schedulers.computation())
    .doOnEach(debug("Source"))
  );
range.subscribe();

The output of this code looks like this:

RxComputationThreadPool-3|Source: >23
RxComputationThreadPool-4|Source: >20
RxComputationThreadPool-2|Source: >22
RxComputationThreadPool-3|Source: ->24
RxComputationThreadPool-1|Source: >21
RxComputationThreadPool-2|Source: ->23
RxComputationThreadPool-3|Source: -->25
RxComputationThreadPool-3|Source: --->|
RxComputationThreadPool-4|Source: ->21
RxComputationThreadPool-4|Source: -->22
RxComputationThreadPool-4|Source: --->|
RxComputationThreadPool-2|Source: -->24
RxComputationThreadPool-2|Source: --->|
RxComputationThreadPool-1|Source: ->22
RxComputationThreadPool-1|Source: -->23
RxComputationThreadPool-1|Source: --->|
RxComputationThreadPool-4|Source: >24
RxComputationThreadPool-4|Source: ->25
RxComputationThreadPool-4|Source: -->26
RxComputationThreadPool-4|Source: --->|

We can see by the names of the threads that the Observable instances defined through the flatMap() operator are executed in parallel. And that's really the case: the four threads are using the four cores of my processor.

I'll provide another example, this time for parallel requests to a remote server. We'll be using the requestJson() method we defined in the previous chapter. The idea is this:

  1. We'll retrieve information about the followers of a GitHub user (for this example we'll be using my account).
  2. For every follower, we'll get the URL to their profile.
  3. We will request the profiles of the followers in parallel.
  4. We'll print the number of the followers and the number of their followers.

Let's see how this is implemented:

Observable<Map> response = CreateObservable.requestJson(
  client,
  "https://api.github.com/users/meddle0x53/followers"
); // (1)
response
  .map(followerJson -> followerJson.get("url")) // (2)
  .cast(String.class)
  .flatMap(profileUrl -> CreateObservable
    .requestJson(client, profileUrl)
    .subscribeOn(Schedulers.io()) // (3)
    .filter(res -> res.containsKey("followers"))
    .map(json ->  // (4)
      json.get("login") +  " : " +
      json.get("followers"))
  )
  .doOnNext(follower -> System.out.println(follower)) // (5)
  .count() // (6)
  .subscribe(sum -> System.out.println("meddle0x53 : " + sum));

Here's what's happening in the preceding code:

  1. First we perform a request to the followers data of my user.
  2. The request returns the followers as JSON strings, which are converted into Map objects (see the implementation of the requestJson method). From each of these Map objects, the URL to the profile of the follower it represents is read.
  3. A new request is executed for each of these URLs. The requests run in parallel on IO threads, because we use the same technique as in the previous example. It is worth mentioning that the flatMap() operator has an overload that takes a maxConcurrent integer parameter. We can limit the concurrent requests using it.
  4. After the user data for a follower is fetched, a string with the follower's login and their number of followers is generated.
  5. This information is printed as a side effect.
  6. We count my followers using the count() operator (which behaves like the scan(0, (sum, element) -> sum + 1).last() call). Then we print the count. The order of the printed data is not guaranteed to be the same as the order in which the followers were traversed.
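The effect of capping the number of concurrent requests, as the maxConcurrent parameter mentioned in step 3 does, can be sketched in plain Java. This is an analogy built on CompletableFuture and a fixed-size pool, not the flatMap() implementation; the BoundedFanOut class is a hypothetical name, and the sleep merely stands in for a blocking HTTP request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedFanOut {
  // Runs `requests` simulated blocking calls with at most `maxConcurrent`
  // in flight and returns the highest concurrency that was observed.
  static int peakConcurrency(int requests, int maxConcurrent) {
    ExecutorService pool = Executors.newFixedThreadPool(maxConcurrent);
    AtomicInteger inFlight = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    List<CompletableFuture<Void>> pending = new ArrayList<>();
    for (int i = 0; i < requests; i++) {
      pending.add(CompletableFuture.runAsync(() -> {
        int now = inFlight.incrementAndGet();
        peak.accumulateAndGet(now, Math::max); // remember the highest value seen
        try {
          Thread.sleep(20); // stand-in for a blocking HTTP request
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          inFlight.decrementAndGet();
        }
      }, pool));
    }
    // Wait for every simulated request, then release the pool.
    CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
    pool.shutdown();
    return peak.get();
  }
}
```

A call such as BoundedFanOut.peakConcurrency(12, 3) never reports more than three requests in flight, which is the kind of guarantee that keeps a remote server from being flooded.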

That's all about concurrency and parallelism. Everything is pretty simple, but powerful. There are a few rules (such as using the Schedulers.io instance for blocking operations, using the computation one for background tasks, and so on) that you must follow to ensure nothing goes wrong, even with multi-threaded observable chains of actions.

It is very possible, using this parallelism technique, to flood the Observable chain with data, and that's a problem we'll have to deal with. Through the rest of this chapter, we'll learn how to handle too many elements coming from an upstream Observable chain.
