Thread and process pools

As mentioned before, pools are structures designed to hold N objects (threads, processes, and so on). When the usage reaches capacity, no work is assigned to a thread (or process) until one of those currently working becomes available again. Pools, therefore, are a great way to limit the number of threads (or processes) that can be alive at the same time, preventing the system from starving due to resource exhaustion, or the computation time from being affected by too much context switching.

In the following examples, I will be tapping into the concurrent.futures module to use the ThreadPoolExecutor and ProcessPoolExecutor executors. These two classes use a pool of threads (or processes, respectively) to execute calls asynchronously. They both accept a parameter, max_workers, which sets the upper limit on how many threads (or processes) can be used at the same time by the executor.

Let's start with the multithreaded example:

# pool.py
from concurrent.futures import ThreadPoolExecutor, as_completed
from random import randint
import threading

def run(name):
    value = randint(0, 10**2)
    tname = threading.current_thread().name
    print(f'Hi, I am {name} ({tname}) and my value is {value}')
    return (name, value)

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [
        executor.submit(run, f'T{name}') for name in range(5)
    ]
    for future in as_completed(futures):
        name, value = future.result()
        print(f'Thread {name} returned {value}')

After importing the necessary bits, we define the run function. It gets a random value, prints it, and returns it, along with the name argument it was called with. The interesting bit comes right after the function.

As you can see, we're using ThreadPoolExecutor as a context manager, passing max_workers=3, which means the pool size is three. This means only three threads at any time will be alive.

We define a list of Future objects with a list comprehension, in which we call submit on our executor object. We instruct the executor to run the run function, with a name that will go from T0 to T4. A Future is an object that encapsulates the asynchronous execution of a callable.
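To get a feel for what a Future offers beyond what we use here, the following sketch (with a hypothetical square function of my own choosing) pokes at a single future directly: result blocks until the call finishes and accepts an optional timeout, while done reports whether the call has completed:

```python
from concurrent.futures import ThreadPoolExecutor

def square(n):
    return n * n

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(square, 4)
    # result blocks until the callable completes
    # (or raises TimeoutError if the timeout expires first)
    value = future.result(timeout=5)
    # once the call has finished, the future reports itself as done
    print(future.done(), value)  # True 16
```

Futures also expose cancel and exception methods, which become useful when a task may fail or no longer be needed.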

Then we loop over the Future objects as they are done. To do this, we use as_completed, which gives us an iterator over the Future instances, yielding each one as soon as it completes (that is, finishes or is cancelled). We grab the result of each future by calling its result method, and simply print it. Given that run returns the tuple (name, value), we expect the result to be a two-tuple containing them. If we print the output of a run (bear in mind each run can potentially be slightly different), we get:

$ python pool.py
Hi, I am T0 (ThreadPoolExecutor-0_0) and my value is 5
Hi, I am T1 (ThreadPoolExecutor-0_0) and my value is 23
Hi, I am T2 (ThreadPoolExecutor-0_1) and my value is 58
Thread T1 returned 23
Thread T0 returned 5
Hi, I am T3 (ThreadPoolExecutor-0_0) and my value is 93
Hi, I am T4 (ThreadPoolExecutor-0_1) and my value is 62
Thread T2 returned 58
Thread T3 returned 93
Thread T4 returned 62

Before reading on, can you tell why the output looks like this? Could you explain what happened? Spend a moment thinking about it.

So, what goes on is that three threads start running, so we get three Hi, I am... messages printed out. Once all three of them are running, the pool is at capacity, so we need to wait for at least one thread to complete before anything else can happen. In the example run, T1 and T0 complete (which is signaled by the printing of what they returned), so their threads return to the pool and can be used again. They get run with names T3 and T4, and finally all three, T2, T3, and T4, complete. You can see from the output how the threads are actually reused, and how they are reassigned to T3 and T4 after their earlier tasks complete.
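As a side note, when we don't need to inspect each future individually, the executors also provide a map method, which yields results in submission order rather than completion order. A minimal sketch of a similar workload (using the name's length instead of a random value, to keep the output deterministic) could look like this:

```python
from concurrent.futures import ThreadPoolExecutor

def run(name):
    return name, len(name)

with ThreadPoolExecutor(max_workers=3) as executor:
    # unlike as_completed, map yields results in submission order
    for name, value in executor.map(run, [f'T{n}' for n in range(5)]):
        print(f'Thread {name} returned {value}')
```

This trades the "process results as soon as they're ready" behavior of as_completed for predictable ordering, which is often what you want when collecting results into a sequence.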

Let's now see the same example, but with the multiprocess design:

# pool_proc.py
from concurrent.futures import ProcessPoolExecutor, as_completed
from random import randint
from time import sleep

def run(name):
    sleep(.05)
    value = randint(0, 10**2)
    print(f'Hi, I am {name} and my value is {value}')
    return (name, value)

with ProcessPoolExecutor(max_workers=3) as executor:
    futures = [
        executor.submit(run, f'P{name}') for name in range(5)
    ]
    for future in as_completed(futures):
        name, value = future.result()
        print(f'Process {name} returned {value}')

The difference is truly minimal. We use ProcessPoolExecutor this time, and the run function is exactly the same, with one small addition: we sleep for 50 milliseconds at the beginning of each run. This is to exacerbate the behavior and have the output clearly show the size of the pool, which is still three. If we run the example, we get:
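One caveat worth flagging: on platforms where the default process start method is spawn (Windows, and macOS since Python 3.8), each worker process re-imports the main module, so creating a ProcessPoolExecutor at module level would recursively spawn workers and fail. The standard fix is to guard the pool creation behind the usual name check, as in this sketch:

```python
from concurrent.futures import ProcessPoolExecutor

def run(name):
    return name

if __name__ == '__main__':
    # the guard prevents worker processes from re-executing the pool
    # creation when they re-import this module under the spawn method
    with ProcessPoolExecutor(max_workers=3) as executor:
        print(list(executor.map(run, [f'P{n}' for n in range(3)])))
```

The example above runs as written on Linux, where the default start method (fork) does not re-import the main module, but the guard makes the code portable.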

$ python pool_proc.py
Hi, I am P0 and my value is 19
Hi, I am P1 and my value is 97
Hi, I am P2 and my value is 74
Process P0 returned 19
Process P1 returned 97
Process P2 returned 74
Hi, I am P3 and my value is 80
Hi, I am P4 and my value is 68
Process P3 returned 80
Process P4 returned 68

This output clearly shows the pool size being three. It is very interesting to notice that if we remove that call to sleep, most of the time the output will have five prints of Hi, I am..., followed by five prints of Process Px returned.... How can we explain that? Well, it's simple: by the time the first three processes are done and returned by as_completed, all three are asked for their result, and whatever is returned is printed. While this happens, the executor can already start recycling two processes to run the final two tasks, and those happen to print their Hi, I am... messages before the prints in the for loop get to take place.

This basically means ProcessPoolExecutor is quite fast and aggressive (in terms of getting the scheduler's attention), and it's worth noting that this behavior doesn't happen with the thread counterpart, in which, if you recall, we didn't need to use any artificial sleeping.

The important thing to keep in mind, though, is that even simple examples such as these can already be slightly tricky to understand or explain. Let this be a lesson to you, so that you raise your attention to 110% when you code multithreaded or multiprocess designs.

Let's now move on to a more interesting example.
