A low-level event-driven chat server

The event-driven architecture has some real benefits. The catch is that, for a low-level implementation, we need to write our code in a completely different style. Let's write an event-driven chat server to illustrate this.

Note that this example will not work on Windows, because Windows lacks the poll interface that we will be employing here. There is an older interface, called select, which Windows does support, but it is slower and more complicated to work with. The event-driven frameworks that we look at later automatically fall back to select for us if we're running on Windows.

There is a higher-performance alternative to poll called epoll, available on Linux. However, it is also more complicated to use, so for simplicity we'll stick with poll here. Again, the frameworks we discuss later automatically take advantage of epoll if it is available.

Finally, counter-intuitively, Python's poll interface lives in a module called select, hence we will import select in our program.
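As a quick check before running the server, the following sketch reports which of these interfaces the select module exposes on the current platform. The dictionary name available is just an illustrative choice.

```python
import select

# Report which readiness-notification interfaces this platform exposes.
# select.select() exists everywhere; poll and epoll are platform-dependent.
available = {
    'select': hasattr(select, 'select'),
    'poll': hasattr(select, 'poll'),
    'epoll': hasattr(select, 'epoll'),
}

for name, present in available.items():
    print('{}: {}'.format(name, 'available' if present else 'not available'))
```

If poll is reported as not available, the chat server in this section will fail at the select.poll() call.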

Create a file called 3.1-chat_server-poll.py and save the following code in it:

import select
import tincanchat
from types import SimpleNamespace
from collections import deque

HOST = tincanchat.HOST
PORT = tincanchat.PORT
clients = {}

def create_client(sock):
    """ Return an object representing a client """
    return SimpleNamespace(
                    sock=sock,
                    rest=bytes(),
                    send_queue=deque())

def broadcast_msg(msg):
    """ Add message to all connected clients' queues """
    data = tincanchat.prep_msg(msg)
    for client in clients.values():
        client.send_queue.append(data)
        poll.register(client.sock, select.POLLOUT)

if __name__ == '__main__':
    listen_sock = tincanchat.create_listen_socket(HOST, PORT)
    poll = select.poll()
    poll.register(listen_sock, select.POLLIN)
    addr = listen_sock.getsockname()
    print('Listening on {}'.format(addr))

    # This is the event loop. Loop indefinitely, processing events
    # on all sockets when they occur
    while True:
        # Iterate over all sockets with events
        for fd, event in poll.poll():
            # clear-up a closed socket
            if event & (select.POLLHUP | 
                        select.POLLERR |
                        select.POLLNVAL):
                poll.unregister(fd)
                del clients[fd]

            # Accept new connection, add client to clients dict
            elif fd == listen_sock.fileno():
                client_sock, addr = listen_sock.accept()
                client_sock.setblocking(False)
                fd = client_sock.fileno()
                clients[fd] = create_client(client_sock)
                poll.register(fd, select.POLLIN)
                print('Connection from {}'.format(addr))

            # Handle received data on socket
            elif event & select.POLLIN:
                client = clients[fd]
                addr = client.sock.getpeername()
                recvd = client.sock.recv(4096)
                if not recvd:
                    # the client state will get cleaned up in the
                    # next iteration of the event loop, as close()
                    # sets the socket to POLLNVAL
                    client.sock.close()
                    print('Client {} disconnected'.format(addr))
                    continue
                data = client.rest + recvd
                (msgs, client.rest) = \
                                tincanchat.parse_recvd_data(data)
                # If we have any messages, broadcast them to all
                # clients
                for msg in msgs:
                    msg = '{}: {}'.format(addr, msg)
                    print(msg)
                    broadcast_msg(msg)

            # Send message to ready client
            elif event & select.POLLOUT:
                client = clients[fd]
                data = client.send_queue.popleft()
                sent = client.sock.send(data)
                if sent < len(data):
                    client.send_queue.appendleft(data[sent:])
                if not client.send_queue:
                    poll.modify(client.sock, select.POLLIN)

The crux of this program is the poll object, which we create at the start of execution. This is an interface to the kernel's poll service, which lets us register sockets for the OS to watch and notifies us when they are ready for us to work with.

We register a socket by calling the poll.register() method, passing the socket as an argument along with the type of activity that we want the kernel to watch out for. There are several conditions which we can monitor by specifying various select.POLL* constants. We're using POLLIN and POLLOUT in this program to watch out for when a socket is ready to receive and send data respectively. Accepting a new incoming connection on our listening socket will be counted as a read.
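To see registration in isolation, here is a minimal sketch that uses a connected socket pair from socket.socketpair() in place of a real client connection. The names a, b, and poller are illustrative only (the server above calls its poll object poll). It assumes a platform that provides select.poll().

```python
import select
import socket

# A connected pair of sockets stands in for a client connection.
a, b = socket.socketpair()
poller = select.poll()

# Watch `a` for incoming data only.
poller.register(a, select.POLLIN)

# The event constants are bit flags, so they can be combined with |
# to watch for both readability and writability.
poller.modify(a, select.POLLIN | select.POLLOUT)

# Nothing has been sent to `a`, but its send buffer has room, so we
# expect it to be reported as writable and not readable.
events = poller.poll(0)
print(events)

# Stop watching the socket altogether.
poller.unregister(a)
after = poller.poll(0)

a.close()
b.close()
```

Note that modify() replaces the whole event mask rather than adding to it, which is why the server switches a client back to POLLIN once its send queue is empty.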

Once a socket is registered with poll, the OS will watch it and record when the socket is ready to carry out the activity that we requested. When we call poll.poll(), it returns a list of all the sockets that have become ready for us to work with. For each socket, it also returns an event flag, which indicates the state of the socket. We can use this event flag to tell whether we can read from (POLLIN event) or write to the socket (POLLOUT event), or whether an error has occurred (POLLHUP, POLLERR, POLLNVAL events).
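The following sketch shows what poll.poll() hands back. It again uses a socket pair as a stand-in for a client, and the variable names are illustrative rather than taken from the server.

```python
import select
import socket

a, b = socket.socketpair()
fd = a.fileno()

poller = select.poll()
poller.register(a, select.POLLIN)

# The peer sends some data, which makes `a` readable.
b.sendall(b'hello')

# poll() returns a list of (file descriptor, event flags) tuples.
ready = poller.poll()
data = b''
for ready_fd, event in ready:
    # Test individual flags with a bitwise AND.
    if event & select.POLLIN:
        data = a.recv(4096)
        print('fd {} is readable, got {!r}'.format(ready_fd, data))

a.close()
b.close()
```

Because poll() reports file descriptors rather than socket objects, the server keeps its clients dict keyed by file descriptor so that it can look up the right client for each event.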

To make use of this, we enter our event loop, repeatedly calling poll.poll(), iterating through the ready objects it returns and operating on them as per their event flags.
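Stripped of the chat logic, the shape of such a loop can be sketched as follows. This is a simplified illustration, not the server's loop: it watches a single socket from a socket pair, it checks POLLIN before the error flags so that buffered data is read before a hang-up is handled, and it stops once the peer has disconnected. The names running and received are assumptions made for the example.

```python
import select
import socket

a, b = socket.socketpair()
a.setblocking(False)

poller = select.poll()
poller.register(a, select.POLLIN)

# The peer sends two messages and then hangs up.
b.sendall(b'one\n')
b.sendall(b'two\n')
b.close()

received = b''
running = True
while running:
    # Block until at least one registered socket has an event
    for fd, event in poller.poll():
        if event & select.POLLIN:
            chunk = a.recv(4096)
            if chunk:
                received += chunk
            else:
                # An empty read means the peer closed the connection
                poller.unregister(fd)
                running = False
        elif event & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
            poller.unregister(fd)
            running = False

a.close()
print(received)
```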

Because we're only running in a single thread, we don't need any of the synchronization mechanisms that we had to employ in the multithreaded server. We're just using a regular dict to keep track of our clients. If you've not come across it before, the SimpleNamespace object that we use in the create_client() function is just a new idiom for creating an empty object with a __dict__ (this is needed because plain object instances don't have a __dict__, so they won't accept arbitrary attributes). Previously, we may have used the following to give us an object to which we can assign arbitrary attributes:

class Client:
  pass
client = Client()

Python version 3.3 and later versions give us the new, more explicit SimpleNamespace object.
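The difference is easy to see in a short sketch. The attribute names here mirror those used in create_client(), but the values are placeholders.

```python
from types import SimpleNamespace

# A bare object() instance has no __dict__, so it refuses new attributes.
plain = object()
try:
    plain.name = 'alice'
except AttributeError as exc:
    print('object() rejected the attribute:', exc)

# SimpleNamespace accepts attributes as keyword arguments...
client = SimpleNamespace(name='alice', rest=bytes())
# ...and lets us add more afterwards.
client.send_queue = []
print(client)
```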

We can run our multithreaded client against this server. The server is still using the same network protocol, and the architecture of the two programs won't affect the communication. Give it a try and verify that it works as expected.

This style of programming, employing poll and non-blocking sockets, is often referred to as non-blocking and asynchronous, since we use sockets in non-blocking mode, and the thread of control handles I/O reactively, as it needs to happen, rather than locking to a single I/O channel until it's done. However, you should note that our program isn't completely non-blocking, since it still blocks on the poll.poll() call. This is pretty much inevitable in an I/O-bound system, because when nothing's happening, you've got to wait for the I/O activity somewhere.
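The blocking behaviour of poll.poll() can be observed directly. poll.poll() accepts an optional timeout in milliseconds, and the sketch below uses it so that the call gives up after a short wait instead of blocking forever. The 200 ms value and the variable names are arbitrary choices for the example.

```python
import select
import socket
import time

a, b = socket.socketpair()
poller = select.poll()
poller.register(a, select.POLLIN)

# Nothing is ever sent to `a`, so poll() has nothing to report. With a
# timeout it waits for up to 200 ms and then returns an empty list.
start = time.monotonic()
events = poller.poll(200)
elapsed = time.monotonic() - start
print('events={} after {:.2f}s'.format(events, elapsed))

a.close()
b.close()
```

The server calls poll.poll() with no timeout, so it simply sleeps inside that call until a socket has something for it to do.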
