Converting blocking code into non-blocking code

While asyncio supports connecting to resources in an asynchronous way, in certain cases it is necessary to use blocking calls. This happens, for example, when third-party APIs expose only blocking calls (many database libraries, for instance), but also when executing long-running computations. In this subsection, we will learn how to deal with blocking APIs and make them compatible with asyncio.

An effective strategy for dealing with blocking code is to run it in a separate thread. Threads are implemented at the Operating System (OS) level and allow parallel execution of blocking code. For this purpose, Python provides the Executor interface designed to run tasks in a separate thread and to monitor their progress using futures.

You can initialize a ThreadPoolExecutor by importing it from the concurrent.futures module. The executor will spawn a collection of threads (called workers) that will wait to execute whatever task we throw at them. Once a function is submitted, the executor will take care of dispatching its execution to an available worker thread and keep track of the result. The max_workers argument can be used to select the number of threads.

Note that the executor will not destroy a thread once its task completes. Reusing threads in this way reduces the cost associated with the creation and destruction of threads.
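This reuse is easy to observe: with a single worker, consecutive tasks report the same thread name. The following is a small illustrative sketch (the which_thread helper is introduced here for demonstration and is not part of the original example):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def which_thread():
    # Report the name of the worker thread running this task
    return threading.current_thread().name

executor = ThreadPoolExecutor(max_workers=1)
first = executor.submit(which_thread).result()
second = executor.submit(which_thread).result()

# Both tasks ran on the same (reused) worker thread
print(first == second)  # True
```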

In the following example, we create a ThreadPoolExecutor with three workers, and we submit a wait_and_return function that will block the program execution for one second and return a message string. We then use the submit method to schedule its execution:

    import time
    from concurrent.futures import ThreadPoolExecutor

    executor = ThreadPoolExecutor(max_workers=3)

    def wait_and_return(msg):
        time.sleep(1)
        return msg

    executor.submit(wait_and_return, "Hello, executor")
    # Result:
    # <Future at 0x7ff616ff6748 state=running>
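The returned future can also be queried directly. As a small self-contained sketch, calling its result method blocks the caller until the task completes and then returns the function's return value:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def wait_and_return(msg):
    # Blocking function: sleeps for one second, then returns its argument
    time.sleep(1)
    return msg

executor = ThreadPoolExecutor(max_workers=3)
fut = executor.submit(wait_and_return, "Hello, executor")

# result() blocks the calling thread until the task finishes
value = fut.result()
print(value)  # Hello, executor
```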

The executor.submit method immediately schedules the function and returns a future. It is possible to manage the execution of tasks in asyncio using the loop.run_in_executor method, which works quite similarly to executor.submit:

    fut = loop.run_in_executor(executor, wait_and_return,
                               "Hello, asyncio executor")
    # <Future pending ...more info...>

The run_in_executor method will also return an asyncio.Future instance that can be awaited from other code, the main difference being that the future will not be run until we start the loop. We can run and obtain the response using loop.run_until_complete:

    loop.run_until_complete(fut)
    # Result:
    # 'Hello, asyncio executor'
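Putting the snippets above together, a minimal self-contained script could look as follows. Note that, unlike the snippets above (which assume a pre-existing loop), this sketch creates its own event loop with asyncio.new_event_loop:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def wait_and_return(msg):
    # Blocking function: sleeps for one second, then returns its argument
    time.sleep(1)
    return msg

executor = ThreadPoolExecutor(max_workers=3)
loop = asyncio.new_event_loop()

# Schedule the blocking call on a worker thread; an asyncio future is returned
fut = loop.run_in_executor(executor, wait_and_return, "Hello, asyncio executor")

# The future produces its result once the loop runs it to completion
result = loop.run_until_complete(fut)
loop.close()
print(result)  # Hello, asyncio executor
```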

As a practical example, we can use this technique to implement concurrent fetching of several web pages. To do this, we will import the popular (blocking) requests library and run the requests.get function in the executor:

    import requests

    async def fetch_urls(urls):
        responses = []
        for url in urls:
            responses.append(await loop.run_in_executor(
                executor, requests.get, url))
        return responses

    loop.run_until_complete(fetch_urls(['http://www.google.com',
                                        'http://www.example.com',
                                        'http://www.facebook.com']))
    # Result:
    # [<Response [200]>, <Response [200]>, <Response [200]>]

This version of fetch_urls will not block the execution and allows other coroutines in asyncio to run; however, it is not optimal, as the URLs are fetched sequentially: each await waits for the previous request to complete before submitting the next. To fetch them in parallel, we can use asyncio.ensure_future or employ the asyncio.gather convenience function, which will submit all the coroutines at once and gather the results as they come. The usage of asyncio.gather is demonstrated here:

    def fetch_urls(urls):
        return asyncio.gather(*[loop.run_in_executor(executor, requests.get, url)
                                for url in urls])

The number of URLs you can fetch in parallel with this method is limited by the number of worker threads in the executor. To avoid this limitation, you should use a natively non-blocking library, such as aiohttp.
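The parallel speedup from gather is easy to verify without touching the network. In the following sketch, time.sleep stands in for a blocking call such as requests.get (the blocking_task and run_all names are introduced here for illustration): three one-second tasks submitted at once to three workers finish in roughly one second rather than three.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_task(msg):
    # Stand-in for a blocking call such as requests.get
    time.sleep(1)
    return msg

executor = ThreadPoolExecutor(max_workers=3)
loop = asyncio.new_event_loop()

def run_all(msgs):
    # Submit every task at once; gather collects the results in order
    return asyncio.gather(*[loop.run_in_executor(executor, blocking_task, m)
                            for m in msgs])

start = time.time()
results = loop.run_until_complete(run_all(["a", "b", "c"]))
elapsed = time.time() - start
loop.close()

print(results)  # ['a', 'b', 'c']
print(elapsed)  # close to 1 second, not 3: the tasks ran in parallel
```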