Using thread pools for handling incoming connections

As we saw in the previous section, we do not need an infinite number of threads to handle the incoming clients. We can manage with a limited number of threads to handle a large number of clients. But, how do we implement this thread pooling in our application. As it turns out, it is quite easy to implement the thread pool functionality with Python 3 and the concurrent.futures module.

The following code sample modifies our existing TCP server example to use a thread pool, instead of arbitrarily launching an infinite number of threads to handle the incoming client connections:

# simple_socket_threadpool.py
#!/usr/bin/python3
from concurrent.futures import ThreadPoolExecutor
import socket


# Let's first create a TCP type Server for handling clients
class Server(object):
"""A simple TCP Server."""

def __init__(self, hostname, port, pool_size):
"""Server initializer

Keyword arguments:
hostname -- The hostname to use for the server
port -- The port on which the server should bind
pool_size -- The pool size to use for the threading executor
"""

# Setup thread pool size
self.executor_pool = ThreadPoolExecutor(max_workers=pool_size)

# Setup the TCP socket server
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.hostname = hostname
self.port = port
self.bind_connection()
self.listen()

def bind_connection(self):
"""Bind the server to the host."""

self.server.bind((self.hostname, self.port))

def listen(self):
"""Start listening for the incoming connections."""

self.server.listen(10) # Queue a maximum of 10 clients
# Enter the listening loop
while True:
client, client_addr = self.server.accept()
print("Received a connection from %s" % str(client_addr))
self.executor_pool.submit(self.handle_client, client)

def handle_client(self, client):
"""Handle incoming client connection.

Keyword arguments:
client -- The client connection socket
"""

print("Accepted a client connection")
while True:
buff = client.recv(1024).decode()
if not buff:
break
print(buff)
print("Client closed the connection")
client.close() # We are done now, let's close the connection

if __name__ == '__main__':
server = Server('localhost', 7000, 20)

In this example, we modified our TCP server code to utilize a thread pool instead of launching an arbitrary number of threads. Let's take a look at how we made it possible.

First, to utilize the thread pool, we need to initialize an instance of the thread pool executor. Under the __init__ method of the Server class, we first initialize the thread pool executor by calling its constructor:

self.executor_pool = ThreadPoolExecutor(max_workers=pool_size)

The ThreadPoolExecutor constructor takes a max_workers parameter that defines how many concurrent threads are possible inside the ThreadPool. But, what will be an optimal value for the max_workers parameter?

A general rule of thumb will be to have max_workers = (5 x Total number of CPU cores). The reasoning behind this formula is that inside a web application, most of the threads are generally waiting for the I/O to complete, whereas a few threads are busy doing CPU-bound operations.

The next thing after we have created a ThreadPoolExecutor is to submit jobs to it so that they can be processed by the threads inside the Executor Pool. This can be achieved through the use of the submit method of the ThreadPoolExecutor class. This can be seen under the listen() method of the Server class:

self.executor_pool.submit(self.handle_client, client)

The submit() method of the ThreadPoolExecutor takes in, as the first parameter, the name of the method to execute inside a thread and the parameters that need to be passed to the executing method.

That was quite simple to implement and provides us with lots of benefits, such as:

  • Optimal usage of resources provided by the underlying infrastructure
  • Ability to handle multiple requests
  • Increased scalability and reduced wait times for the clients
One important thing to take a note of here is, since the ThreadPoolExecutor utilizes the threads, the CPython implementation might not provide the maximum performance due to the presence of GIL, which doesn't allow the execution of more than one thread at a time. Hence, the performance of the application may vary depending upon the underlying Python implementation being used.

Now, the question that arises is, what if we wanted to sidestep the Global Interpreter Lock? Is there some mechanism while still using the CPython implementation of Python? We discussed this scenario in the previous chapter and settled with the use of Python's multiprocessing module in place of the threading library.

Also, as it turns out, using a ProcessPoolExecutor is quite a simple feat to achieve. The underlying implementation inside the concurrent.futures package takes care of most of the necessities and provides the programmer with a simple-to-use abstraction. To see this in action, let's modify our previous example to swap in ProcessPoolExecutor in place of the ThreadPoolExecutor. To do this, all we need to do is first import the correct implementation from the concurrent.futures package as described by the following line:

from concurrent.futures import ProcessPoolExecutor

The next thing we need to do is to modify our __init__ method to create a process pool instead of a thread pool. The following implementation of the __init__ method shows how we can achieve this:

def __init__(self, hostname, port, pool_size):
"""Server initializer

Keyword arguments:
hostname -- The hostname to use for the server
port -- The port on which the server should bind
pool_size -- The size of the pool to use for the process based executor
"""

# Setup process pool size
self.executor_pool = ProcessPoolExecutor(max_workers=pool_size)

# Setup the TCP socket server
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.hostname = hostname
self.port = port
self.bind_connection()
self.listen()

Indeed, that was a simple process to carry out and now our application can use the multiprocess model instead of the multithread model.

But, can we keep the pool size the same or does it also need to change?

Every process has its own memory space and internal pointers that it needs to maintain, which makes the process heavier in comparison to the use of threads for achieving concurrency. This provides a reason to reduce the pool size so as to allow for the heavier usage of the underlying system resources. As a general rule, for a ProcessPoolExecutor, the max_workers can be calculated by the formula max_workers = (2 x Number of CPU Cores + 1).

The reasoning behind this formula can be attributed to the fact that, at any given time, we can assume that half of the processes will be busy in performing network I/O while the others might be busy doing CPU-intensive tasks.

So, now we have a fair enough idea about how we can use a resource pool and why it is a better approach in comparison to launching an arbitrary number of threads. But, this approach still requires a lot of context switches and is also highly dependent upon the underlying Python implementation being used. But there should be something better than this, for sure.

With this in mind, let's try to venture into another territory in the kingdom of Python, the territory of asynchronous programming.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.136.17.12