Moving the blocking I/O to a dedicated thread

The second evolution consists of making the S3 upload operation run in a dedicated thread. This upload operation is based on a blocking API, so it should be executed in a dedicated thread to avoid blocking the event loop. In RxPY there is no scheduler to execute some operators on a dedicated thread. However, this is not really needed because the same result is possible by creating a thread pool containing only one thread. So another scheduler is created for the S3 upload operations, as detailed in the following example:

s3_scheduler = ThreadPoolScheduler(max_workers=1)

For this scheduling, the observe_on operator can be used directly to move the execution context from the encoder thread pool to the s3 dedicated thread, as can be seen in the following example:

    store_requests = (
sources.encoder.response
.observe_on(s3_scheduler)
.map(lambda i: s3.UploadObject(
key=i.key + '.flac',
data=i.data,
id=i.id,
))
)

Once the upload is complete, the HTTP response item must be scheduled on the AsyncIO scheduler so that it can be processed on the event loop. Due to a limitation present in RxPY 1.6, this requires some evolutions on the S3 driver: the implementation of the AsyncIO scheduler does not allow us to schedule an item from a thread scheduler to the AsyncIO scheduler. This limitation may be fixed in future versions of RxPY. Should this work, it would be implemented in the following way. First, an AsyncIO scheduler is needed:

from rx.concurrency import AsyncIOScheduler

aio_scheduler = AsyncIOScheduler()

Then, this scheduler can be used before sending the HTTP response, as detailed in the following example:

    http_response = (
sources.s3.response
.observe_on(aio_scheduler)
.map(lambda i: httpd.Response(
data='ok'.encode('utf-8'),
context=i.id,
))
)

Unfortunately, this does not work with RxPY 1.6, and the items are lost at this point because they cannot be scheduled. So, instead, the S3 driver has to schedule the emission of its items directly on the AsyncIO event loop. For this, it first needs a reference to the event loop, as can be seen in the following example:

def make_driver(loop=None):
if loop is None:
loop = asyncio.get_event_loop()

The event loop can be provided as a parameter, otherwise the default event loop is used. Then, each emission of items must be done from this event loop. There is a dedicated method for the event loop to do this: call_soon_threadsafe. This method schedules the execution of a function on the event loop, even when being called from a thread other than the event loop. So, the on_next callback of the S3 driver can now wrap the emission with the following method:

elif type(item) is UploadObject:
data = BytesIO(item.data)
client.upload_fileobj(data, bucket, item.key)
loop.call_soon_threadsafe(observer.on_next, UploadReponse(
key=item.key,
id=item.id))

else:
loop.call_soon_threadsafe(observer.on_error, "unknown item: {}".format(type(item)))

As well as the subscription forwarding callbacks:

sink.request.subscribe(
on_next=on_next,
on_error=lambda e: loop.call_soon_threadsafe(observer.on_error, e),
on_completed=lambda: loop.call_soon_threadsafe(observer.on_completed)

With these evolutions, the application code runs on three different execution contexts: the event loop, the encoding thread pool, and the S3 upload thread. This is shown in the following figure:

Figure 7.6: Multithreaded execution contexts

Now, encoding the 10 files in parallel completes in 15 seconds instead of 30 seconds. This is a 100% improvement in performance. This is mainly due to the encoding, which now uses four cores. This can be confirmed by looking at a CPU monitor: four cores are now being used, with peaks of 100% usage.

Some other improvements are possible to leverage the asynchronous design of the application more extensively. It is possible to directly use S3's HTTPS APIs instead of using the boto3 package: this would allow us to upload the files concurrently on the AsyncIO event loop instead of serializing them. However, there is too much to describe here, and it will not introduce new ReactiveX or AsyncIO features. This could be an interesting exercise if you want to implement an existing protocol based on HTTP.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.137.174.23