Accessing resources asynchronously with the asyncio module

It is a basic fact of life that I/O (for example, file or database access) is slow. I/O is not only slow, but also unpredictable. In a common scenario, we wait for data (from a web service or sensors) and write the data to the filesystem or a database. In such a situation, we can find ourselves I/O bound, spending more time waiting for the data than actually processing it. We can either poll for data periodically or act on event triggers, which is the difference between repeatedly checking your watch and setting an alarm. GUIs usually have special threads that wait for user input in an infinite loop.
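To make the difference between polling and event triggers concrete, here is a minimal sketch using only the standard library. The producer coroutine, the state dictionary, and the delays are hypothetical stand-ins for a real data source; they are not part of the recipe.

```python
import asyncio


async def poll(state, interval=0.01):
    """Polling: wake up periodically and check whether data has arrived."""
    checks = 0
    while "data" not in state:
        checks += 1
        await asyncio.sleep(interval)
    return state["data"], checks


async def wait_for_event(state, ready):
    """Event trigger: sleep until the producer signals that data is ready."""
    await ready.wait()
    return state["data"]


async def producer(state, ready, delay=0.05):
    """Hypothetical data source that delivers a value after a delay."""
    await asyncio.sleep(delay)
    state["data"] = 42
    ready.set()


async def demo():
    state = {}
    ready = asyncio.Event()
    (polled, checks), triggered, _ = await asyncio.gather(
        poll(state), wait_for_event(state, ready), producer(state, ready))
    return polled, checks, triggered


polled, checks, triggered = asyncio.run(demo())
print(polled, triggered, checks)
```

Both consumers end up with the same value. The polling version wakes up several times just to find nothing new, while the event-driven version is woken exactly once.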

The Python asyncio module for asynchronous I/O is built around coroutines. Older code marks them with the @asyncio.coroutine function decorator and delegates with yield from; since Python 3.5, the same idea is written with async def and await, and the decorator-based form was removed in Python 3.11. A brief example of this module was also given in the Scraping the web recipe of Chapter 5, Web Mining, Databases, and Big Data. Subroutines can be thought of as a special case of coroutines. A subroutine has one entry point and exits either early through a return statement or by reaching the end of its definition. In contrast, a coroutine can suspend itself partway through by handing control to another coroutine with yield from (or await), and later resume execution from the point where it left off. The coroutine lets another coroutine take over, as it were, and goes back to sleep until it is activated again.
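The suspend-and-resume behavior described above can be sketched with two small coroutines that take turns. This is an illustration only, written in the async def/await syntax; the worker function and the events list are hypothetical names, not part of the recipe.

```python
import asyncio

events = []  # records the order in which the coroutines run


async def worker(name, steps):
    for step in range(steps):
        events.append((name, step))
        # Suspend here and let the event loop run another coroutine;
        # execution resumes on the next line when this one is woken up.
        await asyncio.sleep(0)


async def main():
    # Run both coroutines concurrently on the same event loop
    await asyncio.gather(worker("a", 2), worker("b", 2))


asyncio.run(main())
print(events)
```

The recorded events alternate between the two workers, which shows that each coroutine keeps its own loop counter while it is suspended and picks up where it left off.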

Subroutines can be placed on a single call stack. Coroutines, however, each keep their own suspended state, which effectively requires multiple stacks and makes the control flow and any exceptions harder to follow.
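One practical consequence is that an exception raised inside a coroutine only surfaces where that coroutine is awaited. The following sketch, with a hypothetical fetch coroutine that fails for one input, shows one way to collect such failures with asyncio.gather instead of letting the first one abort everything.

```python
import asyncio


async def fetch(i):
    """Hypothetical slow operation that fails for one particular input."""
    await asyncio.sleep(0.01)
    if i == 2:
        raise ValueError("bad item {}".format(i))
    return i * 10


async def main():
    # return_exceptions=True hands raised exceptions back as results
    # instead of propagating the first one to the caller
    return await asyncio.gather(*(fetch(i) for i in range(4)),
                                return_exceptions=True)


results = asyncio.run(main())
for i, result in enumerate(results):
    print(i, repr(result))
```

Without return_exceptions=True, the ValueError would be raised from the await on gather, away from the line that actually failed, which is part of what makes coroutine code harder to debug.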

How to do it...

The code is in the accessing_asyncio.ipynb file in this book's code bundle:

  1. The imports are as follows:
    import dautil as dl
    import ch12util
    from functools import partial
    import matplotlib.pyplot as plt
    import numpy as np
    from scipy.stats import skew
    import asyncio
    import time
    from IPython.display import HTML
    
    STATS = []
  2. Define the following function to resample:
    def resample(arr):
        sample = ch12util.bootstrap(arr)
        STATS.append((sample.mean(), sample.std(), skew(sample)))
  3. Define the following class to bootstrap:
    class Bootstrapper():
        def __init__(self, data, queue):
            self.data = data
            self.log = dl.log_api.conf_logger(__name__)
            self.queue = queue
    
        async def run(self):
            # async def and await replace the generator-based
            # @asyncio.coroutine style, removed in Python 3.11
            while not self.queue.empty():
                index = await self.queue.get()
    
                if index % 10 == 0:
                    self.log.debug('Bootstrap {}'.format(
                        index))
    
                resample(self.data)
                # simulates slow IO
                await asyncio.sleep(0.01)
  4. Define the following function to perform serial resampling:
    def serial(arr, n):
        for i in range(n):
            resample(arr)
            # simulates slow IO
            time.sleep(0.01)
  5. Define the following function to perform concurrent resampling (it is named parallel, although asyncio runs all the coroutines in a single thread):
    def parallel(arr, n):
        # create a fresh event loop first, so that the queue and the
        # tasks all belong to it
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    
        q = asyncio.Queue()
    
        for i in range(n):
            q.put_nowait(i)
    
        bootstrapper = Bootstrapper(arr, q)
    
        # asyncio.async() no longer exists; async has been a reserved
        # word since Python 3.7
        tasks = [loop.create_task(bootstrapper.run())
                 for i in range(n)]
    
        loop.run_until_complete(asyncio.wait(tasks))
        loop.close()
  6. Plot distributions of moments and execution times:
    pressure = dl.data.Weather.load()['PRESSURE'].dropna().values
    np.random.seed(33)
    parallel_times = ch12util.time_many(partial(parallel, pressure))
    serial_times = ch12util.time_many(partial(serial, pressure))
    
    dl.options.mimic_seaborn()
    ch12util.plot_times(plt.gca(), serial_times, parallel_times)
    
    # context is not defined in this listing; it is assumed to be
    # created earlier in the notebook
    sp = dl.plotting.Subplotter(2, 2, context)
    ch12util.plot_times(sp.ax, serial_times, parallel_times)
    
    STATS = np.array(STATS)
    ch12util.plot_distro(sp.next_ax(), STATS.T[0], pressure.mean())
    sp.label()
    
    ch12util.plot_distro(sp.next_ax(), STATS.T[1], pressure.std())
    sp.label()
    
    ch12util.plot_distro(sp.next_ax(), STATS.T[2], skew(pressure))
    sp.label()
    HTML(sp.exit())
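The steps above depend on the dautil and ch12util helpers from the code bundle. For readers who want to try the timing comparison without them, the following is a rough, self-contained sketch of the same idea. The bootstrap function is an assumed stand-in for ch12util.bootstrap (plain resampling with replacement), the data is synthetic rather than the weather data, and only the mean is recorded.

```python
import asyncio
import random
import statistics
import time

DELAY = 0.01  # simulated I/O latency in seconds, as in the recipe


def bootstrap(arr, rng):
    """Assumed stand-in for ch12util.bootstrap: resample with replacement."""
    return rng.choices(arr, k=len(arr))


def serial(arr, n, rng):
    means = []
    for _ in range(n):
        means.append(statistics.fmean(bootstrap(arr, rng)))
        time.sleep(DELAY)  # blocks the whole program
    return means


async def worker(arr, queue, rng, means):
    while not queue.empty():
        queue.get_nowait()
        means.append(statistics.fmean(bootstrap(arr, rng)))
        await asyncio.sleep(DELAY)  # lets the other workers run meanwhile


async def concurrent(arr, n, rng):
    queue = asyncio.Queue()
    for i in range(n):
        queue.put_nowait(i)
    means = []
    await asyncio.gather(*(worker(arr, queue, rng, means) for _ in range(n)))
    return means


rng = random.Random(33)
data = [rng.gauss(1013, 10) for _ in range(1000)]  # synthetic values

start = time.perf_counter()
serial_means = serial(data, 20, rng)
serial_time = time.perf_counter() - start

start = time.perf_counter()
concurrent_means = asyncio.run(concurrent(data, 20, rng))
concurrent_time = time.perf_counter() - start

print('serial {:.3f}s, concurrent {:.3f}s'.format(serial_time, concurrent_time))
```

The serial version pays the simulated delay once per resample, whereas the coroutines overlap their waits, so the concurrent run should finish in roughly the time of a single delay plus the computation itself. The resampling work is not sped up; only the waiting is shared.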

Refer to the following screenshot for the end result:

See also
