Modern processors have multiple cores, which lets many time-consuming operations run simultaneously and finish sooner. The Java concurrency API (which includes threads and much more) makes it possible to do just that.
RxJava's Observable
chains seem a good match for threads. It would be great if we could subscribe to our source, do all the transforming, combining, and filtering in the background and, when everything is done, have the result passed to the main thread. Yes, this sounds wonderful, but RxJava is single-threaded by default. This means that, in most cases, when the subscribe
method is called on an Observable
instance, the current thread blocks until everything is emitted. (This is not true for the Observable
instances created by the interval
or timer
factory methods, for example.) This is a good thing because working with threads is not so easy. They are powerful, but they need to be synchronized with each other; for example, when one depends on the result of another.
One of the hardest things to manage in a multi-threaded environment is the shared data between the threads. One thread could read from a data source while another is modifying it, which leads to different versions of the same data being used by the different threads. If an Observable
chain is constructed the right way, there is no shared state. This means that synchronization is not so complex.
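To make the "no shared state" idea concrete, here is a minimal plain-JDK sketch (it does not use RxJava, and the class and method names are hypothetical). Each task works only on its own local variables and hands back a result, so no locks are needed; the partial results are combined on the calling thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class NoSharedStateSketch {
    // Sums the integers in [from, to] by splitting the range into 'parts' chunks.
    // Every task touches only its own local 'sum', so nothing is shared.
    static long sumInParallel(int from, int to, int parts) {
        ExecutorService pool = Executors.newFixedThreadPool(parts);
        try {
            int chunk = (to - from + parts) / parts; // ceiling of count / parts
            List<Future<Long>> partials = new ArrayList<>();
            for (int p = 0; p < parts; p++) {
                final int lo = from + p * chunk;
                final int hi = Math.min(to, lo + chunk - 1);
                Callable<Long> task = () -> {
                    long sum = 0;
                    for (int i = lo; i <= hi; i++) {
                        sum += i;
                    }
                    return sum;
                };
                partials.add(pool.submit(task));
            }
            long total = 0;
            for (Future<Long> partial : partials) {
                total += partial.get(); // combine results on the caller thread
            }
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumInParallel(1, 100, 4));
    }
}
```

An Observable chain built from pure functions follows the same principle: each operator receives a value and emits a new one instead of mutating data that other threads can see.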
In this chapter, we will talk about executing things in parallel and look at what concurrency means. Additionally, we'll learn some techniques for handling the situation when too many items are emitted by our Observable
instances (a situation which is not so rare in the multi-threaded environment). The topics covered in this chapter are as follows:
Using Scheduler instances to achieve concurrency
Handling too many items emitted by Observable instances
Schedulers are RxJava's way of achieving concurrency. They are in charge of creating and managing the threads for us (internally relying on Java's thread pool facilities). We won't be dealing with Java's concurrency API and its quirks and complexities. We've been using the schedulers all along, implicitly with timers and intervals, but the time has come to master them.
Let's recall the Observable.interval
factory method, which we introduced back in Chapter 3, Creating and Connecting Observables, Observers, and Subjects. As we saw before, RxJava is single-threaded by default, so in most cases, calling the subscribe
method on the Observable
instance will block the current thread. But that is not the case with the interval Observable
instances. If we look at the JavaDoc of the Observable<Long> interval(long interval, TimeUnit unit)
method, we'll see that it says that the Observable
instance created by it operates on something called 'the computation Scheduler'.
In order to inspect the behavior of the interval
method (as well as other things in this chapter) we will need a powerful debugging utility. That's why the first thing we'll be doing in this chapter is implementing it.
In the previous chapter, we introduced the doOnNext()
operator, which could be used for logging the emitted items directly from within the Observable
chain. We mentioned that there are doOnError()
and doOnCompleted()
operators too. But there is one that combines all three of them—the doOnEach()
operator. We can log everything from it because it receives all the notifications emitted, regardless of their type. We can put it halfway through the chain of operators and use it to log, say, the state there. It takes a Notification -> void
function.
Here is the source of a higher-order debug function. It returns a lambda that logs the emissions of an Observable instance, labeled with the passed description:
<T> Action1<Notification<? super T>> debug(String description, String offset) {
  // Grows by one dash per notification to visualize the passage of time
  AtomicReference<String> nextOffset = new AtomicReference<String>(">");
  return (Notification<? super T> notification) -> {
    switch (notification.getKind()) {
    case OnNext:
      System.out.println(
        Thread.currentThread().getName() + "|" + description + ": " +
        offset + nextOffset.get() + notification.getValue()
      );
      break;
    case OnError:
      System.err.println(
        Thread.currentThread().getName() + "|" + description + ": " +
        offset + nextOffset.get() + " X " + notification.getThrowable()
      );
      break;
    case OnCompleted:
      System.out.println(
        Thread.currentThread().getName() + "|" + description + ": " +
        offset + nextOffset.get() + "|"
      );
      break;
    default:
      break;
    }
    // Prepend a dash so that the next notification is shifted to the right
    nextOffset.getAndUpdate(p -> "-" + p);
  };
}
Depending on the passed description and offset, the returned function logs each notification. The important thing, however, is that it logs the current active thread's name before everything else. <value> marks the OnNext notifications, X the OnError notifications, and | the OnCompleted notifications; the nextOffset variable is used to show the values in time.
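The offset bookkeeping can be tried in isolation. The following is a plain-JDK sketch (no RxJava types; the class name and the formatter() method are hypothetical) that reproduces only the line-building part of the helper, returning each line instead of printing it:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

class OffsetLogSketch {
    // Returns a function that formats one log line per call. The arrow in front
    // of the value grows by one dash on every call, like nextOffset in debug().
    static Function<Object, String> formatter(String description, String offset) {
        AtomicReference<String> nextOffset = new AtomicReference<>(">");
        return value -> {
            String line = Thread.currentThread().getName() + "|" + description + ": "
                + offset + nextOffset.get() + value;
            nextOffset.getAndUpdate(p -> "-" + p); // ">" becomes "->", then "-->"
            return line;
        };
    }

    public static void main(String[] args) {
        Function<Object, String> format = formatter("Test", "");
        for (int i = 5; i < 8; i++) {
            System.out.println(format.apply(i));
        }
    }
}
```

Because the thread name is read at the moment the function is called, the same formatter reports whichever thread happens to deliver each value, which is exactly what makes the helper useful for inspecting schedulers.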
Here is an example of using this new method:
Observable
.range(5, 5)
.doOnEach(debug("Test", ""))
.subscribe();
This example will generate five sequential numbers, beginning with the number five. We pass a call to our debug(String, String)
method to the doOnEach()
operator to log everything after the call of the range()
method. With a subscribe call without parameters, this little chain will be triggered. The output is as follows:
main|Test: >5
main|Test: ->6
main|Test: -->7
main|Test: --->8
main|Test: ---->9
main|Test: ----->|
The first thing logged is the name of the current thread (the main one), then we have the description of the Observable
instance passed to the debug()
method, and after that, a colon and dashes forming arrows, representing the time. Finally we have the symbol of the type of the notification—the value itself for values and |
for completed.
Let's define one overload to the debug()
helper method so that we don't need to pass a second parameter to it with an additional offset, if it is not needed:
<T> Action1<Notification<? super T>> debug(String description) {
return debug(description, "");
}
The code for the preceding methods can be viewed/downloaded at: https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/common/Helpers.java.
Now we are ready to debug what's happening with the Observable
instances, created by the interval method!
Let's examine the following example:
Observable
  .interval(500L, TimeUnit.MILLISECONDS)
  .take(5)
  .doOnEach(debug("Default interval"))
  .subscribe();
This creates an interval Observable instance that emits every half second. We use the take() method to get only the first five notifications and then complete. Our debug() helper method logs the values emitted by the Observable instance created by the interval method, and the call to subscribe() triggers the logic. The output should look like this:
RxComputationThreadPool-1|Default interval: >0
RxComputationThreadPool-1|Default interval: ->1
RxComputationThreadPool-1|Default interval: -->2
RxComputationThreadPool-1|Default interval: --->3
RxComputationThreadPool-1|Default interval: ---->4
Everything should be familiar here, except the thread that the Observable
instance executes on! This thread is not the main one. It seems it is created by a RxJava-managed pool of reusable Thread
instances, judging by its name (RxComputationThreadPool-1
).
If you recall, the Observable.interval
factory method had the following overload:
Observable<Long> interval(long, TimeUnit, Scheduler)
This means that we can specify a scheduler on which it will operate. It was mentioned previously, that the overload with only two parameters operates on the computation scheduler. So, now let's try passing another scheduler and see what's going to happen:
Observable
  .interval(500L, TimeUnit.MILLISECONDS, Schedulers.immediate())
  .take(5)
  .doOnEach(debug("Imediate interval"))
  .subscribe();
This is the same as before, but with one little difference. We pass a scheduler called immediate. The idea is to execute the work immediately on the currently running thread. The result is as follows:
main|Imediate interval: >0
main|Imediate interval: ->1
main|Imediate interval: -->2
main|Imediate interval: --->3
main|Imediate interval: ---->4
By specifying this scheduler, we made the interval Observable
instance run on the current, main thread.
The source code for the preceding example can be found at https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter06/IntervalAndSchedulers.java.
With the help of the schedulers, we can instruct our operators to run on a particular thread or to use a particular pool of threads.
Everything we just covered leads us to the conclusion that the schedulers spawn new threads, or reuse already spawned ones on which the operations, part of the Observable
instance chain, execute. Thus, we can achieve concurrency (operators making progress at the same time) by using only them.
In order to have multi-threaded logic, we'll have to learn just these two things:
The types of schedulers we can choose from
How to use these schedulers with an Observable chain of operations
There are several types of schedulers dedicated to certain kinds of actions. In order to learn more about them, let's take a look at the Scheduler
class.
It turns out that the class is quite simple. It has only two methods, as follows:
long now()
abstract Worker createWorker()
The first one returns the current time in milliseconds, and the second creates a Worker
instance. These Worker
instances are used for executing actions on a single thread or event loop (depending on the implementation). Scheduling actions for execution is done using the Worker's schedule*
methods. The Worker
class implements the Subscription
interface, so it has an unsubscribe()
method. Unsubscribing the Worker
unschedules all outstanding work and allows a resource cleanup.
We can use the workers to perform scheduling outside the Observable
context. For every Scheduler
type, we can do the following:
scheduler.createWorker().schedule(Action0);
This will schedule the passed action and execute it. In most cases, this method shouldn't be used directly for scheduling work; we just pick the right scheduler and let the operators schedule actions on it instead. However, we can use the method to inspect the various types of schedulers available and understand what they do.
Let's define a testing method:
void schedule(Scheduler scheduler, int numberOfSubTasks, boolean onTheSameWorker) {
  List<Integer> list = new ArrayList<>(0);
  AtomicInteger current = new AtomicInteger(0);
  Random random = new Random();
  Worker worker = scheduler.createWorker();
  // Sub-task: adds a random number to the list
  Action0 addWork = () -> {
    synchronized (current) {
      System.out.println(" Add : " + Thread.currentThread().getName() + " " + current.get());
      list.add(random.nextInt(current.get()));
      System.out.println(" End add : " + Thread.currentThread().getName() + " " + current.get());
    }
  };
  // Sub-task: removes the first number from the list, if there is one
  Action0 removeWork = () -> {
    synchronized (current) {
      if (!list.isEmpty()) {
        System.out.println(" Remove : " + Thread.currentThread().getName());
        list.remove(0);
        System.out.println(" End remove : " + Thread.currentThread().getName());
      }
    }
  };
  // Main task: schedules the add sub-tasks, then remove sub-tasks until the list is empty
  Action0 work = () -> {
    System.out.println(Thread.currentThread().getName());
    for (int i = 1; i <= numberOfSubTasks; i++) {
      current.set(i);
      System.out.println("Begin add!");
      if (onTheSameWorker) {
        worker.schedule(addWork);
      } else {
        scheduler.createWorker().schedule(addWork);
      }
      System.out.println("End add!");
    }
    while (!list.isEmpty()) {
      System.out.println("Begin remove!");
      if (onTheSameWorker) {
        worker.schedule(removeWork);
      } else {
        scheduler.createWorker().schedule(removeWork);
      }
      System.out.println("End remove!");
    }
  };
  worker.schedule(work);
}
The method uses the passed Scheduler
instance to do some work. There is an option to specify whether it should use the same Worker
instance for every task, or spawn a new one for every sub-task. Basically, the dummy work consists of filling up a list with random numbers and then removing these numbers one by one. Every add operation and remove operation are scheduled via the worker created by the passed Scheduler
instance as a sub-task. Before and after every sub-task, the current thread and some additional information are logged.
Let's turn to the predefined Scheduler instances. They can be retrieved via a set of static methods contained in the Schedulers class. We will be using the testing method defined previously to inspect their behavior in order to learn their differences and usefulness.
The Schedulers.immediate
scheduler executes work here and now. When an action is passed to its worker's schedule(Action0)
method, it is just called. Let's suppose we run our test method with it, like this:
schedule(Schedulers.immediate(), 2, false); schedule(Schedulers.immediate(), 2, true);
In both the cases, the result will look like this:
main
Begin add!
 Add : main 1
 End add : main 1
End add!
Begin add!
 Add : main 2
 End add : main 2
End add!
Begin remove!
 Remove : main
 End remove : main
End remove!
Begin remove!
 Remove : main
 End remove : main
End remove!
In other words, everything is executed on the caller thread (the main one), and nothing runs in parallel.
This scheduler can be used to execute methods, such as interval()
and timer()
, in the foreground.
The scheduler, retrieved by the Schedulers.trampoline
method enqueues sub-tasks on the current thread
. The enqueued work is executed after the work currently in progress completes. Say we were to run this:
schedule(Schedulers.trampoline(), 2, false); schedule(Schedulers.trampoline(), 2, true);
In the first case, the result will be the same as with the immediate scheduler, because all the tasks are executed in their own Worker
instances and, therefore, there is only one task to be enqueued for execution in every worker. But when we use the same Worker
instance for scheduling every sub-task, we get this:
main
Begin add!
End add!
Begin add!
End add!
 Add : main 2
 End add : main 2
 Add : main 2
 End add : main 2
In other words, it will first execute the entire main action and after that, the sub-tasks; thus, the List
instance will be filled in (the sub-tasks were enqueued) but never emptied. That's because, while executing the main task, the List
instance was still empty and the while
loop was not triggered.
The trampoline scheduler is useful for avoiding a StackOverflowError
exception while running many tasks recursively. For example, let's assume a task completes and then calls itself to perform some new work. In the case of a single-threaded environment, this would lead to stack overflow due to the recursion; however, if we use the trampoline scheduler, it will serialize all scheduled activities and the stack depth will remain normal. However, the trampoline scheduler is usually slower than the immediate one. So, using the correct one depends on the use case.
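The queueing behavior described above can be sketched without RxJava. The class below is a hypothetical, single-threaded illustration of the trampolining idea and is not RxJava's implementation: work scheduled while another task is running is enqueued and executed only after the running task returns, so a task that keeps rescheduling itself never deepens the call stack.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class TrampolineSketch {
    private final Deque<Runnable> queue = new ArrayDeque<>();
    private boolean running = false;

    // Enqueues the task. If nothing is running, drains the queue right away;
    // otherwise returns immediately and lets the active drain loop pick it up.
    void schedule(Runnable task) {
        queue.addLast(task);
        if (running) {
            return;
        }
        running = true;
        try {
            Runnable next;
            while ((next = queue.pollFirst()) != null) {
                next.run();
            }
        } finally {
            running = false;
        }
    }

    // The main task finishes before its sub-tasks start, as in the output above.
    static List<String> demo() {
        TrampolineSketch trampoline = new TrampolineSketch();
        List<String> log = new ArrayList<>();
        trampoline.schedule(() -> {
            log.add("Begin main");
            trampoline.schedule(() -> log.add("sub-task 1"));
            trampoline.schedule(() -> log.add("sub-task 2"));
            log.add("End main");
        });
        return log;
    }

    // A task that reschedules itself n times; the stack depth stays constant.
    static int selfScheduling(int n) {
        TrampolineSketch trampoline = new TrampolineSketch();
        int[] runs = {0};
        Runnable[] self = new Runnable[1];
        self[0] = () -> {
            runs[0]++;
            if (runs[0] < n) {
                trampoline.schedule(self[0]);
            }
        };
        trampoline.schedule(self[0]);
        return runs[0];
    }

    public static void main(String[] args) {
        System.out.println(demo());
        System.out.println(selfScheduling(1_000_000));
    }
}
```

Had selfScheduling() called self[0].run() directly instead of going through schedule(), a million nested calls would be very likely to exhaust the stack; with the queue, each run returns before the next one begins.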
The new thread scheduler, retrieved by the Schedulers.newThread method, creates a new Thread
instance (a single-threaded ScheduledThreadPoolExecutor
instance to be precise) for every new Worker
instance. Additionally, each worker enqueues the actions it receives through its schedule()
method, much like the trampoline scheduler does. Let's look at the following code:
schedule(Schedulers.newThread(), 2, true);
It will have the same behavior as the trampoline but it will run in a new thread:
RxNewThreadScheduler-1
Begin add!
End add!
Begin add!
End add!
 Add : RxNewThreadScheduler-1 2
 End add : RxNewThreadScheduler-1 2
 Add : RxNewThreadScheduler-1 2
 End add : RxNewThreadScheduler-1 2
Instead, let's call the testing method like this:
schedule(Schedulers.newThread(), 2, false);
This will spawn a new Thread
instance for every sub-task, which will produce output similar to this:
RxNewThreadScheduler-1
Begin add!
End add!
Begin add!
 Add : RxNewThreadScheduler-2 1
 End add : RxNewThreadScheduler-2 2
End add!
Begin remove!
 Add : RxNewThreadScheduler-3 2
 End add : RxNewThreadScheduler-3 2
End remove!
Begin remove!
End remove!
Begin remove!
 Remove : RxNewThreadScheduler-5
 End remove : RxNewThreadScheduler-5
 Remove : RxNewThreadScheduler-4
 End remove : RxNewThreadScheduler-4
End remove!
By using the new thread Scheduler
instance, you can execute background tasks.
The computation scheduler is very similar to the new thread one, but it takes into account the number of processors/cores that the machine on which it runs has, and uses a thread pool that can reuse a limited number of threads. Every new Worker
instance schedules sequential actions on one of these Thread
instances. If the thread is not in use at the moment, the actions are executed right away; if it is busy, they are enqueued to execute on it later.
If we use the same Worker
instance, we'll just enqueue all the actions on its thread and the result will be the same as scheduling with one Worker
instance, using the new thread Scheduler
instance.
My machine has four cores. Say I call the testing method on it like this:
schedule(Schedulers.computation(), 5, false);
I'd get output similar to this:
RxComputationThreadPool-1
Begin add!
 Add : RxComputationThreadPool-2 1
 End add : RxComputationThreadPool-2 1
End add!
Begin add!
End add!
Begin add!
 Add : RxComputationThreadPool-3 3
 End add : RxComputationThreadPool-3 3
End add!
Begin add!
 Add : RxComputationThreadPool-4 4
End add!
Begin add!
 End add : RxComputationThreadPool-4 4
End add!
Begin remove!
End remove!
Begin remove!
 Add : RxComputationThreadPool-2 5
 End add : RxComputationThreadPool-2 5
End remove!
Begin remove!
End remove!
Begin remove!
End remove!
Begin remove!
End remove!
Begin remove!
End remove!
Begin remove!
End remove!
Begin remove!
End remove!
Begin remove!
 Remove : RxComputationThreadPool-3
End remove!
Begin remove!
 End remove : RxComputationThreadPool-3
 Remove : RxComputationThreadPool-2
End remove!
Begin remove!
 End remove : RxComputationThreadPool-2
End remove!
Begin remove!
 Remove : RxComputationThreadPool-2
End remove!
Begin remove!
End remove!
Begin remove!
End remove!
Begin remove!
End remove!
Begin remove!
 End remove : RxComputationThreadPool-2
End remove!
 Remove : RxComputationThreadPool-2
Begin remove!
 End remove : RxComputationThreadPool-2
End remove!
 Add : RxComputationThreadPool-1 5
 End add : RxComputationThreadPool-1 5
 Remove : RxComputationThreadPool-1
 End remove : RxComputationThreadPool-1
Everything is executed using only four Thread
instances from a pool (note that there is a way to limit the number of Thread
instances to be less than the available processor count).
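The effect of a pool bounded by the processor count can be observed with the JDK alone. The sketch below is only an analogy, under the assumption that a fixed thread pool is a fair stand-in; RxJava's computation scheduler is implemented differently. It runs many small tasks and records which threads executed them. The class and method names are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class BoundedPoolSketch {
    // Runs 'tasks' tiny jobs on a pool sized to the number of cores and
    // returns the names of the threads that actually executed them.
    static Set<String> threadsUsed(int tasks) {
        int cores = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(cores);
        Set<String> names = ConcurrentHashMap.newKeySet();
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> names.add(Thread.currentThread().getName()));
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow(); // no-op if the pool has already terminated
        }
        return names;
    }

    public static void main(String[] args) {
        Set<String> names = threadsUsed(100);
        System.out.println(names.size() + " thread(s) used for 100 tasks");
    }
}
```

However many tasks are submitted, the number of distinct thread names never exceeds the core count; the extra tasks simply wait their turn.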
The computation Scheduler
instance is the right choice for doing background work such as computations or processing, hence its name. You can use it for everything that should run in the background and is not an IO-related or blocking operation.
The Input-Output (IO) scheduler uses a ScheduledExecutorService
instance to retrieve the threads from a thread pool for its workers. Unused threads are cached and reused on demand. It can spawn an arbitrary number of threads if it is necessary.
Again, if we run our example with only one Worker
instance, the actions will be enqueued on its thread, and it will behave like the computation and new thread schedulers.
Say we run it with multiple Worker
instances, like this:
schedule(Schedulers.io(), 2, false);
It would produce Thread
instances on demand from its pool. The result looks like this:
RxCachedThreadScheduler-1
Begin add!
End add!
Begin add!
 Add : RxCachedThreadScheduler-2 2
 End add : RxCachedThreadScheduler-2 2
End add!
Begin remove!
 Add : RxCachedThreadScheduler-3 2
 End add : RxCachedThreadScheduler-3 2
End remove!
Begin remove!
 Remove : RxCachedThreadScheduler-4
 End remove : RxCachedThreadScheduler-4
End remove!
Begin remove!
End remove!
Begin remove!
 Remove : RxCachedThreadScheduler-6
 End remove : RxCachedThreadScheduler-6
End remove!
The IO scheduler is reserved for blocking IO operations. Use it for requests to servers, reading from files and sockets, and other similar blocking tasks. Note that its thread pool is unbounded; if its workers are not unsubscribed, the pool will grow indefinitely.
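Why an unbounded, cached pool suits blocking work can be seen in a plain-JDK sketch. It assumes that Executors.newCachedThreadPool() is a reasonable analogy for the IO scheduler's pool, and the names are hypothetical. Every task blocks until it is released, so the pool has to create one thread per task:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CachedPoolSketch {
    // Starts 'blockingTasks' jobs that all block at the same time and returns
    // how many distinct threads the cached pool needed to run them.
    static int threadsNeeded(int blockingTasks) {
        ExecutorService pool = Executors.newCachedThreadPool();
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch allStarted = new CountDownLatch(blockingTasks);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < blockingTasks; i++) {
                pool.execute(() -> {
                    names.add(Thread.currentThread().getName());
                    allStarted.countDown();
                    try {
                        release.await(); // stands in for a blocking read or request
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            allStarted.await();  // every task is now blocked on its own thread
            release.countDown(); // let them all finish
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow(); // no-op if the pool has already terminated
        }
        return names.size();
    }

    public static void main(String[] args) {
        System.out.println(threadsNeeded(8) + " threads for 8 blocking tasks");
    }
}
```

With a pool bounded by the core count, the same eight blocked tasks would occupy every thread and starve any real computation, which is the reason for keeping blocking work away from the computation scheduler.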
The source code for all the preceding code is located at https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter06/SchedulersTypes.java.
The Schedulers.from(Executor) method can be used to create a custom Scheduler instance. If none of the predefined schedulers work for you, use this method, passing it a java.util.concurrent.Executor instance, to implement the behavior you need.
Now that we've learned about how and when the predefined Scheduler
instances should be used, it is time to see how to integrate them with our Observable
sequence.
In order to execute our observable logic on other threads, we can use the schedulers. There are two special operators, which receive Scheduler
as a parameter and produce Observable
instances, capable of performing operations on Thread
instances different from the current one.
The subscribeOn()
method creates an Observable
instance, whose subscribe
method causes the subscription to occur on a thread retrieved from the passed scheduler. For example, we have this:
Observable<Integer> range = Observable
  .range(20, 4)
  .doOnEach(debug("Source"));
range.subscribe();
System.out.println("Hey!");
We'll get this output:
main|Source: >20
main|Source: ->21
main|Source: -->22
main|Source: --->23
main|Source: -------->|
Hey!
This is normal; calling the subscribe
method executes the observable logic on the main thread, and only after all this is done, we see 'Hey!'
.
Let's modify the code to look like this:
CountDownLatch latch = new CountDownLatch(1);
Observable<Integer> range = Observable
.range(20, 4)
.doOnEach(debug("Source"))
.subscribeOn(Schedulers.computation())
.finallyDo(() -> latch.countDown());
range.subscribe();
System.out.println("Hey!");
latch.await();
The output changes to the following:
Hey!
RxComputationThreadPool-1|Source: >20
RxComputationThreadPool-1|Source: ->21
RxComputationThreadPool-1|Source: -->22
RxComputationThreadPool-1|Source: --->23
RxComputationThreadPool-1|Source:--------->|
This means that the caller thread doesn't block: it prints 'Hey!' first (or in between the numbers), and all the observable logic of the Observable instance is executed on a computation thread. This way, you can use every scheduler you like to decide where to execute the work.
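The role of the CountDownLatch in the example can be isolated in a plain-JDK sketch (hypothetical names, no RxJava). Work is handed to a background thread, the caller carries on immediately, and the latch keeps the caller from finishing before the background work is done:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BackgroundWorkSketch {
    // Returns { caller thread name, worker thread name }.
    static String[] run() {
        ExecutorService background = Executors.newSingleThreadExecutor(
            r -> new Thread(r, "background-1"));
        CountDownLatch latch = new CountDownLatch(1);
        String[] threads = new String[2];
        try {
            background.execute(() -> {
                threads[1] = Thread.currentThread().getName();
                latch.countDown(); // signal that the background work is done
            });
            // execute() returns at once, so the caller is free to continue
            threads[0] = Thread.currentThread().getName();
            latch.await(); // without this, the caller could finish too early
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            background.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("caller: " + threads[0] + ", worker: " + threads[1]);
    }
}
```

In the RxJava example, finallyDo() plays the part of the countDown() call at the end of the background task.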
Here we need to mention something important about the subscribeOn()
method. If you call it multiple times throughout the chain like this:
CountDownLatch latch = new CountDownLatch(1);
Observable<Integer> range = Observable
  .range(20, 3)
  .doOnEach(debug("Source"))
  .subscribeOn(Schedulers.computation());
Observable<Character> chars = range
  .map(n -> n + 48)
  .map(n -> Character.toChars(n))
  .subscribeOn(Schedulers.io())
  .map(c -> c[0])
  .subscribeOn(Schedulers.newThread())
  .doOnEach(debug("Chars ", " "))
  .finallyDo(() -> latch.countDown());
chars.subscribe();
latch.await();
The call to it that is the closest to the beginning of the chain matters. Here we subscribe on the computation scheduler first, then on the IO scheduler, and then on the new thread scheduler, but our code will be executed on the computation scheduler because this is specified first in the chain.
RxComputationThreadPool-1|Source: >20
RxComputationThreadPool-1|Chars : >D
RxComputationThreadPool-1|Source: ->21
RxComputationThreadPool-1|Chars : ->E
RxComputationThreadPool-1|Source: -->22
RxComputationThreadPool-1|Chars : -->F
RxComputationThreadPool-1|Source: --->|
RxComputationThreadPool-1|Chars : --->|
In conclusion, don't specify a scheduler in methods producing Observable
instances; leave this choice to the callers of your methods. Alternatively, make your methods receive a Scheduler
instance as a parameter; like the Observable.interval
method, for example.
And what about the other operator, which helps us do work on other threads?
The observeOn()
operator is similar to the subscribeOn()
operator, but instead of executing the entire chain on the passed Scheduler
instances, it executes the part of the chain from its place within it, onwards. The easiest way to understand this is through an example. Let's use the previous one, after slightly modifying it:
CountDownLatch latch = new CountDownLatch(1);
Observable<Integer> range = Observable
.range(20, 3)
.doOnEach(debug("Source"));
Observable<Character> chars = range
.map(n -> n + 48)
.doOnEach(debug("+48 ", " "))
.map(n -> Character.toChars(n))
.map(c -> c[0])
.observeOn(Schedulers.computation())
.doOnEach(debug("Chars ", " "))
.finallyDo(() -> latch.countDown());
chars.subscribe();
System.out.println("Hey!");
latch.await();
Here, we tell the Observable
chain to execute on the main thread after subscribing until it reaches the observeOn()
operator. At this point, it is moved onto the computation scheduler. The output of this is something similar to the following:
main|Source: >20
main|+48 : >68
main|Source: ->21
main|+48 : ->69
main|Source: -->22
main|+48 : -->70
RxComputationThreadPool-3|Chars : >D
RxComputationThreadPool-3|Chars : ->E
RxComputationThreadPool-3|Chars : -->F
main|Source: --->|
main|+48 : --->|
Hey!
RxComputationThreadPool-3|Chars : --->|
As we can see, the part of the chain before the call to the operator blocks the main thread, preventing printing Hey!
. However, after all the notifications pass through the observeOn()
operator, 'Hey!'
is printed and the execution continues on the computation thread.
If we move the observeOn()
operator up the Observable
chain, a greater part of the logic will be executed using the computation scheduler.
Of course, the observeOn()
operator can be used together with the subscribeOn()
operator. That way, part of the chain could be executed on one thread and the rest of it on another (in most cases). This is especially useful if you code a client-side application because, normally, these applications run on one event enqueueing thread. You can read from files/servers using the IO scheduler with subscribeOn()
/observeOn()
operator and then observe the result on the event thread.
There is an Android module for RxJava that is not covered by this book, but it is getting quite a lot of attention. You can read more about it here: https://github.com/ReactiveX/RxJava/wiki/The-RxJava-Android-Module.
If you are an Android developer don't miss it!
Let's look at an example using both the subscribeOn()
and observeOn()
operators:
CountDownLatch latch = new CountDownLatch(1);
Observable<Integer> range = Observable
  .range(20, 3)
  .subscribeOn(Schedulers.newThread())
  .doOnEach(debug("Source"));
Observable<Character> chars = range
  .observeOn(Schedulers.io())
  .map(n -> n + 48)
  .doOnEach(debug("+48 ", " "))
  .observeOn(Schedulers.computation())
  .map(n -> Character.toChars(n))
  .map(c -> c[0])
  .doOnEach(debug("Chars ", " "))
  .finallyDo(() -> latch.countDown());
chars.subscribe();
latch.await();
Here, we use one call to the subscribeOn()
operator at the beginning of the chain (actually, it doesn't matter where we put it, because it is a sole call to that operator) and two calls for the observeOn()
operator. The result of executing this code looks like this:
RxNewThreadScheduler-1|Source: >20
RxNewThreadScheduler-1|Source: ->21
RxNewThreadScheduler-1|Source: -->22
RxNewThreadScheduler-1|Source: --->|
RxCachedThreadScheduler-1|+48 : >68
RxCachedThreadScheduler-1|+48 : ->69
RxCachedThreadScheduler-1|+48 : -->70
RxComputationThreadPool-3|Chars : >D
RxCachedThreadScheduler-1|+48 : --->|
RxComputationThreadPool-3|Chars : ->E
RxComputationThreadPool-3|Chars : -->F
RxComputationThreadPool-3|Chars : --->|
We can see that the chain passes through three threads. If we do this with more elements, some of the code will be executed seemingly in parallel. The conclusion is that, using the observeOn()
operator, we can change the threads multiple times; using the subscribeOn()
operator, we can do this one time—on subscription.
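For readers who know the JDK's CompletableFuture, the same thread-hopping idea can be sketched with it. This is an analogy only and not how RxJava works internally; the executor names below are made up for the illustration. The first stage picks the thread the work starts on, and every later stage names the executor on which the rest of the pipeline continues:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadHopSketch {
    private static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    private static String here() {
        return Thread.currentThread().getName();
    }

    // Pushes one value through three stages, each on its own thread,
    // and returns the log of which thread ran which stage.
    static List<String> run() {
        ExecutorService source = named("source-1");
        ExecutorService io = named("io-1");
        ExecutorService computation = named("computation-1");
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        try {
            CompletableFuture
                .supplyAsync(() -> {           // where the work starts
                    log.add(here() + "|Source: 20");
                    return 20;
                }, source)
                .thenApplyAsync(n -> {         // first hop
                    log.add(here() + "|+48: " + (n + 48));
                    return n + 48;
                }, io)
                .thenApplyAsync(n -> {         // second hop
                    char c = Character.toChars(n)[0];
                    log.add(here() + "|Chars: " + c);
                    return c;
                }, computation)
                .join();
            return new ArrayList<>(log);
        } finally {
            source.shutdown();
            io.shutdown();
            computation.shutdown();
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The difference from an Observable chain is that a CompletableFuture carries a single value, whereas observeOn() moves every notification of a stream.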
The source for the preceding examples with the observeOn()
/subscribeOn()
operators can be found at https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter06/SubscribeOnAndObserveOn.java.
With these two operators, we can have Observable
instances and multi-threading working together. But being concurrent doesn't really mean that we can do things in parallel. It means that our program has multiple threads, making some progress independently. True parallelism is when our program uses the CPU (cores) of the machine it runs on at their maximum and its threads run literally at the same time.
All of our examples up until now just moved the chain logic onto other threads. Some of the examples really did perform part of their operations in parallel, but a true parallelism example looks different.
We can achieve parallelism by using only the operators that we already know. Think about the flatMap()
operator; it creates an Observable
instance for each item emitted by the source. If we call the subscribeOn()
operator with a Scheduler
instance on these Observable
instances, each one of them will be scheduled on a new Worker
instance, and they'll work in parallel (if the host machine allows that). Here is an example of this:
Observable<Integer> range = Observable
  .range(20, 5)
  .flatMap(n -> Observable
    .range(n, 3)
    .subscribeOn(Schedulers.computation())
    .doOnEach(debug("Source"))
  );
range.subscribe();
The output of this code looks like this:
RxComputationThreadPool-3|Source: >23
RxComputationThreadPool-4|Source: >20
RxComputationThreadPool-2|Source: >22
RxComputationThreadPool-3|Source: ->24
RxComputationThreadPool-1|Source: >21
RxComputationThreadPool-2|Source: ->23
RxComputationThreadPool-3|Source: -->25
RxComputationThreadPool-3|Source: --->|
RxComputationThreadPool-4|Source: ->21
RxComputationThreadPool-4|Source: -->22
RxComputationThreadPool-4|Source: --->|
RxComputationThreadPool-2|Source: -->24
RxComputationThreadPool-2|Source: --->|
RxComputationThreadPool-1|Source: ->22
RxComputationThreadPool-1|Source: -->23
RxComputationThreadPool-1|Source: --->|
RxComputationThreadPool-4|Source: >24
RxComputationThreadPool-4|Source: ->25
RxComputationThreadPool-4|Source: -->26
RxComputationThreadPool-4|Source: --->|
We can see by the names of the threads that the Observable
instances defined through the flatMap()
operator are executed in parallel. And that's really the case—the four threads are using the four cores of my processor.
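The shape of this computation (one inner sequence per source item, each run on its own worker, results merged as they arrive) can be imitated with the JDK alone. The sketch below is a rough analogy with hypothetical names, not a reimplementation of flatMap(). Because the inner tasks run concurrently, the order of the merged values is not guaranteed; only their contents are.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ParallelRangesSketch {
    // For every n in [20, 25) emits n, n + 1, n + 2 from a separate task and
    // merges everything into one collection, in arrival order.
    static List<Integer> run() {
        int cores = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(cores);
        Queue<Integer> merged = new ConcurrentLinkedQueue<>();
        CountDownLatch done = new CountDownLatch(5);
        try {
            for (int n = 20; n < 25; n++) {
                final int start = n;
                pool.execute(() -> {
                    for (int i = start; i < start + 3; i++) {
                        merged.add(i); // interleaves with the other tasks
                    }
                    done.countDown();
                });
            }
            done.await(); // wait until all five inner sequences have completed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(merged);
    }

    public static void main(String[] args) {
        System.out.println(run()); // the order may differ between runs
    }
}
```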
I'll provide another example, this time for parallel requests to a remote server. We'll be using the requestJson()
method we defined in the previous chapter. The idea is this:
Let's see how this is implemented:
Observable<Map> response = CreateObservable.requestJson(
  client,
  "https://api.github.com/users/meddle0x53/followers"
); // (1)
response
  .map(followerJson -> followerJson.get("url")) // (2)
  .cast(String.class)
  .flatMap(profileUrl -> CreateObservable
    .requestJson(client, profileUrl)
    .subscribeOn(Schedulers.io()) // (3)
    .filter(res -> res.containsKey("followers"))
    .map(json -> // (4)
      json.get("login") + " : " + json.get("followers"))
  )
  .doOnNext(follower -> System.out.println(follower)) // (5)
  .count() // (6)
  .subscribe(sum -> System.out.println("meddle0x53 : " + sum));
Here's what's happening in the preceding code:
Map objects (see the implementation of the requestJson method). From each of the JSON files, the URL to the profile of the follower it represents is read.
flatMap() operator has an overload that takes a maxConcurrent integer parameter. We can limit the concurrent requests using it.
count() operator (which is the same as the scan(0.0, (sum, element) -> sum + 1).last() call). Then we print them. The order of the printed data is not guaranteed to be the same as the order in which the followers were traversed.
The source code for the preceding example can be found at https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter06/ParallelRequestsExample.java.
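The idea of capping the number of requests in flight can be sketched with a JDK Semaphore. This is an assumption-laden analogy: flatMap's maxConcurrent parameter does not block the caller the way acquire() does here, the Thread.sleep() call merely stands in for a network request, and all names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class MaxConcurrentSketch {
    // Runs 'requests' fake requests with at most 'maxConcurrent' in flight.
    // Returns { number of completed requests, peak number in flight }.
    static int[] run(int requests, int maxConcurrent) {
        ExecutorService pool = Executors.newCachedThreadPool();
        Semaphore permits = new Semaphore(maxConcurrent);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();
        try {
            for (int i = 0; i < requests; i++) {
                permits.acquire(); // wait here until a slot is free
                pool.execute(() -> {
                    try {
                        int now = inFlight.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        Thread.sleep(10); // stand-in for a blocking request
                        completed.incrementAndGet();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release(); // free the slot for the next request
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow(); // no-op if the pool has already terminated
        }
        return new int[] {completed.get(), peak.get()};
    }

    public static void main(String[] args) {
        int[] result = run(10, 2);
        System.out.println("completed=" + result[0] + ", peak in flight=" + result[1]);
    }
}
```

A limit like this is worth considering whenever the remote server enforces rate limits, since an unbounded IO pool would otherwise fire all the requests at once.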
That's all about concurrency and parallelism. Everything is pretty simple, but powerful. There are a few rules (such as using the Schedulers.io
instance for blocking operations, using the computation one for background tasks, and so on) that you must follow to ensure nothing goes wrong, even with multi-threaded observable chains of actions.
It is very possible using this parallelism technique to flood the Observable
instance chain with data, and that's a problem. That's why we'll have to deal with it. Through the rest of this chapter, we'll learn how to handle too many elements coming from an upstream observable chain of actions.