
7. Parallelization and async

Matthew Wilkes
Leeds, West Yorkshire, UK

A common problem that developers find themselves faced with is that they have an operation that spends a lot of time waiting for something to happen, as well as other operations which do not depend on the results of that first operation. It can be frustrating to wait for the slow operation to complete when there are other things the program could be doing. This is the fundamental problem that asynchronous programming tries to solve.

This problem becomes most noticeable during IO operations, such as network requests. In our aggregation process, we have a loop which issues HTTP requests to various endpoints, then processes the results. These HTTP requests can take some time to complete, as they often involve examining external sensors and looking at values over a few seconds. If each request takes 3 seconds to complete, then checking 100 sensors would mean waiting for 5 minutes, on top of all the processing time.

The alternative approach is for us to parallelize some aspects of the program. The most natural functions to parallelize are steps that involve waiting for some external system. If only the three waiting steps in Figure 7-1 could be parallelized, there would be significant time savings, as shown in Figure 7-2.
Figure 7-1

Step-by-step process for connecting to three sensor servers and downloading their data

Figure 7-2

Step-by-step process with parallelized waiting; parsing does not necessarily happen in order

Of course, computers have practical limits on how many network requests can be outstanding at once. Anyone who has copied files to an external hard drive is familiar with the idea that some storage media handle sequential accesses better than parallel ones. The best fit for parallel programming is when there is a balance between IO-bound and CPU-bound operations that need to be performed. If the workload is mostly CPU-bound, the only way to increase speed is to commit more resources. On the other hand, if there is too much IO happening, we may have to limit the number of simultaneous tasks to avoid a backlog of processing work building up.

Nonblocking IO

The simplest way to write asynchronous functions in Python, and one that has been possible for a very long time, is to write functions that use nonblocking IO operations. Nonblocking IO operations are variants of the standard IO operations that return as soon as an operation starts, rather than the normal behavior of returning when it completes.
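
For instance, a socket can be switched into nonblocking mode with setblocking(False), after which a read that would normally wait raises an exception instead. The following minimal sketch uses a local socket pair rather than a real network connection:
import socket
# A connected pair of local sockets; no real network connection is needed
a, b = socket.socketpair()
b.setblocking(False)
try:
    b.recv(1024)  # No data has been sent yet
except BlockingIOError:
    # A blocking socket would have waited here; a nonblocking one
    # returns control to us immediately by raising this exception
    print("No data ready yet, but we were not forced to wait")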

Some libraries may use these for low-level operations, like reading from a socket, but it’s rare for them to be used in more complex settings or by most Python developers. There are no widely used libraries to allow developers to take advantage of nonblocking IO for HTTP requests, so I cannot recommend it as a practical solution to the problem of managing simultaneous connections to web servers. Still, it’s a technique that was used more in the Python 2 era, and it’s interesting to look at as it helps us understand the advantages and disadvantages of more modern solutions.

We’ll look at an example implementation here so that we can see the differences in how the code must be structured to take advantage of this. The implementation relies on the select.select(...) function of the standard library, which is a wrapper for the select(2) system call. When given a list of file-like objects (which includes sockets and subprocess calls), select returns the ones that have data ready for reading1 or blocks until at least one is ready.

select represents the key idea of asynchronous code, the idea that we can wait for multiple things in parallel, but with a function that handles blocking until some data is ready. The blocking behavior changes from waiting for each task in turn to waiting for the first of multiple simultaneous requests. It may seem counterintuitive that the key to a nonblocking IO process is a function that blocks, but the intention isn’t to remove blocking entirely, it’s to move the blocking to when there is nothing else we could be doing.

Blocking is not a bad thing; it’s what allows our code to have an easy-to-understand execution flow. If select(...) did not block when there were no connections ready, we’d have to introduce a loop to call select(...) repeatedly until a connection is ready. Code that blocks immediately is easier to understand, as it never has to handle cases where a variable is a placeholder for a future result that is not yet ready. The select approach sacrifices some of that naïve clarity in our program flow by deferring the blocking until a later point, but it allows us to take advantage of parallelized waiting.
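
As a minimal illustration, consider two local socket pairs, only one of which has data waiting. The select(...) call returns the readable one straight away; if neither had data, it would block until one did.
import select
import socket
a, b = socket.socketpair()
c, d = socket.socketpair()
a.send(b"some data")  # Only the first pair has anything to read
# Blocks until at least one of the watched sockets is readable;
# here b is ready straight away, d is not
readable, _, _ = select.select([b, d], [], [])
print(readable == [b])  # True
print(b.recv(1024))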

Caution

The following example functions are very optimistic; they are not standards-compliant HTTP functions, and they make many assumptions about how the server behaves. This is intentional; they are here to illustrate an approach, not as a recommendation for code to be used in the real world. It works well enough for instructional and comparison purposes, and that’s about it.

An example of a program to make some nonblocking IO HTTP requests is shown as Listing 7-1. The most striking difference between the HTTP handling of our existing code and this sample is the addition of two new functions, which perform the HTTP request and response actions. Splitting the logic like this makes the approach unappealing, but it's important to remember that there are equivalents to these functions in the requests package; we're only seeing them here because we're looking at a method for which there is no library to fall back on.
import datetime
import io
import json
import select
import socket
import typing as t
import urllib.parse
import h11
def get_http(uri: str, headers: t.Dict[str, str]) -> socket.socket:
    """Given a URI and a set of headers, make a HTTP request and return the
    underlying socket. If there were a production-quality implementation of
    nonblocking HTTP this function would be replaced with the relevant one
    from that library."""
    parsed = urllib.parse.urlparse(uri)
    if parsed.port:
        port = parsed.port
    else:
        port = 80
    headers["Host"] = parsed.netloc
    sock = socket.socket()
    sock.connect((parsed.hostname, port))
    sock.setblocking(False)
    connection = h11.Connection(h11.CLIENT)
    target = parsed.path or "/"
    if parsed.query:
        # Preserve the query string; the example URLs below rely on it
        target += "?" + parsed.query
    request = h11.Request(method="GET", target=target, headers=headers.items())
    sock.send(connection.send(request))
    sock.send(connection.send(h11.EndOfMessage()))
    return sock
def read_from_socket(sock: socket.socket) -> str:
    """ If there were a production-quality implementation of nonblocking HTTP
    this function would be replaced with the relevant one to get the body of
    the response if it was a success or error otherwise. """
    data = sock.recv(1000000)
    connection = h11.Connection(h11.CLIENT)
    connection.receive_data(data)
    response = connection.next_event()
    headers = dict(response.headers)
    body = connection.next_event()
    eom = connection.next_event()
    try:
        if response.status_code == 200:
            return body.data.decode("utf-8")
        else:
            raise ValueError("Bad response")
    finally:
        sock.close()
def show_responses(uris: t.Tuple[str]) -> None:
    sockets = []
    for uri in uris:
        print(f"Making request to {uri}")
        sockets.append(get_http(uri, {}))
    while sockets:
        readable, writable, exceptional = select.select(sockets, [], [])
        print(f"{ len(readable) } socket(s) ready")
        for request in readable:
            print(f"Reading from socket")
            response = read_from_socket(request)
            print(f"Got { len(response) } bytes")
            sockets.remove(request)
if __name__ == "__main__":
    show_responses([
        "http://jsonplaceholder.typicode.com/posts?userId=1",
        "http://jsonplaceholder.typicode.com/posts?userId=5",
        "http://jsonplaceholder.typicode.com/posts?userId=8",
    ])
Listing 7-1

Optimistic nonblocking HTTP functions – nbioexample.py

The result of running this file with a Python interpreter would be these three URLs being fetched, then read as their data became available, shown as follows:
> pipenv run python .\nbioexample.py
Making request to http://jsonplaceholder.typicode.com/posts?userId=1
Making request to http://jsonplaceholder.typicode.com/posts?userId=5
Making request to http://jsonplaceholder.typicode.com/posts?userId=8
1 socket(s) ready
Reading from socket
Got 27520 bytes
1 socket(s) ready
Reading from socket
Got 3707 bytes
1 socket(s) ready
Reading from socket
Got 2255 bytes

The get_http(...) function is what creates the socket. It parses the URL that it has been given and sets up a TCP/IP socket to connect to that server. This does involve some blocking IO, specifically any DNS lookups and socket setup actions, but these are relatively short compared to the time waiting for the body, so I have not attempted to make them nonblocking.

Then, the function sets this socket as nonblocking and uses the h11 library to generate a HTTP request. It’s entirely possible to generate a HTTP request2 with string manipulation alone, but this library simplifies our code significantly.

We call the read_from_socket(...) function once there is data available on the socket. It assumes that there are fewer than 1000000 bytes of data and that this represents a complete response,3 then uses the h11 library to parse the data into objects representing the headers and the body of the response. We use these to determine whether the request was successful and either return the body of the response or raise a ValueError. The data is decoded as UTF-8 because that's what Flask is generating for us on the other end. It's essential to decode with the correct character set; this can be guaranteed by the server providing a header that declares the character set or by some other knowledge of what the character set is. As we also wrote the server code, we know that we're using Flask's built-in JSON support, whose default encoding is UTF-8.

Tip

In some situations, you may not know for sure which character encoding is in use. The chardet library analyzes text to suggest the most likely encoding, but this is not foolproof. This library, or fallbacks like try/except blocks with multiple encodings, is only appropriate when loading data from a source that is not consistent and does not report its encoding. In the majority of cases, you should be able to specify the exact encoding, and you must do this to avoid subtle errors.
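
As a quick sketch of how such a fallback looks (assuming the chardet package is installed; the sample string here is arbitrary), detection returns a best guess and a confidence score rather than a definitive answer:
import chardet
raw = "Flask übertrug die Daten".encode("iso-8859-1")
guess = chardet.detect(raw)
# detect() returns a best guess plus a confidence score, for example
# {'encoding': 'ISO-8859-1', 'confidence': 0.73, 'language': ''}.
# The guess can be wrong, especially for short inputs.
print(guess)
if guess["encoding"] is not None:
    print(raw.decode(guess["encoding"], errors="replace"))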

Making our code nonblocking

In order to integrate the preceding functions into our codebase, the other functions in our code require some changes, as shown in Listing 7-2. The existing get_data_points(...) function would need to be split into connect_to_server(...) and prepare_datapoints_from_response(...) functions. We thereby expose the socket object to the add_data_from_sensors(...) function, allowing it to use select instead of just looping over each server.
def connect_to_server(server: str, api_key: t.Optional[str]) -> socket.socket:
    if not server.endswith("/"):
        server += "/"
    url = server + "v/2.0/sensors/"
    headers = {}
    if api_key:
        headers["X-API-KEY"] = api_key
    return get_http(url, headers=headers)
def prepare_datapoints_from_response(response: str) -> t.Iterator[DataPoint]:
    now = datetime.datetime.now()
    json_result = json.loads(response)
    if "sensors" in json_result:
        for value in json_result["sensors"]:
            yield DataPoint(
                sensor_name=value["id"], collected_at=now, data=value["value"]
            )
    else:
        raise ValueError(
            f"Error loading data from stream: " + json_result.get("error", "Unknown")
        )
def add_data_from_sensors(
    session: Session, servers: t.Tuple[str], api_key: t.Optional[str]
) -> t.Iterable[DataPoint]:
    points: t.List[DataPoint] = []
    sockets = [connect_to_server(server, api_key) for server in servers]
    while sockets:
        readable, writable, exceptional = select.select(sockets, [], [])
        for request in readable:
            # In a production quality implementation there would be
            # handling here for responses that have only partially been
            # received.
            value = read_from_socket(request)
            for point in prepare_datapoints_from_response(value):
                session.add(point)
                points.append(point)
            sockets.remove(request)
    return points
Listing 7-2

Additional glue functions

It may sound minor, but this is sufficient reason to decide against using this method of making HTTP requests in production code. Without a library to simplify the API here, the cognitive load that is added by using nonblocking sockets is, in my opinion, excessive. The ideal approach would introduce no changes to the program flow, but minimizing the changes helps keep code maintainable. The fact that this implementation leaks the raw sockets into application functions is unacceptable.

Overall, while this approach does reduce waiting time, it requires us to restructure our code significantly, and it only provides savings in the wait step, not in the parsing stage. Nonblocking IO is an interesting technique, but it is only appropriate for exceptional cases and requires significant alterations to program flow as well as abandoning all common libraries to achieve even the most basic outcomes. I don’t recommend this approach.

Multithreading and multiprocessing

A much more common approach is to split the workload into multiple threads or processes. Threads allow logical subproblems to be processed at the same time, whether they are CPU-bound or IO-bound. In this model, it's possible for the parsing of one set of results to happen before the waiting has even started for another, as the entire retrieval process is split off into a new thread. Each of the tasks runs in parallel, but within a thread everything runs sequentially (as shown in Figure 7-3), with functions blocking as usual.
Figure 7-3

Parallel tasks when using threading or multiple processes

The code within a thread always executes in order, but when multiple threads are running at once, there is no guarantee that their execution is synchronized in any meaningful way. Even worse, there's no guarantee that the execution of code in different threads is aligned to statement boundaries. When two threads access the same variable, there's no guarantee that either of the actions is performed first: they can overlap. The internal low-level "bytecode" instructions that Python uses to execute user functions are the building blocks of parallelism in Python, not statements.

Low-level threads

The lowest-level interface to threads in Python is the threading.Thread object, which effectively wraps a function call into a new thread. A thread’s actions can be customized by passing a function as the target= parameter or by subclassing threading.Thread and defining a run() method, as shown in Table 7-1.
Table 7-1

The two methods of providing the code for a thread to execute

Passing a target function:
import threading
def helloworld():
    print("Hello world!")
thread = threading.Thread(
    target=helloworld,
    name="helloworld"
)
thread.start()
thread.join()

Subclassing threading.Thread and defining run():
import threading
class HelloWorldThread(threading.Thread):
    def run(self):
        print("Hello world!")
thread = HelloWorldThread(name="helloworld")
thread.start()
thread.join()

The start() method begins the execution of the thread; the join() method blocks the execution until that thread has completed. The name parameter is mostly useful for debugging performance problems, but it’s a good habit always to set a name if you’re ever creating threads manually.

Threads do not have a return value, so if they need to return a computed value, that can be tricky. One way of passing a value back is by using a mutable object that the thread can change in place or, if using the subclass method, by setting an attribute on the thread object.

An attribute on the thread object is a good approach when there is a single, simple return type, such as a boolean success value, or the result of a computation. It’s a good fit for when the thread is doing a discrete piece of work.
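
A minimal sketch of the attribute approach might look like the following; SumThread is a hypothetical example class, not something from the standard library.
import threading
class SumThread(threading.Thread):
    """Thread that stores its computed value as an attribute."""
    def __init__(self, numbers, name=None):
        super().__init__(name=name)
        self.numbers = numbers
        self.result = None
    def run(self):
        self.result = sum(self.numbers)
thread = SumThread(range(100), name="sum-worker")
thread.start()
thread.join()
print(thread.result)  # 4950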

A mutable object is the best fit when you have multiple threads, each working on a part of a common problem, for example, gathering the sensor data from a set of URLs, with each thread being responsible for one URL. The queue.Queue object is perfect for this purpose.

Exercise 7-1: Write a Wrapper To Return via a Queue

Rather than adjust the function directly, write some code to wrap any arbitrary function and store its results in a queue instead of directly returning, to allow the function to be run cleanly as threads. If you get stuck, look back at Chapter 5 and how to write a decorator that takes arguments.

The function, return_via_queue(...), should be such that the following code works:
from __future__ import annotations
...
def add_data_from_sensors(
    session: Session, servers: t.Tuple[str], api_key: t.Optional[str]
) -> t.Iterable[DataPoint]:
    points: t.List[DataPoint] = []
    q: queue.Queue[t.List[DataPoint]] = queue.Queue()
    wrap = return_via_queue(q)
    threads = [
        threading.Thread(target=wrap(get_data_points), args=(server, api_key))
        for server in servers
    ]
    for thread in threads:
        # Start all threads
        thread.start()
    for thread in threads:
        # Wait for all threads to finish
        thread.join()
    while not q.empty():
        # So long as there's a return value in the queue,
        # process one thread's results
        found = q.get_nowait()
        for point in found:
            session.add(point)
            points.append(point)
    return points

You must also adjust the get_data_points(...) function to return a list of DataPoint objects, rather than an iterator of them, or to do an equivalent conversion in the wrapper function. This is to ensure that all the data is processed in the thread before it returns its data to the main thread. As generators don’t produce their values until the values are requested, we need to ensure that the requesting happens within the thread.

An example implementation of the wrapper method and a simple threaded version of this program is available in the code samples for this chapter.
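
The version in the code samples may differ, but a minimal sketch of such a wrapper, written as a decorator factory that takes the queue as an argument, could look like this:
import functools
def return_via_queue(q):
    """Wrap a function so its return value is put on the supplied queue
    rather than being returned to a caller."""
    def wrapper(func):
        @functools.wraps(func)
        def inner(*args, **kwargs):
            # If func returns a generator, wrap the call in list() here so
            # that all of the work happens inside the thread (see above)
            q.put(func(*args, **kwargs))
        return inner
    return wrapper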

Note on __future__ imports

Statements like from __future__ import example are ways of enabling features that will be part of a future version of Python. They must be at the very top of a Python file, with no other statements before them.

In this case, the line q: queue.Queue[t.List[DataPoint]] = queue.Queue() is the problem. The queue.Queue object in the standard library is not a generic type in Python 3.8, so it cannot accept a type definition of the type of objects it contains. This omission is tracked as bug 33315 in Python, where there is justified reluctance to either add a new typing.Queue type or to adjust the built-in type.

Despite this, mypy treats queue.Queue as a generic type; it’s just the Python interpreter that does not. There are two ways of fixing this, either by using a string-based type hint so that the Python interpreter doesn’t try to evaluate queue.Queue[...] and fail
    q: "queue.Queue[t.List[DataPoint]]" = queue.Queue()

or by using the annotations option from __future__, which enables the type annotation parsing logic planned for Python 4. This logic prevents Python from parsing annotations at runtime and is the approach taken in the preceding sample.

This low level of threading is not at all user-friendly. As we've seen in the preceding exercise, it is possible to write wrapper code that makes functions work unchanged in a threaded environment. It would also be possible to write a wrapper for the threading.Thread object that automatically wraps the function being called, retrieves the result from an internal queue, and returns it to the programmer seamlessly.
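
Such a wrapper is not part of the standard library, but a rough sketch of what one might look like follows; ReturningThread is a hypothetical name, and this version stores the result as a simple attribute rather than using a queue.
import threading
class ReturningThread(threading.Thread):
    """Hypothetical Thread subclass that captures the target function's
    return value so the caller can collect it after join()."""
    def __init__(self, target, *args, name=None, **kwargs):
        super().__init__(name=name)
        self._fn = target
        self._fn_args = args
        self._fn_kwargs = kwargs
        self.result = None
    def run(self):
        self.result = self._fn(*self._fn_args, **self._fn_kwargs)
worker = ReturningThread(pow, 2, 16, name="power")
worker.start()
worker.join()
print(worker.result)  # 65536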

Luckily, we don’t have to write such a feature in production code; there’s a helper built-in to the Python standard library: concurrent.futures.ThreadPoolExecutor. The ThreadPoolExecutor manages the number of threads in use, allowing the programmer to limit the number of threads that execute at once.

The equivalent invocation of a single hello world thread using a ThreadPoolExecutor would be
from concurrent.futures import ThreadPoolExecutor
def helloworld():
    print("Hello world!")
with ThreadPoolExecutor() as pool:
    pool.submit(helloworld)

Here, we see a context manager that defines the period during which the pool of threads is active. As no max_workers argument is passed to the executor, Python picks a number of threads based on the number of CPUs available on the computer running the program.

Once inside this context manager, the program submits function calls to the thread pool. The pool.submit(...) function can be called any number of times to schedule additional tasks, the result of which is a Future object representing that task. Futures will be very familiar to developers who have worked with modern JavaScript; they are objects that represent a value (or an error) that will be present at some point in the future. The result() method returns whatever value the function that was submitted returned. If that function raised an exception, then the same exception will be raised when the result() method is called.
from concurrent.futures import ThreadPoolExecutor
def calculate():
    return 2**16
with ThreadPoolExecutor() as pool:
    task = pool.submit(calculate)
>>> print(task.result())
65536
Caution

If you don’t access the result() method of a future, then any exceptions it raises are never propagated to the main thread. This can make debugging difficult, so it’s best to ensure you always access the result, even if you never assign it to a variable.

If result() is called within the with block, execution blocks until the relevant task has completed. When the with block ends, execution blocks until all scheduled tasks have completed, so calls to the result method after the with block ends always return immediately.
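
To illustrate the preceding caution, the following sketch submits a function that raises an exception; nothing is reported until result() is called, at which point the stored exception is re-raised in the main thread.
from concurrent.futures import ThreadPoolExecutor
def fail():
    raise ValueError("something went wrong in the thread")
with ThreadPoolExecutor() as pool:
    task = pool.submit(fail)
    # The thread has already raised by now, but nothing is reported
try:
    task.result()  # The stored exception is re-raised here
except ValueError as err:
    print(f"Caught: {err}")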

Bytecode

In order to understand some of the limits of threading in Python, we need to look behind the curtain of how the interpreter loads and runs code. In this section, Python code may be shown annotated with the underlying bytecode used by the interpreter. This bytecode is an implementation detail and is stored in .pyc files. It encodes the behavior of the program at the lowest level. Interpreting a complex language like Python is not a trivial task, so the Python interpreter caches its interpretation of code as a series of many simple operations.

When people talk about Python, they generally are talking about CPython, the implementation of Python in the C programming language. CPython is the reference implementation, in that it’s intended to be what people refer to when seeing how Python does things. There are other implementations, the most popular of which is PyPy, an implementation of Python that is written in a specially designed, Python-like language rather than C.4 Both CPython and PyPy cache their interpretation of Python code as Python bytecode.

Two other implementations of Python are worth mentioning: Jython and IronPython. Both of these also cache their interpretation as bytecode, but crucially they use a different bytecode. Jython uses the same bytecode format as Java, and IronPython uses the same bytecode format as .NET. For this chapter, when we talk about bytecode, we’re talking about Python bytecode, as we’re looking at it in the context of how threads are implemented in CPython.

Generally speaking, you won’t have to worry about bytecode, but an awareness of its role is useful for writing multithreaded code. The samples given in the following were generated using the dis module5 in the standard library. The function dis.dis(func) shows the bytecode for a given function, assuming that it’s written in Python, rather than a C extension. For example, the sorted(...) function is implemented in C and therefore has no bytecode to show.

To demonstrate this, let’s look at a function with its disassembly (Listing 7-3). The function has been annotated with the disassembly results from dis.dis(increment), which shows the line number within the file, the bytecode offset of the instruction within the function, the instruction name, and any instruction parameters as their raw values with the Python representation in parentheses.
num = 0
def increment():
    global num
    num += 1          # 5  0    LOAD_GLOBAL              0 (num)
                      #    2    LOAD_CONST               1 (1)
                      #    4    INPLACE_ADD
                      #    6    STORE_GLOBAL             0 (num)
    return None       # 10 8    LOAD_CONST               0 (None)
                      #    10   RETURN_VALUE
Listing 7-3

A simple function to increment a global variable

The line num += 1 looks like an atomic operation,6 but the bytecode reveals that the underlying interpreter runs four operations to complete it. We don't care what these four instructions are, just the fact that we cannot trust our intuition about which operations are atomic and which are not.

If we were to run this increment function 100 times in succession, the result stored to num would be 100, which makes logical sense. If this function were to be executed in a pair of threads, there would be no guarantee that the final result would be 100. In this case, the correct result is only found so long as no thread ever executes the LOAD_GLOBAL bytecode step while another is running the LOAD_CONST, INPLACE_ADD, or STORE_GLOBAL steps. Python does not guarantee this, so the preceding code is not thread-safe.

There is overhead to starting a thread, and the computer will be running multiple processes at the same time. The two calls could happen to run sequentially, despite two threads being available, or they could both start at the same time, or there could be an offset between the start times. The ways the executions can overlap are represented in Figure 7-4.
Figure 7-4

Possible arrangements of two threads executing num += 1 at once. Only the leftmost and rightmost examples produce the correct result

The GIL

That is somewhat simplified, however. CPython has a feature called the GIL, or Global Interpreter Lock, which is used to make thread-safety easier.7 This lock means that only one thread can be executing Python code at once. That’s not enough to solve our problem, however, because the granularity of the GIL is at the level of bytecode, so although no two bytecode instructions execute simultaneously, the interpreter can still switch between each path, causing overlap. As such, Figure 7-5 shows a more accurate representation of how the threads can overlap.
Figure 7-5

Possible arrangements of num += 1 execution with the GIL active. Only the leftmost and rightmost produce the correct result

It might appear that the GIL removes the benefit of threading without guaranteeing the correct result, but it's not as bad as it first appears. We will deal with the benefits of the GIL shortly, but first, we should address the claim that it negates the advantages of threading. It's not strictly true that no two bytecode instructions can run at once.

Bytecode instructions are much simpler than lines of Python, allowing the interpreter to reason about what actions it is taking at any given point. It can, therefore, allow multiple threads to execute when it’s safe to do so, such as during a network connection or when waiting for data to be read from a file.

Specifically, not everything the Python interpreter does requires the GIL to be held. It must be held at the start and end of a bytecode instruction, but it can be released internally. Waiting for a socket to have data available for reading is one of the things that can be done without holding the GIL. During a bytecode instruction where an IO operation is happening, the GIL can be released, and the interpreter can simultaneously execute any code that does not require the GIL to be held, so long as it’s in a different thread. Once the IO operation finishes, it must wait to regain the GIL from whichever thread took it, before execution can continue.

In situations like this, where the code never has to wait for an IO function to complete, Python interrupts the threads at set intervals to schedule the others fairly. By default, this happens approximately every 0.005 seconds, which happens to be a long enough period that our example works as hoped on my computer. If we manually tell the interpreter to switch threads more frequently, using the sys.setswitchinterval(...) function, we start to see failures.

Code for testing thread-safety at different switch intervals
if __name__ == "__main__":
    import concurrent.futures
    import sys
    for si in [0.005, 0.0000005, 0.0000000005]:
        sys.setswitchinterval(si)
        results = []
        for attempt in range(100):
            with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
                for i in range(100):
                    pool.submit(increment)
            results.append(num)
            num = 0
        correct = [a for a in results if a == 100]
        pct = len(correct) / len(results)
        print(f"{pct:.1%} correct at sys.setswitchinterval({si:.10f})")
On my computer, the result of running this is
100.0% correct at sys.setswitchinterval(0.0050000000)
71.0% correct at sys.setswitchinterval(0.0000005000)
84.0% correct at sys.setswitchinterval(0.0000000005)

The default behavior being 100% correct in my test doesn’t mean that it solves the problem. 0.005 is a well-chosen interval that results in a lower chance of errors for most people. The fact that a function happens to work when you test it does not mean that it’s guaranteed to always work on every machine. The trade-off to introducing threads is that you gain relatively simple concurrency but without strong guarantees about shared state.

Locks and deadlocks

By enforcing a rule that bytecode instructions do not overlap, they’re guaranteed to be atomic. There is no risk of two STORE bytecode instructions for the same value happening simultaneously, as no two bytecode instructions can run truly simultaneously. It may be that the implementation of an instruction voluntarily releases the GIL and waits to reobtain it for sections of its implementation, but that is not the same as any two arbitrary instructions happening in parallel. This atomicity is used by Python to build thread-safe types and synchronization tools.

If you need to share state between threads, you must manually protect this state with locks. Locks are objects that allow you to prevent your code from running at the same time as other code that it would interfere with. If two concurrent threads both try to acquire the lock, only one will succeed. Any other threads that try to acquire the lock will be blocked until the first thread has released it. This is possible because the locks are implemented in C code, meaning their execution takes place as one bytecode step. All the work of waiting for a lock to become available and acquiring it is performed in response to a single bytecode instruction, making it atomic.

Code protected by a lock can still be interrupted, but no conflicting code will be run during these interruptions. Threads can still be interrupted while they hold a lock. If the thread that they are interrupted in favor of attempts to take that same lock, it will fail to do so and will pause execution. In an environment with two threads, this means execution would pass straight back to the first function. If more than one thread is active, it may pass to other threads first, but the same inability to take the first thread’s lock applies.

Increment function with locking
import threading
numlock = threading.Lock()
num = 0
def increment():
    global num
    with numlock:
        num += 1
    return None

In this version of the function, the lock called numlock is used to guard the actions that read/write the num value. This context manager causes the lock to be acquired before execution passes to the body and to be released before the first line after the body. Although we’ve added some overhead here, it is minimal, and it guarantees that the result of the code is correct, regardless of any user settings or different Python interpreter versions.

Result of testing with a lock surrounding num += 1
100.0% correct at sys.setswitchinterval(0.0050000000)
100.0% correct at sys.setswitchinterval(0.0000005000)
100.0% correct at sys.setswitchinterval(0.0000000005)
This code produces the correct result no matter what the switching interval is, as the four bytecode instructions that make up num += 1 are guaranteed to be executed as one block. There is an additional locking bytecode instruction before and after each block of four, as shown in Figure 7-6.
Figure 7-6

Possible arrangements of num += 1 on two threads with explicit locking shown at the start and end

From the perspective of the two threads being used in the thread pool, the with numlock: line may block execution, or it may not. Neither thread needs to do anything special to handle the two cases (acquiring the lock immediately or waiting for a turn), and therefore this is a relatively minimal change to the control flow.

The difficulty comes in ensuring that required locks are in place and that no contradictions exist. If a programmer defines two locks and uses them simultaneously, it’s possible to create a situation where the program becomes deadlocked.

Deadlocks

Consider a situation where we are incrementing two numbers in one thread and decrementing them in another, resulting in the following functions:
num = 0
other = 0
def increment():
    global num
    global other
    num += 1
    other += 1
    return None
def decrement():
    global num
    global other
    other -= 1
    num -= 1
    return None
The program suffers from the same problem that we had previously; if we schedule these functions in a ThreadPoolExecutor, then the result may be incorrect. We might think to apply the same locking pattern that fixed this previously, adding an otherlock lock to complement the numlock lock that we already created, but the potential for deadlocks lurks in this code. There are three ways we could arrange the locks in these functions (shown in Table 7-2), one of which can cause deadlocks.
Table 7-2

Three locking approaches for simultaneously updating two variables

Minimizing locked code (resistant to deadlocks):
num = 0
other = 0
numlock = threading.Lock()
otherlock = threading.Lock()
def increment():
    global num
    global other
    with numlock:
        num += 1
    with otherlock:
        other += 1
    return None
def decrement():
    global num
    global other
    with otherlock:
        other -= 1
    with numlock:
        num -= 1
    return None

Using locks in a consistent order (resistant to deadlocks):
num = 0
other = 0
numlock = threading.Lock()
otherlock = threading.Lock()
def increment():
    global num
    global other
    with numlock, otherlock:
        num += 1
        other += 1
    return None
def decrement():
    global num
    global other
    with numlock, otherlock:
        other -= 1
        num -= 1
    return None

Using locks in an inconsistent order (causes deadlocks):
num = 0
other = 0
numlock = threading.Lock()
otherlock = threading.Lock()
def increment():
    global num
    global other
    with numlock, otherlock:
        num += 1
        other += 1
    return None
def decrement():
    global num
    global other
    with otherlock, numlock:
        other -= 1
        num -= 1
    return None

The best option is to ensure that we never hold both locks simultaneously. This makes them truly independent, so there is no risk of deadlock. In this pattern, threads never wait to acquire a lock until they’ve already released the previous lock they held.

The middle implementation uses both locks at once. This is less good, as it’s holding locks for longer than they are needed, but sometimes it’s unavoidable for code to need to lock two variables. Although both of the preceding functions can be written to use only one lock at a time, consider the case of a function that exchanges the values:
def switch():
    global num
    global other
    with numlock, otherlock:
        num, other = other, num
    return None

This function requires that neither num nor other is being used by another thread while it’s executing, so it needs to keep both numbers locked. Locks are acquired in the same order in the increment() and decrement() (and switch()) functions, so each one tries to acquire numlock before otherlock. If both threads were synchronized in their execution, they would both try to acquire numlock at the same time and one would block. No deadlocks would occur.

The final example shows an implementation where the ordering of the locks in the decrement() function has been inverted. This is very difficult indeed to notice but has the effect of causing deadlocks. It’s possible for a thread running this third version of increment() to acquire the numlock lock at the same time that a thread running decrement acquires the otherlock lock. Now both threads are waiting to acquire the lock they don’t have, and neither can release their lock until after they’ve acquired the missing one. This causes the program to hang indefinitely.

There are a few ways to avoid this problem. As this is a logical assertion about the structure of your code, the natural tool would be a static checker that ensures your code never inverts the order in which locks are acquired. Unfortunately, I do not know of any existing implementation of this check for Python code.

The most straightforward alternative is to use a single lock to cover both variables rather than to lock them individually. Although this is superficially attractive, it does not scale well as the number of objects that need protecting grows. A single lock object would prevent any work being done to the num variable while another thread works on the other variable. Sharing locks across independent functions greatly increases the amount of blocking involved in your code, which can serve to negate the advantages brought by threading.

You might be tempted to abandon the with numlock: method of acquiring a lock and call the lock's acquire() method directly. While this allows you to specify a timeout and an error handler in case the lock was not acquired within the timeout, I would not recommend it. The change makes the code's logic harder to follow with the introduction of an error handler, and the only appropriate response to detecting a deadlock in this manner is to raise an exception. This slows down the program because of the timeouts and doesn't solve the problem. This approach may be useful when debugging locally, to allow you to examine the state during a deadlock, but it should not be considered for production code.
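
For completeness, a sketch of that style is shown below; the five-second timeout and the RuntimeError are arbitrary choices for illustration.
import threading
num = 0
numlock = threading.Lock()
def increment():
    global num
    # Acquire the lock directly, giving up after five seconds rather
    # than waiting forever in a potential deadlock
    if not numlock.acquire(timeout=5):
        raise RuntimeError("Could not acquire numlock; possible deadlock")
    try:
        num += 1
    finally:
        numlock.release()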

My recommendation is to use all of these approaches to preventing deadlocks. Firstly, you should use the minimum number of locks necessary to make your program thread-safe. If you do need multiple locks, you should minimize the time that they're held for, releasing them as soon as the shared state has been manipulated. Finally, you should define an ordering of your locks and always use this ordering when acquiring them. The easiest way to do this is always to acquire locks alphabetically. Ensuring a fixed ordering of locks still requires manual checking of your code, but each use of locks can be checked against your rule independently rather than against all other uses.

Avoiding global state

It’s not always possible to avoid global state, but in many situations, it is. Generally speaking, it’s possible to schedule two functions to run in parallel if neither function depends on the values of shared variables.8 Imagine that instead of 100 calls to increment() and 100 calls to decrement(), we were scheduling 100 calls to increment() and 1 to a function called save_number_to_database(). There is no guarantee how many times increment() will have completed before save_number_to_database() is called. The number saved could be anywhere between 0 and 100, which is clearly not useful. These functions don’t make sense to be run in parallel because they both depend on the value of a shared variable.

There are a couple of main ways that shared data can interrelate. Shared data can be used to collate data across multiple threads, or it can be used to pass data between multiple threads.

Collating data

Our two increment() and decrement() functions are only simple demonstrations. They manipulate their shared state by adding or subtracting one, but normally functions run in parallel would do a more complex manipulation. For example, in apd.aggregation, the shared state is the set of sensor results we have, and each thread adds more results to that set.

With both of these examples, we can split the work of deciding what the manipulation should be from the work of applying it. As only the stage where we apply the manipulation requires access to shared state, this allows us to do any calculations or IO operations in parallel. Each thread then returns its result, and the results are merged together at the end, as shown in Listing 7-4.
import concurrent.futures
import threading
def increment():
    return 1
def decrement():
    return -1
def onehundred():
    tasks = []
    with concurrent.futures.ThreadPoolExecutor() as pool:
        for i in range(100):
            tasks.append(pool.submit(increment))
            tasks.append(pool.submit(decrement))
    number = 0
    for task in tasks:
        number += task.result()
    return number
if __name__ == "__main__":
    print(onehundred())
Listing 7-4

Example of using task result to store intended changes

Passing data

The examples we’ve covered so far all involve the main thread delegating work to subthreads, but it’s common for new tasks to be discovered during the processing of the data from earlier tasks. For example, most APIs paginate data, so if we had a thread to fetch URLs and a thread to parse the responses, we need to be able to pass initial URLs to the fetch thread from the main thread and also to pass newly discovered URLs from the parse thread to the fetch thread.

When passing data between two (or more) threads, we need to use queues, either queue.Queue or the variant queue.LifoQueue. These implement FIFO and LIFO9 queues, respectively. While we previously used Queue only as a convenient, thread-safe data holder, now we’ll be using it as intended.

Queues have four primary methods.10 The get() and put() methods are self-explanatory, except to say that if the queue is empty, then the get() method blocks, and if the queue has a maximum length set and is full, then the put() method blocks. In addition, there is a task_done() method, which is used to tell the queue that an item has been successfully processed, and a join() method, which blocks until all items have been successfully processed. The join() method is usually called by the thread that adds the items to the queue, to allow it to wait until all work has been completed.

Because the get() method blocks if the queue is currently empty, it's not practical to use this method in nonthreaded code. It does, however, make queues perfect for threaded code where there is a need to wait until the thread producing data has made it available.

Tip

It’s not always clear in advance how many items will be stored in a queue. If get() is called after the last item has been retrieved, then it will block indefinitely. This can be avoided by providing a timeout parameter to get, in which case it will block for the given amount of seconds before raising a queue.Empty exception. A better approach is to send a sentinel value, like None. The code can then detect this value and know that it no longer needs to retrieve new values.

If we were building a threaded program to get information from the GitHub public API, we’d need to be able to retrieve URLs and parse their results. It would be nice to be able to do parsing while URLs are being fetched, so we would split the code between fetching and parsing functions.

Listing 7-5 shows an example of such a program, where multiple GitHub repos can have their commits retrieved in parallel. It uses three queues, one for the input to the fetch thread, one for the output of fetch and input of parse, and one for the output of parse.
from concurrent.futures import ThreadPoolExecutor
import queue
import requests
import textwrap
def print_column(text, column):
    wrapped = textwrap.fill(text, 45)
    indent_level = 50 * column
    indented = textwrap.indent(wrapped, " " * indent_level)
    print(indented)
def fetch(urls, responses, parsed):
    while True:
        url = urls.get()
        if url is None:
            print_column("Got instruction to finish", 0)
            return
        print_column(f"Getting {url}", 0)
        response = requests.get(url)
        print_column(f"Storing {response} from {url}", 0)
        responses.put(response)
        urls.task_done()
def parse(urls, responses, parsed):
    # Wait for the initial URLs to be processed
    print_column("Waiting for url fetch thread", 1)
    urls.join()
    while not responses.empty():
        response = responses.get()
        print_column(f"Starting processing of {response}", 1)
        if response.ok:
            data = response.json()
            for commit in data:
                parsed.put(commit)
            links = response.headers["link"].split(",")
            for link in links:
                if "next" in link:
                    url = link.split(";")[0].strip("<>")
                    print_column(f"Discovered new url: {url}", 1)
                    urls.put(url)
        responses.task_done()
        if responses.empty():
            # We have no responses left, so the loop will
            # end. Wait for all queued urls to be fetched
            # before continuing
            print_column("Waiting for url fetch thread", 1)
            urls.join()
    # We reach this point if there are no responses to process
    # after waiting for the fetch thread to catch up. Tell the
    # fetch thread that it can stop now, then exit this thread.
    print_column("Sending instruction to finish", 1)
    urls.put(None)
def get_commit_info(repos):
    urls = queue.Queue()
    responses = queue.Queue()
    parsed = queue.Queue()
    for (username, repo) in repos:
        urls.put(f"https://api.github.com/repos/{username}/{repo}/commits")
    with ThreadPoolExecutor() as pool:
        fetcher = pool.submit(fetch, urls, responses, parsed)
        parser = pool.submit(parse, urls, responses, parsed)
    print(f"{parsed.qsize()} commits found")
if __name__ == "__main__":
    get_commit_info(
        [("MatthewWilkes", "apd.sensors"), ("MatthewWilkes", "apd.aggregation")]
    )
Listing 7-5

Threaded API client

Running this code results in a two-column output, consisting of the messages from each thread. The full output is too long to include here, but a small section is given in the following as a demonstration:
Getting https://api.github.com/repos/MatthewW
ilkes/apd.aggregation/commits
Storing <Response [200]> from https://api.git
hub.com/repos/MatthewWilkes/apd.aggregation/c
ommits
                                                  Starting processing of <Response [200]>
                                                  Discovered new url: https://api.github.com/
                                                  repositories/188280485/commits?page=2
                                                  Starting processing of <Response [200]>
Getting https://api.github.com/repositories/1
88280485/commits?page=2
                                                  Discovered new url: https://api.github.com/
                                                  repositories/222268232/commits?page=2

By examining the logged messages from each of the threads, we can view how their work is scheduled in parallel. Firstly, the main thread sets up the necessary queues and subthreads, then waits for all the threads to finish. As soon as the two subthreads start, the fetch thread starts working on the URLs passed by the main thread, and the parse thread quickly pauses while waiting for responses to parse.

The parse thread uses urls.join() when there is no work for it, so whenever it runs out of work, it waits until the fetch thread has caught up with all the work that it was sent. This is visible in Figure 7-7, as the parse lines always resume after the fetch lines are complete.
Figure 7-7

Diagram of the timing of the three threads in Listing 7-5

The fetch thread doesn’t use the join() method of any of the queues, it uses get() to block until there is some work to do. As such, the fetch thread can be seen resuming while the parse thread is still executing. Finally, the parse thread sends a sentinel value to the fetch thread to end, and when both exits the thread pool context manager in the main thread exits and execution returns to the main thread.

Other synchronization primitives

The synchronization we used with queues in the preceding example is more complex than the lock behavior we used earlier. In fact, there are a variety of other synchronization primitives available in the standard library. These allow you to build more complex thread-safe coordination behaviors.

Reentrant locks

The Lock object is very handy, but it's not the only system used for synchronizing code across threads. Perhaps the most important of the others is the reentrant lock, which is available as threading.RLock. A reentrant lock is one that can be acquired more than once by the same thread, so long as the acquisitions are nested.
from concurrent.futures import ThreadPoolExecutor
import threading
num = 0
numlock = threading.RLock()
def fiddle_with_num():
    global num
    with numlock:
        if num == 4:
            num = -50
def increment():
    global num
    with numlock:
        num += 1
        fiddle_with_num()
if __name__ == "__main__":
    with ThreadPoolExecutor() as pool:
        for i in range(8):
            pool.submit(increment)
    print(num)
Listing 7-6

An example of nested locking using RLocks

The advantage conferred here is that functions that depend on a lock being held can call others that also depend on the same lock being held, without the second blocking until the first releases it. That greatly simplifies creating APIs that use locks.

Example output from Listing 7-6
> python .\listing7-06-reentrantlocks.py
-46

Conditions

Unlike the locks that we’ve used so far, conditions declare that a variable is ready, not that it is busy. Queues use conditions internally to implement the blocking behavior of get(), put(...), and join(). Conditions allow for more complex behaviors than a lock being acquired.

Conditions are a way of telling other threads that it’s time to check for data, which must be stored independently. Threads that are waiting for data call the condition’s wait_for(...) function inside a context manager, whereas threads that are supplying data call the notify() method. There is no rule that a thread can’t do both at different times; however, if all threads are waiting for data and none are sending, it’s possible to introduce a deadlock.

For example, when calling the get(...) method of a queue, the code immediately acquires the queue’s single lock through its internal not_empty condition, then checks to see if the internal storage of the queue has any data available. If it does, then an item is returned and the lock is released. Keeping the lock for this time ensures that no other users can retrieve that item at the same time, so there is no risk of duplication. If, however, there is no data in the internal storage, then the not_empty.wait() method is called. This releases the single lock, allowing other threads to manipulate the queue, and does not reacquire the lock and return until the condition is notified that a new item has been added.

There is a variant of the notify() method called notify_all(). The standard notify() method only wakes one thread that’s waiting on the condition, whereas notify_all() wakes all threads waiting. It’s always safe to use notify_all() in place of notify(), but notify() saves waking up multiple threads when it’s expected that only one will be unblocked.

A condition alone is only enough to send a single bit of information: that data has been made available. To actually retrieve the data, we must store it in some fashion, like the internal storage of the queue.

The example in Listing 7-7 creates two threads, each pulling a number from a shared data list and then pushing the number modulo 2 to a shared results list. The code uses two conditions to achieve this, one to ensure there is data available to be processed and one to determine when the threads should be shut down.
from concurrent.futures import ThreadPoolExecutor
import sys
import time
import threading
data = []
results = []
running = True
data_available = threading.Condition()
work_complete = threading.Condition()
def has_data():
    """ Return true if there is data in the data list """
    return bool(data)
def num_complete(n):
    """Return a function that checks if the results list has the length specified by n"""
    def finished():
        return len(results) >= n
    return finished
def calculate():
    while running:
        with data_available:
            # Acquire the data_available lock and wait for has_data
            print("Waiting for data")
            data_available.wait_for(has_data)
            time.sleep(1)
            i = data.pop()
        with work_complete:
            if i % 2:
                results.append(1)
            else:
                results.append(0)
            # Acquire the work_complete lock and wake listeners
            work_complete.notify_all()
if __name__ == "__main__":
    with ThreadPoolExecutor() as pool:
        # Schedule two worker functions
        workers = [pool.submit(calculate), pool.submit(calculate)]
        for i in range(200):
            with data_available:
                data.append(i)
                # After adding each piece of data wake the data_available lock
                data_available.notify()
        print("200 items submitted")
        with work_complete:
            # Wait for at least 5 items to be complete through the work_complete lock
            work_complete.wait_for(num_complete(5))
        for worker in workers:
            # Set a shared variable causing the threads to end their work
            running = False
        print("Stopping workers")
    print(f"{len(results)} items processed")
Listing 7-7

An example program using conditions

Example output from Listing 7-7
> python .\listing7-07-conditions.py
Waiting for data
Waiting for data
200 items submitted
Waiting for data
Waiting for data
Waiting for data
Stopping workers
Waiting for data
Waiting for data
7 items processed

Barriers

Barriers are the most conceptually simple synchronization objects in Python. A barrier is created with a known number of parties. When a thread calls wait(), it blocks until there are the same number of threads waiting as the number of parties to the barrier. That is, threading.Barrier(2) blocks the first time wait() is called, but the second call returns immediately and releases the first blocking call.

Barriers are useful when multiple threads are working on aspects of a single problem, as they can prevent a backlog of work building up. A barrier allows you to ensure that a group of threads only run as quickly as the slowest member of the group.

A timeout can be included in the initial creation of the barrier or any wait() call. If any wait call takes longer than its timeout, then all waiting threads raise a threading.BrokenBarrierError, as will any subsequent threads that try to wait on that barrier.

The example in Listing 7-8 demonstrates synchronizing a group of five threads that each wait a random amount of time so that they all continue execution once the last is ready.
from concurrent.futures import ThreadPoolExecutor
import random
import time
import threading
barrier = threading.Barrier(5)
def wait_random():
    thread_id = threading.get_ident()
    to_wait = random.randint(1, 10)
    print(f"Thread {thread_id:5d}: Waiting {to_wait:2d} seconds")
    start_time = time.time()
    time.sleep(to_wait)
    i = barrier.wait()
    end_time = time.time()
    elapsed = end_time - start_time
    print(
        f"Thread {thread_id:5d}: Resumed in position {i} after {elapsed:3.3f} seconds"
    )
if __name__ == "__main__":
    with ThreadPoolExecutor() as pool:
        # Schedule five worker functions
        for i in range(5):
            pool.submit(wait_random)
Listing 7-8

Example of using a barrier

Example output from Listing 7-8
> python .\listing7-08-barriers.py
Thread 21812: Waiting  8 seconds
Thread 17744: Waiting  2 seconds
Thread 13064: Waiting  4 seconds
Thread 14064: Waiting  6 seconds
Thread 22444: Waiting  4 seconds
Thread 21812: Resumed in position 4 after 8.008 seconds
Thread 17744: Resumed in position 0 after 8.006 seconds
Thread 22444: Resumed in position 2 after 7.999 seconds
Thread 13064: Resumed in position 1 after 8.000 seconds
Thread 14064: Resumed in position 3 after 7.999 seconds

Event

Events are another simple synchronization method. Any number of threads can call the wait() method on an event, which blocks until the event is triggered. An event can be triggered at any time by calling the set() method, which wakes all threads waiting for the event. Any subsequent calls to the wait() method return immediately.

As with barriers, events are very useful for ensuring that multiple threads stay synchronized, rather than some racing ahead. Events differ in that they have a single thread that makes the decision for when the group can continue, so they are a good fit for programs where a thread is dedicated to managing the others.

The event can also be reset using the clear() method, so any future calls to wait() will block again. An event’s current state can be examined with the is_set() method. The example in Listing 7-9 uses an event to synchronize a group of threads with one master thread, such that they all wait at least as long as the master, but no longer.
from concurrent.futures import ThreadPoolExecutor
import random
import time
import threading
event = threading.Event()
def wait_random(master):
    thread_id = threading.get_ident()
    to_wait = random.randint(1, 10)
    print(f"Thread {thread_id:5d}: Waiting {to_wait:2d} seconds " f"(Master: {master})")
    start_time = time.time()
    time.sleep(to_wait)
    if master:
        event.set()
    else:
        event.wait()
    end_time = time.time()
    elapsed = end_time - start_time
    print(
        f"Thread {thread_id:5d}: Resumed after {elapsed:3.3f} seconds"
    )
if __name__ == "__main__":
    with ThreadPoolExecutor() as pool:
        # Schedule four worker functions, followed by one master
        for i in range(4):
            pool.submit(wait_random, False)
        pool.submit(wait_random, True)
Listing 7-9

Example of using events to set a minimum wait time

Example console output of Listing 7-9
> python .\listing7-09-events.py
Thread 19624: Waiting  9 seconds (Master: False)
Thread  1036: Waiting  1 seconds (Master: False)
Thread  6372: Waiting 10 seconds (Master: False)
Thread 16992: Waiting  1 seconds (Master: False)
Thread 22100: Waiting  6 seconds (Master: True)
Thread 22100: Resumed after 6.003 seconds
Thread 16992: Resumed after 6.005 seconds
Thread  1036: Resumed after 6.013 seconds
Thread 19624: Resumed after 9.002 seconds
Thread  6372: Resumed after 10.012 seconds
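Listing 7-9 does not exercise the clear() and is_set() methods mentioned earlier; the following minimal sketch (not from the accompanying code) shows how an event can be reset and reused:
import threading
event = threading.Event()
print(event.is_set())  # False: a wait() call here would block
event.set()
event.wait()           # Returns immediately, as the event is set
print(event.is_set())  # True
event.clear()          # Reset the event, so future wait() calls block again
print(event.is_set())  # False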

Semaphore

Finally, semaphores are conceptually more complex than the primitives we’ve seen so far, but they are a very old idea and so are common to many languages. A semaphore is similar to a lock, but it can be acquired by multiple threads simultaneously. When a semaphore is created, it must be given a value: the number of times that it can be acquired simultaneously.

Semaphores are very useful for ensuring that operations that rely on a scarce resource (such as ones that use a lot of memory or open network connections) are not run in parallel above a certain threshold. For example, Listing 7-10 demonstrates five threads that wait a random amount of time, but where only three can wait at one time.
from concurrent.futures import ThreadPoolExecutor
import random
import time
import threading
semaphore = threading.Semaphore(3)
def wait_random():
    thread_id = threading.get_ident()
    to_wait = random.randint(1, 10)
    with semaphore:
        print(f"Thread {thread_id:5d}: Waiting {to_wait:2d} seconds")
        start_time = time.time()
        time.sleep(to_wait)
        end_time = time.time()
        elapsed = end_time - start_time
        print(
            f"Thread {thread_id:5d}: Resumed after {elapsed:3.3f} seconds"
        )
if __name__ == "__main__":
    with ThreadPoolExecutor() as pool:
        # Schedule five worker functions
        for i in range(5):
            pool.submit(wait_random)
Listing 7-10

Example of using semaphores to ensure at most three threads wait at once

Example console output of Listing 7-10
> python .\listing7-10-semaphore.py
Thread 10000: Waiting 10 seconds
Thread 24556: Waiting  1 seconds
Thread 15032: Waiting  6 seconds
Thread 24556: Resumed after 1.019 seconds
Thread 11352: Waiting  8 seconds
Thread 15032: Resumed after 6.001 seconds
Thread  6268: Waiting  4 seconds
Thread 11352: Resumed after 8.001 seconds
Thread 10000: Resumed after 10.014 seconds
Thread  6268: Resumed after 4.015 seconds

ProcessPoolExecutors

Just as we’ve looked at the use of ThreadPoolExecutor to delegate the execution of code to different threads, which causes us to fall foul of the GIL’s restrictions, we can use the ProcessPoolExecutor to run code in multiple processes if we’re willing to abandon all shared state.

When executing code in a process pool, any state that was available at the start is also available to the subprocesses. However, there is no ongoing coordination between the controlling process and its workers. Data can only be passed back to the controlling process as the return value of the tasks submitted to the pool; changes to global variables are not reflected in either direction.

Although multiple independent Python processes are not bound by the same one-at-a-time execution method imposed by the GIL, they also have significant overheads. For IO-bound tasks (i.e., tasks that spend most of their time waiting and therefore not holding the GIL), a process pool is generally slower than a thread pool.

On the other hand, tasks that involve large amounts of computation are well suited to being delegated to a subprocess, especially long-running ones where the overhead of the setup is lessened compared to the savings of parallel execution.
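As a minimal sketch of this (using a hypothetical CPU-bound count_primes function rather than code from our aggregation process), the submit/result API is the same as for thread pools, but results travel back only as return values:
from concurrent.futures import ProcessPoolExecutor
def count_primes(limit: int) -> int:
    """Naive, CPU-bound work: count the primes below limit."""
    found = 0
    for candidate in range(2, limit):
        if all(candidate % divisor for divisor in range(2, int(candidate ** 0.5) + 1)):
            found += 1
    return found
if __name__ == "__main__":
    # Each task runs in a separate Python process, so the GIL is not shared
    with ProcessPoolExecutor() as pool:
        futures = [pool.submit(count_primes, 50_000) for _ in range(4)]
        totals = [future.result() for future in futures]
    print(totals)
Any changes the workers make to global variables are discarded; only the return values reach the controlling process.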

Making our code multithreaded

The function that we want to parallelize is get_data_points(...). The functions that implement the command-line interface and the database connection do not change significantly whether we are dealing with 1 or 500 sensors, so there is no particular reason to split their work out into threads. Keeping this work in the main thread makes it easier to handle errors and report on progress, so we rewrite only the add_data_from_sensors(...) function.

Implementation of add_data_from_sensors that uses ThreadPoolExecutor
def add_data_from_sensors(
    session: Session, servers: t.Tuple[str], api_key: t.Optional[str]
) -> t.List[DataPoint]:
    threads: t.List[Future] = []
    points: t.List[DataPoint] = []
    with ThreadPoolExecutor() as pool:
        for server in servers:
            points_future = pool.submit(get_data_points, server, api_key)
            threads.append(points_future)
        for points_future in threads:
            points += handle_result(points_future, session)
    return points
def handle_result(execution: Future, session: Session) -> t.List[DataPoint]:
    points: t.List[DataPoint] = []
    result = execution.result()
    for point in result:
        session.add(point)
        points.append(point)
    return points

As we submit all our jobs to the ThreadPoolExecutor before the first time that we call a result() method, they will all be queued up for simultaneous execution in threads. It’s the result() method and the end of the with block that trigger blocking; submitting jobs does not cause the program to block, even if you submit more jobs than can be processed simultaneously.

This method is much less intrusive to the program flow than either the raw threaded approach or the nonblocking IO approaches, but it does still involve changing the execution flow to handle the fact that these functions are now working with Future objects rather than the data directly.

AsyncIO

AsyncIO is the elephant in the room when talking about Python concurrency, thanks mainly to the fact that it’s one of the flagship features of Python 3. It is a language feature that allows for something that works like the nonblocking IO example but with a somewhat similar API to the ThreadPoolExecutor. The API isn’t precisely the same, but the underlying concept of submitting tasks and being able to block to wait for their results is shared between the two.

Asyncio code is cooperatively multitasked. That is, code is never interrupted to allow another function to execute; the switching only occurs when a function blocks. This change makes it easier to reason about how code will behave, as there’s no chance of a simple statement like num += 1 being interrupted.

There are two new keywords that you’ll often see when working with asyncio: async and await. The async keyword marks certain control flow blocks (specifically, def, for, and with) as using the asyncio flow, rather than the standard flow. The meanings of these blocks are still the same as in standard, synchronous Python, but the underlying code paths can be quite different.

The equivalent of the ThreadPoolExecutor itself is the event loop. When executing asynchronous code, an event loop object is responsible for keeping track of all the tasks to be executed and coordinating passing their return values back to the calling code.

There is a strict separation between code intended to be called from a synchronous context and an asynchronous one. If you accidentally call async code from synchronous contexts, you’ll find yourself with coroutine objects rather than the data types you’re expecting, and if you call synchronous code from an async context, you can inadvertently introduce blocking IO that causes performance problems.

To enforce this separation, and to allow API authors optionally to support both synchronous and asynchronous uses of their objects, the async modifier is added to for and with to specify that you’re using the async-compatible implementation. These variants cannot be used in a synchronous context or on objects that do not have an asynchronous implementation (such as tuples or lists, in the case of async for).

async def

We can define new coroutines in the same way that we define functions; however, the def keyword becomes async def. These coroutines return values like any other function. As such, we can implement the same behavior from Listing 7-3 in an asyncio style, as shown in Listing 7-11.
import asyncio
async def increment():
    return 1
async def decrement():
    return -1
async def onehundred():
    num = 0
    for i in range(100):
        num += await increment()
        num += await decrement()
    return num
if __name__ == "__main__":
    asyncio.run(onehundred())
Listing 7-11

Example of concurrent increment and decrement coroutines

This behaves in the same way: two coroutines are run, their values are retrieved, and the num variable is adjusted according to the result of those functions. The main difference is that instead of these coroutines being submitted to a thread pool, the onehundred() async function is passed to the event loop to run, and that function is responsible for calling the other coroutines that do the work.

When we call a function that is defined as asynchronous, we receive a coroutine object as the result, rather than having the function execute.
async def hello_world():
    return "hello world"
>>> hello_world()
<coroutine object hello_world at 0x03DEDED0>

The asyncio.run(...) function is the main entrypoint for asynchronous code. It blocks until the passed function, and all other functions which that function schedules, are complete. The upshot is that only one coroutine at a time can be initiated by synchronous code.

await

The await keyword is the trigger for blocking until an asynchronous function has completed. However, this only blocks the current asynchronous call stack. You can have multiple asynchronous functions executing at once, in which case another function is executed while waiting for the result.

The await keyword is equivalent to the Future.result() method in the ThreadPoolExecutor example: it transforms an awaitable object into its result. It can appear wherever an asynchronous function call is used; it’s equally valid to write any of the three variants of printing the result of a function shown in Figure 7-8 .
../images/481001_1_En_7_Chapter/481001_1_En_7_Fig8_HTML.png
Figure 7-8

Three equivalent uses of the await keyword
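The figure itself is not reproduced in the text, but the three placements are along the following lines (a reconstruction using a hypothetical get_data() coroutine, not the figure’s exact code):
import asyncio
async def get_data():
    # Stand-in for some asynchronous work
    return 42
async def main():
    # Await inline
    print(await get_data())
    # Await and bind the result, then use it
    data = await get_data()
    print(data)
    # Bind the coroutine, then await it at the point of use
    coroutine = get_data()
    print(await coroutine)
asyncio.run(main())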

Once await has been used, the underlying awaitable is consumed. It is not possible to write
data = get_data()
if await data:
    print(await data)
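A minimal sketch of the usual fix (again using a hypothetical get_data() coroutine) is to await once and reuse the plain result, or to wrap the coroutine in a task, which may be awaited more than once:
import asyncio
async def get_data():
    return {"value": 1}
async def main():
    # Await once and reuse the result as ordinary data
    data = await get_data()
    if data:
        print(data)
    # Alternatively, a Task (unlike a bare coroutine) can be awaited repeatedly
    task = asyncio.create_task(get_data())
    if await task:
        print(await task)
asyncio.run(main())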

An awaitable object is an object that implements the __await__() method. This is an implementation detail; you won’t need to write an __await__() method yourself. Instead, you will use a variety of different built-in objects that provide it for you. For example, any coroutines defined using async def have an __await__() method.

Aside from coroutines, the other common awaitable is Task, which can be created from a coroutine with the asyncio.create_task(...) function. The normal usage is that one function is called with asyncio.run(...) and that function schedules further functions with asyncio.create_task(...).
async def example():
    task = asyncio.create_task(hello_world())
    print(task)
    print(hasattr(task, "__await__"))
    return await task
>>> asyncio.run(example())
<Task pending coro=<hello_world() running at <stdin>:1>>
True
'hello world'
A task is a coroutine that has been scheduled for parallel execution. When you await a coroutine, you cause it to be scheduled for execution, then immediately block waiting for its result. The create_task(...) function allows you to schedule a task before you need its result. If you have multiple operations to perform, each of which performs some blocking IO, but you await the coroutines directly, then one won’t be scheduled until the previous is complete. Scheduling the coroutines as tasks first allows them to run in parallel, as demonstrated by Table 7-3.
Table 7-3

Comparison of tasks and bare coroutines for parallel waiting

Awaiting coroutines directly
import asyncio
import time
async def slow():
    start = time.time()
    await asyncio.sleep(1)
    await asyncio.sleep(1)
    await asyncio.sleep(1)
    end = time.time()
    print(end - start)
>>> asyncio.run(slow())
3.0392887592315674

Converting to tasks first
import asyncio
import time
async def slow():
    start = time.time()
    first = asyncio.create_task(asyncio.sleep(1))
    second = asyncio.create_task(asyncio.sleep(1))
    third = asyncio.create_task(asyncio.sleep(1))
    await first
    await second
    await third
    end = time.time()
    print(end - start)
>>> asyncio.run(slow())
1.0060641765594482

There are some useful convenience functions to handle scheduling tasks based on coroutines, most notably asyncio.gather(...). This function takes any number of awaitable objects, schedules them as tasks, awaits them all, and returns an awaitable of a list of their return values, in the same order that the coroutines/tasks were originally given.

This is very useful for when multiple awaitables should be run in parallel:
async def slow():
    start = time.time()
    await asyncio.gather(
        asyncio.sleep(1),
        asyncio.sleep(1),
        asyncio.sleep(1)
    )
    end = time.time()
    print(end - start)
>>> asyncio.run(slow())
1.0132906436920166

async for

The async for construct allows iterating over an object where the iterator itself is defined by asynchronous code. It is not correct to use async for on synchronous iterators that are merely being used in an asynchronous context or that happen to contain awaitables.

None of the common data types we’ve used are asynchronous iterators. If you have a tuple or a list, then you use the standard for loop, regardless of what they contain or if they’re being used in synchronous or asynchronous code.

This section contains examples of three different approaches to looping in an asynchronous function. Type hinting is especially useful here, as the data types here are subtly different, and it makes it clear which types each function expects.

Listing 7-12 demonstrates an iterable of awaitables. It contains two asynchronous functions: one coroutine that returns a number11 and one that adds up the contents of an iterable of awaitables. That is, the add_all(...) function expects a standard iterable of coroutines (or tasks) from number(...). The numbers() function is synchronous; it returns a standard list containing two invocations of number(...).
import asyncio
import typing as t
async def number(num: int) -> int:
    return num
def numbers() -> t.Iterable[t.Awaitable[int]]:
    return [number(2), number(3)]
async def add_all(numbers: t.Iterable[t.Awaitable[int]]) -> int:
    total = 0
    for num in numbers:
        total += await num
    return total
if __name__ == "__main__":
    to_add = numbers()
    result = asyncio.run(add_all(to_add))
    print(result)
Listing 7-12

Looping over a list of awaitables

In the add_all(...) function, the loop is a standard for loop, as it’s iterating over a list. The contents of the list are the result of number(2) and number(3), so these two calls need to be awaited to retrieve their respective results.

Another way of writing this is to invert the relationship between the iterable and the awaitable. That is, instead of a list of awaitables of ints, pass an awaitable of a list of ints. Here, numbers() is defined as a coroutine, and it returns a list of integers.
import asyncio
import typing as t
async def number(num: int) -> int:
    return num
async def numbers() -> t.Iterable[int]:
    return [await number(2), await number(3)]
async def add_all(nums: t.Awaitable[t.Iterable[int]]) -> int:
    total = 0
    for num in await nums:
        total += num
    return total
if __name__ == "__main__":
    to_add = numbers()
    result = asyncio.run(add_all(to_add))
    print(result)
Listing 7-13

Awaiting a list of integers

The numbers() coroutine is now responsible for awaiting the individual number(...) coroutines. We still use a standard for loop, but now instead of awaiting the contents of the for loop, we await the value we’re looping over.

With both approaches, the first number(...) call is awaited before the second, but with the first approach, control passes back to the add_all(...) function between the two. In the second, control is only passed back after all numbers have been awaited individually and assembled into a list. With the first method, each number(...) coroutine is processed as it’s needed, but with the second all processing of the number(...) calls happens before the first value is used.

The third way of approaching this involves using async for. To do this, we convert the numbers() coroutine from Listing 7-13 to a generator function, resulting in the code in Listing 7-14. This is the same approach as used in synchronous Python code to avoid high memory usage, with the same trade-off that the value can only be iterated over once.
import asyncio
import typing as t
async def number(num: int) -> int:
    return num
async def numbers() -> t.AsyncIterator[int]:
    yield await number(2)
    yield await number(3)
async def add_all(nums: t.AsyncIterator[int]) -> int:
    total = 0
    async for num in nums:
        total += num
    return total
if __name__ == "__main__":
    to_add = numbers()
    result = asyncio.run(add_all(to_add))
    print(result)
Listing 7-14

Asynchronous generator

We still need the await keywords in the numbers() generator, as we want to yield the results of the number(...) calls, not placeholders for the results. Like the second version, this hides the details of awaiting the individual number(...) calls from the add_all(...) function, instead trusting the iterator to manage it. However, it also retains the property of the first version that each number(...) call is only evaluated when it’s needed: they’re not all processed in advance.

For an object to support being iterated over with for, it must implement an __iter__ method that returns an iterator. An iterator is an object that implements both an __iter__ method (that returns itself) and a __next__ method for advancing the iterator. An object that implements __iter__ but not __next__ is not an iterator but an iterable. Iterables can be iterated over; iterators are also aware of their current state.

Equally, an object that implements an __aiter__ method that returns an asynchronous iterator is an AsyncIterable. If __aiter__ returns self and the object also provides an asynchronous __anext__ method, it’s an AsyncIterator.

A single object can implement all four methods to support both synchronous and asynchronous iterations. This is only relevant if you’re implementing a class that can behave as an iterable, either synchronous or asynchronous. The easiest way to create an async iterable is using the yield construct from an async function, and that’s enough for most use cases.
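As a minimal sketch of that protocol (illustrative only, not code from this chapter’s examples), a class can implement __aiter__ and __anext__ directly instead of using an async generator:
import asyncio
class Countdown:
    """An asynchronous iterator that counts down from start to 1."""
    def __init__(self, start: int) -> None:
        self.current = start
    def __aiter__(self) -> "Countdown":
        return self
    async def __anext__(self) -> int:
        if self.current <= 0:
            raise StopAsyncIteration
        await asyncio.sleep(0)  # A point where other tasks could be scheduled
        self.current -= 1
        return self.current + 1
async def main():
    async for number in Countdown(3):
        print(number)
asyncio.run(main())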

In all of the preceding examples, we’re using coroutines directly. As the functions specify that they work on typing.Awaitable, we can be sure that the same code would work if we passed tasks rather than coroutines. The second example, where we are awaiting a list, is equivalent to using the built-in asyncio.gather(...) function. Both return an awaitable of an iterable of results. As such, this may be the method that you’ll see most often, albeit expressed as shown in Listing 7-15.
import asyncio
import typing as t
async def number(num: int) -> int:
    return num
async def numbers() -> t.Iterable[int]:
    return await asyncio.gather(
        number(2),
        number(3)
    )
async def add_all(nums: t.Awaitable[t.Iterable[int]]) -> int:
    total = 0
    for num in await nums:
        total += num
    return total
if __name__ == "__main__":
    to_add = numbers()
    result = asyncio.run(add_all(to_add))
    print(result)
Listing 7-15

Using gather to process tasks in parallel

async with

The with statement also has an async counterpart, async with, which is used to facilitate the writing of context managers that depend on asynchronous code. It’s quite common to see this in asynchronous code, as many IO operations involve setup and teardown phases.

In the same way that async for uses __aiter__ rather than __iter__, asynchronous context managers define the __aenter__ and __aexit__ methods to replace __enter__ and __exit__. Once again, objects can choose to implement all four to work in both contexts, if appropriate.

When using synchronous context managers in an asynchronous function, there’s a potential for blocking IO to happen before the first line and after the last line of the body. Using async with and a compatible context manager allows for the event loop to schedule some other asynchronous code during that blocking IO period.

We will cover using and creating context managers in more detail over the next two chapters. Both kinds are equivalent to try/finally constructions, but standard context managers use synchronous code in their enter and exit methods, whereas async context managers use asynchronous code.
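As a minimal sketch of the protocol (ahead of that fuller treatment, and not code from our aggregation process), an asynchronous context manager implements __aenter__ and __aexit__ as coroutines:
import asyncio
class AsyncTimer:
    """Report how long the body of an async with block took."""
    async def __aenter__(self) -> "AsyncTimer":
        # Setup that may await (opening a connection, say) belongs here
        self.start = asyncio.get_running_loop().time()
        return self
    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Teardown that may await belongs here
        elapsed = asyncio.get_running_loop().time() - self.start
        print(f"Block took {elapsed:.3f} seconds")
async def main():
    async with AsyncTimer():
        await asyncio.sleep(1)
asyncio.run(main())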

Async locking primitives

Although asynchronous code is less vulnerable to concurrency safety issues than threads are, it is still possible to write asynchronous code that has concurrency bugs. The switching model being based on awaiting a result rather than threads being interrupted prevents most accidental bugs, but it’s no guarantee of correctness.

For example, in Listing 7-16 we have an asyncio version of the increment example from when we looked at threads. This has an await within the num += line and introduces an offset() coroutine to return the 1 that will be added to num. This offset() function also calls asyncio.sleep(0), which yields control back to the event loop without any real delay, simulating the suspension that a blocking IO request would cause.
import asyncio
import random
num = 0
async def offset():
    await asyncio.sleep(0)
    return 1
async def increment():
    global num
    num += await offset()
async def onehundred():
    tasks = []
    for i in range(100):
        tasks.append(increment())
    await asyncio.gather(*tasks)
    return num
if __name__ == "__main__":
    print(asyncio.run(onehundred()))
Listing 7-16

Example of an unsafe asynchronous program

Although this program should print 100, it may print any number as low as 1, depending on the decisions the event loop makes about scheduling tasks. To prevent this, we need to either move the await offset() call to not be part of the += construction or lock the num variable.
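The first of those fixes, moving the await out of the augmented assignment so that the read-modify-write of num happens in one uninterrupted statement, looks roughly like this replacement for the increment() coroutine from Listing 7-16:
async def increment():
    global num
    # Await first; the += below is then never interrupted by a task switch
    value = await offset()
    num += value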

AsyncIO provides direct equivalents of Lock, Event, Condition, and Semaphore from the threading library. These variants use asynchronous versions of the same API, so we can fix the increment() function as shown in Listing 7-17.
import asyncio
import random
num = 0
async def offset():
    await asyncio.sleep(0)
    return 1
async def increment(numlock):
    global num
    async with numlock:
        num += await offset()
async def onehundred():
    tasks = []
    numlock = asyncio.Lock()
    for i in range(100):
        tasks.append(increment(numlock))
    await asyncio.gather(*tasks)
    return num
if __name__ == "__main__":
    print(asyncio.run(onehundred()))
Listing 7-17

Example of asynchronous locking

Perhaps the biggest difference between threaded and async versions of synchronization primitives is that async primitives cannot be defined at the global scope. More accurately, they can only be instantiated from within a running coroutine as they must register themselves with the current event loop.

Working with synchronous libraries

The code we’ve written so far relies on us having an entirely asynchronous stack of libraries and functions to call from our async code. If we introduce some synchronous code, then we block all our tasks while executing it. We can demonstrate this using the time.sleep(...) method to block for a set amount of time. Earlier we used asyncio.sleep(...) to model a long-running async-aware task; mixing these lets us look at the performance of such a mixed system:
import asyncio
import time
async def synchronous_task():
    time.sleep(1)
async def slow():
    start = time.time()
    await asyncio.gather(
        asyncio.sleep(1),
        asyncio.sleep(1),
        synchronous_task(),
        asyncio.sleep(1)
    )
    end = time.time()
    print(end - start)
>>> asyncio.run(slow())
2.006387243270874
In this case, our three asynchronous tasks all take 1 second and are processed in parallel. The blocking task also takes 1 second but is processed in series, meaning the total time taken is 2 seconds. To ensure that all four functions run in parallel, we can use the loop.run_in_executor(...) function. This hands the given callable to a ThreadPoolExecutor (or another executor of your choice) and runs it in that context rather than in the main thread.
import asyncio
import time
async def synchronous_task():
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, time.sleep, 1)
async def slow():
    start = time.time()
    await asyncio.gather(
        asyncio.sleep(1),
        asyncio.sleep(1),
        synchronous_task(),
        asyncio.sleep(1)
    )
    end = time.time()
    print(end - start)
>>> asyncio.run(slow())
1.0059468746185303

The run_in_executor(...) function works by switching out the problem to one that is easily made asynchronous. Instead of trying to turn arbitrary Python functions from synchronous into asynchronous, finding the right places to yield control back to the event loop, getting woken up at the correct time, and so on, it uses a thread (or a process) to execute the code. Threads and processes are inherently suitable to asynchronous control by virtue of being an operating system construct. This reduces the scope of what needs to be made compatible with the asyncio system to starting a thread and waiting for it to be complete.
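Passing None as the first argument, as above, uses the loop’s default thread pool. A specific executor can also be supplied; a minimal sketch using a ProcessPoolExecutor and a hypothetical cpu_bound() function follows:
import asyncio
import concurrent.futures
def cpu_bound() -> int:
    # Hypothetical synchronous, CPU-heavy work
    return sum(i * i for i in range(10_000_000))
async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound)
    print(result)
if __name__ == "__main__":
    asyncio.run(main())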

Making our code asynchronous

The first step in making our code work in an asynchronous context is to pick a function to act as the first in the chain of asynchronous functions. We want to keep the synchronous and asynchronous code separate, so we need to pick something that’s high enough in the call stack that all things that need to be asynchronous are (perhaps indirectly) called by this function.

In our code, the get_data_points(...) function is the only one that we want to run in an asynchronous context. It is called by add_data_from_sensors(...), which is itself called by standalone(...), which is called by collect_sensor_data(...) in turn. Any of these four functions can be the argument to asyncio.run(...).

The collect_sensor_data(...) function is the click entrypoint, so it cannot be an asynchronous function. The get_data_points(...) function needs to be called multiple times, so it is a better fit for a coroutine than the main entrypoint into the asynchronous flow. This leaves standalone(...) and add_data_from_sensors(...).

The standalone(...) function does the setup for the database already; it is a good place to do the event loop setup too. Therefore, we need to make the add_data_from_sensors(...) an async function and adjust how it is called from standalone(...).
def standalone(
    db_uri: str, servers: t.Tuple[str], api_key: t.Optional[str], echo: bool = False
) -> None:
    engine = create_engine(db_uri, echo=echo)
    sm = sessionmaker(engine)
    Session = sm()
    asyncio.run(add_data_from_sensors(Session, servers, api_key))
    Session.commit()

We now need to change our implementations of the lower-level functions to not call any blocking synchronous code. Currently, we are making our HTTP calls using the requests library, which is a blocking, synchronous library.

As an alternative, we’ll switch to the aiohttp module to make our HTTP requests. Aiohttp is a natively asynchronous HTTP library that supports both client and server applications. The interface is not as refined as that of requests, but it is quite usable.

The biggest difference in API is that HTTP requests involve many context managers, as follows:
    async with aiohttp.ClientSession() as http:
        async with http.get(url) as request:
            result = await request.json()

As the name suggests, a ClientSession represents the idea of a session with a shared cookie state and HTTP header configuration. Within this, requests are made with asynchronous context managers like get. The result of the context manager is an object which has methods that can be awaited to retrieve the contents of the response.

The preceding construction, which is admittedly much more verbose than the equivalent using requests, allows for many places where the execution flow could be yielded to work around blocking IO. The obvious one is the await line, which relinquishes control while waiting for the response to be retrieved and parsed as JSON. Less obvious is the entry and exit of the http.get(...) context manager, which can set up socket connections, allowing things like DNS resolution not to block execution. It’s also possible for the execution flow to be yielded when entering and exiting a ClientSession.

All this is to say that while the preceding construction is more verbose than the same code using requests, it does allow shared resources relating to the HTTP session to be set up and torn down transparently, and it does so in a way that does not significantly slow the process.

In our add_data_from_sensors(...) function, we need to handle the fact that this session object is now required, preferably in a way that shares the client session between our multiple requests. We also need to keep a record of the request coroutine calls, so we can schedule them in parallel and retrieve their data.
async def add_data_from_sensors(
    session: Session, servers: t.Tuple[str], api_key: t.Optional[str]
) -> t.List[DataPoint]:
    todo: t.List[t.Awaitable[t.List[DataPoint]]] = []
    points: t.List[DataPoint] = []
    async with aiohttp.ClientSession() as http:
        for server in servers:
            todo.append(get_data_points(server, api_key, http))
        for a in await asyncio.gather(*todo):
            points += await handle_result(a, session)
    return points

In this function, we define two variables: a list of awaitables that each return a list of DataPoint objects, and a list of DataPoint objects that we fill as we process the awaitables. Then, we set up the ClientSession and iterate over the servers, adding an invocation of get_data_points(...) for each server. At this stage, these are plain coroutines, as they have not yet been scheduled as tasks. We could await them in turn, but this would have the effect of making each request happen sequentially. Instead, we use asyncio.gather(...) to schedule them as tasks, allowing us to iterate over the results, each of which is a list of DataPoint objects.

Next, we need to add the data to the database. We’re using SQLAlchemy here, which is a synchronous library. For production-quality code, we’d need to ensure that there is no chance of blocking here. The following implementation makes no such guarantee: the session.add(...) method can block while the data is synchronized with the database session.

A placeholder for handle_result that should not be used in production code
async def handle_result(result: t.List[DataPoint], session: Session) -> t.List[DataPoint]:
    for point in result:
        session.add(point)
    return result

We will look at methods for dealing with database integration in a parallel execution context in the next chapter, but this is good enough for a prototype.

Finally, we need to do the actual work of getting the data. The method is not greatly different from the synchronous version, except that it requires the ClientSession to be passed in and some minor changes must be made to accommodate the difference in HTTP request API.

Implementation of get_data_points using aiohttp
async def get_data_points(server: str, api_key: t.Optional[str], http: aiohttp.ClientSession) -> t.List[DataPoint]:
    if not server.endswith("/"):
        server += "/"
    url = server + "v/2.0/sensors/"
    headers = {}
    if api_key:
        headers["X-API-KEY"] = api_key
    async with http.get(url) as request:
        result = await request.json()
        ok = request.status == 200
    now = datetime.datetime.now()
    if ok:
        points = []
        for value in result["sensors"]:
            points.append(
                DataPoint(
                    sensor_name=value["id"], collected_at=now, data=value["value"]
                )
            )
        return points
    else:
        raise ValueError(
            f"Error loading data from {server}: "
            + result.get("error", "Unknown")
        )

This method makes many different choices when compared to a multithreaded or a multiprocess model. A multiprocess model allows for true concurrent processing, and a multithreaded approach can achieve some very minor performance gains thanks to the less restrictive guarantees about switching, but asynchronous code has a much more natural interface, in my opinion.

The key disadvantage of the asyncio approach is that the advantages can only be truly realized with asynchronous libraries. Other libraries can still be used by combining asyncio and threaded approaches, which is made easy by the good integration between these two methods, but there is a significant refactoring requirement to converting existing code to an asynchronous approach and, equally, a significant learning curve in becoming accustomed to writing asynchronous code in the first place.

Comparison

There are implementations of all four approaches in the code accompanying this chapter, so it’s possible for us to run a simple benchmark to compare their speeds. Benchmarking a proposed optimization in this way is always difficult; it’s hard to get real numbers from anything but a real-world test, so the following should be taken with a pinch of salt.

These numbers were generated by extracting data from the same sensor multiple times in a single invocation. Aside from the other load on the machine timing these invocations, the numbers are unrealistic because they do not involve looking up connection information for many different targets and because the server returning the requested data is limited in the number of simultaneous requests that it can service.

As you can see from Figure 7-9, the threaded and asyncio approaches are almost indistinguishable in terms of time taken. The nonblocking IO method that we rejected due to its complexity is also comparable. A multiprocess approach is noticeably slower, but similar to the other three approaches. The standard, synchronous approach behaves similarly with only one or two sensors to collect data from, but the larger result sets quickly become pathological, taking an order of magnitude longer than the concurrent approaches.

The information we should take from this is that this workload is well suited to parallelization. The fact that asyncio is as much as 20% faster in our benchmark does not necessarily equate to it being a faster technology, just faster in this particular test. Future changes to the codebase, as well as different testing conditions, could easily change the relationship between the technologies.
../images/481001_1_En_7_Chapter/481001_1_En_7_Fig9_HTML.png
Figure 7-9

Time taken to load data from 1, 2, 5, 10, 20, or 50 HTTP APIs, using different parallelization methods

Making a choice

There are two pernicious falsehoods about asyncio circulating in the Python community at the time of writing. The first is that asyncio has "won" at concurrency. The second is that it’s bad and should not be used. It should come as no surprise that the truth lies somewhere in the middle. Asyncio is brilliant for network clients that are largely IO-bound but isn’t a panacea.

When deciding between the different approaches, the first question to ask yourself is if your code is spending most of its time waiting for IO or if it’s spending most of its time processing data. A task that waits for a short while and then does a lot of calculation is not a great fit for asyncio as it can parallelize the waiting but not the execution, leaving a backlog of CPU-bound tasks to perform. Equally, it’s not a natural fit for a thread pool, as the GIL prevents the various threads running truly in parallel. A multiprocess deployment has higher overheads but is able to take advantage of true parallelization in CPU-bound code.

If the task does spend more time waiting than executing code, it’s likely that asyncio or a thread-based parallelization approach will be the best choice. As a rule of thumb, I recommend preferring asyncio for applications that make outbound calls to servers but do not themselves listen for network requests, and combinations of process and thread pools for applications that do accept inbound connections.12 A decision tree representing this is given as Figure 7-10.
../images/481001_1_En_7_Chapter/481001_1_En_7_Fig10_HTML.jpg
Figure 7-10

Decision tree for parallelization methods in client/server applications

This is not a hard rule; there are too many exceptions to list, and you should consider the details of your application and test your assumptions, but in general, I prefer the robust and predictable behavior of preemptive multitasking13 for a server application.

Our sensor API endpoints are entirely standard Python, but are run through the waitress WSGI server. The WSGI server makes the concurrency decision for us, with waitress-serve instantiating a four-thread thread pool to handle inbound requests.

The collector process involves a large amount of waiting in every invocation and is entirely client side, so using asyncio to implement its concurrent behaviors is a good fit.

Summary

In this chapter, we’ve looked at the two most common types of parallelization, threading and asyncio, as well as other methods that are less widely used. Concurrency is a difficult topic, and we’ve not finished covering the things that you can achieve with asyncio, but we will be leaving threads behind at this point.

Asynchronous programming is a very powerful tool, one that all Python programmers should be aware of, but the trade-offs of threads and asyncio are very different, and generally speaking, only one will be useful in any given program.

If you need to write a program that relies on concurrency in Python, I strongly recommend experimenting with the different approaches to find which matches your problem best. I also would encourage you to make sure you understand the uses of all the synchronization primitives we’ve used in this chapter, as appropriate use of locks can make the difference between a program that’s slow and hard to understand and a program that’s fast and intuitively written.

Additional resources

The following links contain some useful background information on topics that I’ve covered in this chapter, as well as some other, less common approaches:
  • The HTTP zine by Julia Evans gives a good explanation of the internals of the HTTP protocol and the differences between versions: https://wizardzines.com/zines/http/.

  • Greenlets are a precursor to native coroutines in Python, which may be of use to people who need to use very old versions of Python: https://greenlet.readthedocs.io/en/latest/.

  • Similarly, https://github.com/stackless-dev/stackless/wiki covers Stackless Python, which is a variant of Python intended to offer better performance when running many small operations in parallel. Greenlets are derived from the stackless project.

  • Just as ThreadPools are backed by threads, ProcessPools are backed by processes. Information on Python’s lower-level process management functionality is at https://docs.python.org/3/library/multiprocessing.html.

  • The slides for David Beazley’s excellent presentation “Understanding the GIL” are available at www.dabeaz.com/GIL/. Although some minor details have changed in the ten years since it was written (such as the concept of “ticks”), the overall description is still very accurate and worth a read.

  • Information on the PyPy implementation of Python can be found at www.pypy.org/.
