The ThreadPool scheduler

Spawning a new thread each time an item is received is an anti-pattern in most cases: creating and deleting a thread is a costly operation, so doing it for each item of an observable implies a performance hit. ThreadPoolScheduler allows us to execute each item's operation on a dedicated thread, but instead of spawning a thread per item, it uses a pool of threads that are recycled. These threads are created once and reused without being deleted, which lets us work with a small number of threads instead of creating and destroying many of them. The prototype of this scheduler is the following one:

 ThreadPoolScheduler.__init__(self, max_workers=None)

The max_workers parameter configures how many workers are spawned in the thread pool. If no value is specified, the number of workers is set to five times the number of CPU cores. This default is adapted to blocking I/O operations, where most threads spend their time waiting for an I/O operation to complete. For CPU-bound tasks, however, a smaller value must be used. The most suitable value always depends on the application, but a good starting point for a CPU-bound application is the number of cores plus one. This maximizes the usage of each core, even when some of them go idle for short periods of time.
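As a minimal sketch of these sizing rules, the two pool sizes described above can be computed from the CPU count available in the standard library. The variable names here are illustrative, not part of the RxPY API:

```python
import multiprocessing

cpu_count = multiprocessing.cpu_count()

# Default used when max_workers is not specified, suited to blocking
# I/O workloads where most threads wait on I/O completion.
io_bound_workers = cpu_count * 5

# A common starting point for CPU-bound workloads: one worker per core,
# plus one to keep the cores busy during short idle periods.
cpu_bound_workers = cpu_count + 1

print("I/O bound pool size: {}".format(io_bound_workers))
print("CPU bound pool size: {}".format(cpu_bound_workers))
```

The chosen value is then passed explicitly, for example as ThreadPoolScheduler(max_workers=cpu_bound_workers).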

ThreadPoolScheduler can be used to schedule subscriptions, just like NewThreadScheduler:

import threading

from rx import Observable
from rx.concurrency import ThreadPoolScheduler

threadpool_scheduler = ThreadPoolScheduler()
numbers = Observable.from_([1, 2, 3, 4], scheduler=threadpool_scheduler)

subscription = numbers \
    .map(lambda i: i*2) \
    .map(lambda i: "number is: {}".format(i)) \
    .subscribe(
        on_next=lambda i: print("on_next({}) {}"
                                .format(threading.get_ident(), i)),
        on_error=lambda e: print("on_error({}): {}"
                                 .format(threading.get_ident(), e)),
        on_completed=lambda: print("on_completed({})"
                                   .format(threading.get_ident())))
print("main({})".format(threading.get_ident()))

This example gives the following result:

main(140736035734400)
on_next(123145554874368) number is: 2
on_next(123145554874368) number is: 4
on_next(123145554874368) number is: 6
on_next(123145554874368) number is: 8
on_completed(123145554874368)

With this single subscription, the result is the same as with NewThreadScheduler: the whole chain is executed on a thread (123145554874368) different from the main one (140736035734400). However, if several subscriptions are made, the same pool threads are reused instead of a new thread being spawned for each subscription.
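The recycling behavior that makes this possible can be observed directly with the standard library. ThreadPoolScheduler is built on top of Python's concurrent.futures.ThreadPoolExecutor, and the following sketch shows how such a pool reuses its worker threads across many tasks, without involving RxPY itself:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# A pool of two threads, created once and recycled for every task.
executor = ThreadPoolExecutor(max_workers=2)

# Submit twenty small tasks; each one returns the identifier of the
# thread that executed it.
idents = {executor.submit(threading.get_ident).result()
          for _ in range(20)}
executor.shutdown()

# No matter how many tasks were submitted, at most two distinct
# threads served them.
print("threads used: {}".format(len(idents)))
```

The same principle applies when several subscriptions share one ThreadPoolScheduler: the pool hands work to its existing threads rather than creating new ones.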

This scheduler can also be used to execute each item's operations in a different thread of the pool. The following figure shows this behavior:

Figure 5.3: The observe_on operator with ThreadPoolScheduler

In this diagram, there are two threads in the pool. Each thread is used alternately to emit the items of the source observable. The following example shows this behavior:

import threading

from rx import Observable
from rx.concurrency import ThreadPoolScheduler

threadpool_scheduler = ThreadPoolScheduler()
numbers = Observable.from_([1, 2, 3, 4])

subscription = numbers \
    .map(lambda i: i*2) \
    .observe_on(threadpool_scheduler) \
    .map(lambda i: "number is: {}".format(i)) \
    .subscribe(
        on_next=lambda i: print("on_next({}) {}"
                                .format(threading.get_ident(), i)),
        on_error=lambda e: print("on_error({}): {}"
                                 .format(threading.get_ident(), e)),
        on_completed=lambda: print("on_completed({})"
                                   .format(threading.get_ident())))
print("main({})".format(threading.get_ident()))

This example gives the following result:

main(140736035734400)
on_next(123145565384704) number is: 2
on_next(123145554874368) number is: 4
on_next(123145560129536) number is: 6
on_next(123145565384704) number is: 8
on_completed(123145575895040)

In this run, four different threads are used to schedule the emission of the four items and the completion. One thread (123145565384704) is used twice, to emit the items 2 and 8.
