Using thread pools for CPU-bound operations

There are two possible ways to deal with CPU-bound operations:

  • Split them into multiple processes
  • Split them into multiple threads (provided that several execution engines are available on the CPU)
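
The condition attached to the second option can be checked at runtime. The following is a minimal sketch using only the standard library; the sizing rule (no more workers than cores, capped at four) is an assumption made for illustration, not a recommendation from this chapter.

```python
import os

# os.cpu_count() may return None when the count cannot be determined.
available_cores = os.cpu_count() or 1

# Hypothetical sizing rule: never more workers than cores, capped at 4.
max_workers = min(4, available_cores)
print('cores: {}, workers: {}'.format(available_cores, max_workers))
```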

For now, only the multithreading solution is investigated. The multiprocessing solution is covered in more detail in Chapter 10, Testing and Debugging.

As explained in Chapter 5, Concurrency and Parallelism in RxPY, two operators allow us to deal with schedulers and execution contexts: observe_on and subscribe_on. In the case of this application, ThreadPoolScheduler can be used. Since the encoding requests use a lot of CPU and can run in parallel, a thread pool can execute them concurrently. But before this can be done, another change is needed in the encode driver. The encoding function uses temporary files to work with the sox package. This is detailed in the following example:

def mp3_to_flac(data, dest_path, name):
    tmp_filename = os.path.join('/tmp/transcode-tmp.mp3')
    dest_filename = os.path.join(dest_path, name + '.flac')
    ...

The temporary filename is the same for every file being encoded. So, if several files are encoded at the same time, they all access the same temporary file simultaneously. This can be fixed in several ways. One way is to use the Python tempfile.mkstemp function to create a unique file each time. Another is to add the thread identifier to the name of the file. This is shown in the following example:

import os
import threading

def mp3_to_flac(data, dest_path, name):
    # Each worker thread gets its own pair of temporary files
    tid = threading.get_ident()
    tmp_filename = os.path.join('/tmp/transcode-{}.mp3'.format(tid))
    tmp2_filename = os.path.join('/tmp/transcode-{}.flac'.format(tid))
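
The other option mentioned above, tempfile.mkstemp, could look like the following minimal sketch. The helper names make_tmp_paths and remove_tmp_paths and the transcode- prefix are assumptions made for illustration; they are not part of the encode driver.

```python
import os
import tempfile


def make_tmp_paths(tmp_dir=None):
    """Return unique (mp3_path, flac_path) temporary files (hypothetical helper)."""
    # mkstemp creates each file atomically and returns an open descriptor and its path.
    fd_mp3, mp3_path = tempfile.mkstemp(prefix='transcode-', suffix='.mp3', dir=tmp_dir)
    fd_flac, flac_path = tempfile.mkstemp(prefix='transcode-', suffix='.flac', dir=tmp_dir)
    # Only the paths are needed here, so the descriptors are closed right away.
    os.close(fd_mp3)
    os.close(fd_flac)
    return mp3_path, flac_path


def remove_tmp_paths(*paths):
    """Delete the temporary files; mkstemp never removes them itself."""
    for path in paths:
        if os.path.exists(path):
            os.remove(path)
```

Unlike the thread identifier approach, this does not depend on which thread runs the encoding, but the caller is responsible for deleting the files once the encoding is done.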

With this simple change, the encode driver can now be used in a multithreaded context. The encode requests must then be scheduled on a ThreadPoolScheduler so that they execute in parallel. First, a dedicated scheduler must be created, as can be seen in the following example:

from rx.concurrency import ThreadPoolScheduler

encode_scheduler = ThreadPoolScheduler(max_workers=4)

Here, the thread pool is set to four workers, so four encodings can be done in parallel. Then, the scheduler can be used as follows:

encode_request = (
    sources.httpd.route
    .filter(lambda i: i.id == 'flac_transcode')
    .flat_map(lambda i: i.request)
    .flat_map(lambda i: Observable.just(i, encode_scheduler))
    .map(lambda i: encoder.EncodeMp3(
        id=i.context,
        data=i.data,
        key=i.match_info['key']))
)

There is a trick here: using the observe_on operator directly does not work, because each item is only scheduled once the previous item has been encoded. For the scheduler to process items in parallel, a new subscription must occur for each item. This is why the flat_map operator is used, combined with the just operator bound to the scheduler. From that point on, each map operation runs on one of the four threads of the thread pool, in parallel with the others.
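
The idea of scheduling each item independently can be illustrated without RxPY. The following sketch is a standard-library analogy, not the RxPY implementation: it swaps ThreadPoolScheduler for concurrent.futures.ThreadPoolExecutor and submits one task per item. The names encode_all and fake_encode are hypothetical, and the barrier is only a demonstration device showing that the four tasks really run at the same time.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

WORKERS = 4  # same pool size as encode_scheduler in the text


def encode_all(items, workers=WORKERS):
    """Submit each item as its own task; return (results, thread ids used).

    For this demonstration, len(items) must be a multiple of `workers`.
    """
    # The barrier releases only when `workers` tasks are waiting on it together,
    # which can happen only if they run concurrently on separate threads.
    barrier = threading.Barrier(workers)

    def fake_encode(item):
        # Raises BrokenBarrierError after 5 seconds if the tasks ran one by one.
        barrier.wait(timeout=5)
        return item.upper(), threading.get_ident()

    with ThreadPoolExecutor(max_workers=workers) as pool:
        # One submission per item, comparable to one subscription per item.
        futures = [pool.submit(fake_encode, item) for item in items]
        pairs = [future.result() for future in futures]

    results = [result for result, _ in pairs]
    thread_ids = {tid for _, tid in pairs}
    return results, thread_ids


if __name__ == '__main__':
    results, thread_ids = encode_all(['a', 'b', 'c', 'd'])
    print(results, len(thread_ids))
```

If the items were instead processed one after another on a single worker, the first task would wait on the barrier until the timeout and fail, which mirrors the problem described for observe_on.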
