A multithreaded chat server

So let's put this to use and write our chat server. Make a new file called 2.1-chat_server-multithread.py and put the following code in it:

import threading, queue
import tincanchat

HOST = tincanchat.HOST
PORT = tincanchat.PORT

send_queues = {}
lock = threading.Lock()

def handle_client_recv(sock, addr):
    """ Receive messages from client and broadcast them to
        other clients until client disconnects """
    rest = bytes()
    while True:
        try:
            (msgs, rest) = tincanchat.recv_msgs(sock, rest)
        except (EOFError, ConnectionError):
            handle_disconnect(sock, addr)
            break
        for msg in msgs:
            msg = '{}: {}'.format(addr, msg)
            print(msg)
            broadcast_msg(msg)

def handle_client_send(sock, q, addr):
    """ Monitor queue for new messages, send them to client as
        they arrive """
    while True:
        msg = q.get()
        if msg is None: break
        try:
            tincanchat.send_msg(sock, msg)
        except (ConnectionError, BrokenPipeError):
            handle_disconnect(sock, addr)
            break

def broadcast_msg(msg):
    """ Add message to each connected client's send queue """
    with lock:
        for q in send_queues.values():
            q.put(msg)

def handle_disconnect(sock, addr):
    """ Ensure queue is cleaned up and socket closed when a client
        disconnects """
    fd = sock.fileno()
    with lock:
        # Get and remove this client's send queue in one atomic
        # step, so that a disconnect is only ever handled once
        q = send_queues.pop(fd, None)
    # If we found a queue then this disconnect has not yet
    # been handled
    if q:
        q.put(None)
        print('Client {} disconnected'.format(addr))
        sock.close()

if __name__ == '__main__':
    listen_sock = tincanchat.create_listen_socket(HOST, PORT)
    addr = listen_sock.getsockname()
    print('Listening on {}'.format(addr))

    while True:
        client_sock, addr = listen_sock.accept()
        q = queue.Queue()
        with lock:
            send_queues[client_sock.fileno()] = q
        recv_thread = threading.Thread(target=handle_client_recv,
                                       args=[client_sock, addr],
                                       daemon=True)
        send_thread = threading.Thread(target=handle_client_send,
                                       args=[client_sock, q,
                                             addr],
                                       daemon=True)
        recv_thread.start()
        send_thread.start()
        print('Connection from {}'.format(addr))

We're now using two threads per client: one handles the messages received, and the other handles the task of sending messages. The idea is to give each operation that might block its own thread. This gives us the lowest latency for each client, but it comes at the cost of system resources, so we're reducing the number of clients that we can handle simultaneously. There are other models that we could use, such as a single thread per client which receives messages and then sends them itself to all the connected clients, but I've chosen to optimize for latency.

To facilitate the separate threads, we've broken the receiving code and the sending code into the handle_client_recv() function and handle_client_send() function respectively.

Our handle_client_recv threads are tasked with receiving messages from the clients, and our handle_client_send threads are tasked with sending messages to the clients, but how do the received messages get from the receive threads to the send threads? This is where the Queue objects, the send_queues dict, and the lock come in.

Queues

A Queue is a first-in first-out (FIFO) pipe. You add items to it by using the put() method, and pull them out by using the get() method. The important thing about Queue objects is that they are completely thread safe. Objects in Python are generally not thread safe unless it is explicitly specified in their documentation. Being thread safe means that operations on the object are guaranteed to be atomic, that is, they will always complete without any chance of another thread getting to that object and doing something unexpected to it.
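
For instance, here's a quick standalone illustration of the FIFO behavior (this snippet is not part of the server):

import queue

q = queue.Queue()
q.put('first')
q.put('second')

print(q.get())  # prints 'first': items come out in the order they went in
print(q.get())  # prints 'second'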

Hang on, you might ask: didn't you say earlier that, because of the GIL, the OS runs only one Python thread per process at any given moment? If that's so, then how could two threads perform an operation on an object simultaneously? This is a fair question. Most operations in Python are, in fact, made up of many operations at the OS level, and it is at the OS level that threads are scheduled. A thread could start an operation on an object, say appending an item to a list, and when the thread is halfway through its OS level operations, the OS could switch to another thread, which also starts appending to the same list. Since list objects make no guarantees about their behavior when threads abuse them like this (they're not thread safe), anything could happen next, and it's unlikely to be a useful outcome. This situation is called a race condition.
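
To make this concrete, here's a small standalone sketch that can provoke exactly this kind of race on a shared integer (counter and unsafe_increment are illustrative names, not part of the server):

import threading

counter = 0

def unsafe_increment():
    global counter
    for _ in range(100000):
        counter += 1  # a read-modify-write, not an atomic step

threads = [threading.Thread(target=unsafe_increment) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Often prints less than 400000 because interleaved updates get
# lost; how often this happens varies between interpreter versions.
print(counter)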

Thread safe objects remove this possibility, so they should absolutely be preferred for sharing state among threads.

Getting back to our server, the other useful behavior of Queues is that if get() is called on an empty Queue, it blocks until something is added to the Queue. We take advantage of this in our send threads. Notice how we enter an infinite loop where the first operation is a get() call on the Queue. The thread blocks there and waits patiently until something is added to its Queue. And, as you've probably guessed, our receive threads add the messages to the queues.
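
Here's a standalone sketch of that blocking behavior, including the None sentinel that handle_disconnect() uses to tell a send thread to exit (the worker function here is illustrative):

import threading, queue

q = queue.Queue()

def worker():
    while True:
        msg = q.get()    # blocks until something is put on the queue
        if msg is None:  # sentinel value telling the thread to exit
            break
        print('got:', msg)

t = threading.Thread(target=worker, daemon=True)
t.start()
q.put('hello')  # worker unblocks and prints 'got: hello'
q.put(None)     # worker sees the sentinel and exits
t.join()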

We create a Queue object for each send thread as the thread is being created, and we store the queues in the send_queues dict. For our receive threads to broadcast a new message, they just need to add the message to each Queue in send_queues, which we do in the broadcast_msg() function. Each waiting send thread then unblocks, picks the message out of its Queue, and sends it to its client.

We've also added a handle_disconnect() function, which gets called whenever a client disconnects or a socket error occurs. This function ensures that queues associated with closed connections are cleaned up, and that the socket is closed properly from the server end.

Locks

Contrast our use of Queue objects with our use of send_queues. Dict objects are not thread safe, and unfortunately Python's standard library doesn't provide a thread safe associative array type. Since we need to share this dict, we need to take extra precautions whenever we access it, and this is where the Lock comes in. Lock objects are a type of synchronization primitive: special objects built with functionality to help manage our threads and ensure that they don't trip over each other's accesses.

A Lock is either locked or unlocked. A thread can take the lock either by calling acquire() on it or, as in our program, by using it as a context manager. If one thread has acquired the lock and another thread also tries to acquire it, then the second thread will block on the acquire() call until the first thread releases the lock or exits the context. There is no limit on the number of threads that can try to acquire a lock at once; all but the first will block. By wrapping all the accesses to a non-thread safe object in a lock, we can ensure that no two threads operate on the object at the same time.
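
To illustrate, using a Lock as a context manager is equivalent to calling acquire() and release() explicitly, except that the context manager also releases the lock if the block raises an exception (the shared dict here is just an example object):

import threading

lock = threading.Lock()
shared = {}

# Context manager form, as used in the server:
with lock:
    shared['fd'] = 'queue'

# The equivalent explicit form:
lock.acquire()
try:
    shared['fd'] = 'queue'
finally:
    lock.release()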

So, every time we add or remove something from send_queues, we wrap it in a Lock context. Notice that we're also protecting send_queues when we iterate over it. Even though we're not changing it, we want to be sure that it doesn't get modified while we're working with it.

Although we're being careful and using locks and thread safe primitives, we're not protected against every thread related pitfall. Since the thread synchronization mechanisms themselves block, it's still quite possible to create deadlocks, where two threads each block on a lock held by the other. The best approach to managing thread communication is to restrict all accesses to your shared state to as small an area of your code as you can. In the case of this server, this module could be reworked as a class providing a minimal number of public methods. It could also be documented in a way that discourages changing any internal state. This would keep the threading strictly confined to that class. A minimal sketch of how such a deadlock can arise follows.
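
In this sketch, lock_a, lock_b, and both functions are illustrative, not part of the server. If thread_one takes lock_a while thread_two takes lock_b, each then blocks forever waiting for the lock the other holds; the usual remedy is to always acquire multiple locks in the same order:

import threading

lock_a = threading.Lock()
lock_b = threading.Lock()

def thread_one():
    with lock_a:      # takes lock_a first...
        with lock_b:  # ...then waits for lock_b
            pass

def thread_two():
    with lock_b:      # takes lock_b first...
        with lock_a:  # ...then waits for lock_a
            pass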
