Structuring code for parallelism

Unfortunately, the core RxJava library doesn't offer a very straightforward way to achieve parallelism. Once an Observable and its Subscription are created, the operators execute on the threads that were assigned to them at the moment of subscription.

That means that a single chain cannot execute two pieces of code at the same time. However, there is a workaround: we just need to keep creating new Observables as we need them!

Consider this code:

Observable.range(1, 100)
    .map(Sandbox::importantLongTask)
    .map(Object::toString)
    .subscribe(e -> log("subscribe", e));
...
public static int importantLongTask(int i) {
    try {
        long minMillis = 10L;
        long maxMillis = 1000L;
        log("Working on " + i);
        // Sleep for a random duration between minMillis and maxMillis
        final long waitingTime =
            minMillis + (long) (Math.random() * (maxMillis - minMillis));
        Thread.sleep(waitingTime);
        return i;
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}
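Note that the waiting-time formula is easy to get wrong: without parentheses around (maxMillis - minMillis), the result can fall outside the intended range. Here is a minimal, standalone sketch of the corrected computation (the class and method names are illustrative, not from the original code):

```java
public class WaitingTimeSketch {

    // Returns a pseudo-random duration in [minMillis, maxMillis).
    static long pickWaitingTime(long minMillis, long maxMillis) {
        return minMillis + (long) (Math.random() * (maxMillis - minMillis));
    }

    public static void main(String[] args) {
        // Sample many times and verify every value stays in range.
        for (int i = 0; i < 10_000; i++) {
            long t = pickWaitingTime(10L, 1000L);
            if (t < 10L || t >= 1000L) {
                throw new AssertionError("out of range: " + t);
            }
        }
        System.out.println("all samples in [10, 1000)");
    }
}
```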

Clearly, some of the tasks here complete faster than others. To take advantage of that, and of the fact that we don't care about the order in which the results arrive, we can restructure the flow like this:

Observable.range(1, 100)
    .flatMap(i -> Observable.just(i)
        .subscribeOn(Schedulers.io())
        .map(Sandbox::importantLongTask)
    )
    .map(Object::toString)
    .subscribe(e -> log("subscribe", e));

Here, we had to use .flatMap() to make the creation of Observables part of the flow. The effect is that Observable.range(1, 100) finishes very quickly and, in the process, creates a whole batch (in fact, 100) of new Observables, each subscribed on the IO Scheduler.

When this code is executed, we will be able to see lines like this:

subscribe:RxCachedThreadScheduler-83:78
subscribe:RxCachedThreadScheduler-35:35

This clearly shows that item 78, although emitted after item 35, finished processing sooner.

Finally, it is quite rare to need this on an Android device, as it is a very quick way to exhaust the resources of the system or the network. Remember that the IO Scheduler is unbounded in the number of workers it can create, so if there were a million items instead of a hundred, thousands of workers could be spawned, consuming a large amount of memory.
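If you do need to bound the fan-out, flatMap has an overload that takes a maxConcurrency argument, which limits how many inner Observables are subscribed to at once. A sketch under that assumption, reusing the same importantLongTask as above (the limit of 8 is an arbitrary example):

```java
Observable.range(1, 100)
    .flatMap(i -> Observable.just(i)
            .subscribeOn(Schedulers.io())
            .map(Sandbox::importantLongTask),
        8) // at most 8 inner Observables are subscribed concurrently
    .map(Object::toString)
    .subscribe(e -> log("subscribe", e));
```

With this variant, only 8 IO workers are in use at any moment, so the memory footprint stays bounded regardless of how many items the source emits.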
