Making the server non-blocking

Finally, we will consider the problem of implementing a non-blocking server again. Here, we are applying the asynchronous generators discussed previously to facilitate the asynchronous reading and handling of data received from clients of the server. The actual code for the server is included in the Chapter18/example6.py file; we will be going through various parts of it, as it is a relatively long program. Let's turn our attention to the global variables that we will have in this program, as follows:

# Chapter18/example6.py

from collections import namedtuple

###########################################################################
# Reactor

Session = namedtuple('Session', ['address', 'file'])

sessions = {} # { csocket : Session(address, file)}
callback = {} # { csocket : callback(client, line) }
generators = {} # { csocket : inline callback generator }

To be able to successfully facilitate services for multiple clients at the same time, we will allow the server to have multiple sessions (one for each client) at the same time, and therefore, we will need to keep track of multiple dictionaries, each of which will hold one specific piece of information about the current session.

Specifically, the sessions dictionary maps a client socket connection to a Session object, which is a Python namedtuple object that contains the address of the client and the file object associated with that client connection. The callback dictionary maps a client socket connection to a callback that is the return value of the asynchronous generator that we will implement later; each of these callbacks takes in its corresponding client socket connection and data read from that client as arguments. Finally, the generators dictionary maps a client socket connection to its corresponding asynchronous generator.

Now, let's take a look at the reactor function:

# Chapter18/example6.py

import socket, select

# Main event loop
def reactor(host, port):
sock = socket.socket()
sock.bind((host, port))
sock.listen(5)
sock.setblocking(0) # Make asynchronous

sessions[sock] = None
print(f'Server up, running, and waiting for call on {host} {port}')

try:
while True:
# Serve existing clients only if they already have data ready
ready_to_read, _, _ = select.select(sessions, [], [], 0.1)
for conn in ready_to_read:
if conn is sock:
conn, cli_address = sock.accept()
connect(conn, cli_address)
continue

line = sessions[conn].file.readline()
if line:
callback[conn](conn, line.rstrip())
else:
disconnect(conn)
finally:
sock.close()

Aside from what we already had from our previous blocking server, we are adding in a number of instructions: we use the setblocking() method from the socket module to potentially make our server asynchronous, or non-blocking; as we are starting a server, we also register that specific socket to the sessions dictionary, with a None value for now.

Inside our infinite while loop (the event loop) is part of the new non-blocking feature that we are trying to implement. First, we use the select() method from the select module to single out the sockets from the sessions dictionary that are ready to be read (in other words, the sockets that have available data). Since the first argument of the method is for the data to be read, the second is for the data to be written, and the third is for exception data, we are only passing in the sessions dictionary in the first argument. The fourth argument specifies the timeout period for the method (in seconds); if unspecified, the method will block infinitely, until at least one item in sessions becomes available, which is not suitable for our non-blocking server.

Next, for every client socket connection that is ready to be read, if the connection corresponds to our original server socket, we will accept that connection and call the connect() function (which we will look at soon). In this for loop, we will also handle the callback methodologies. Specifically, we will access the file attribute of the session of the current socket connection (recall that each session has an address attribute and a file attribute) and will read data from it using the readline() method. Now, if what we read is valid data, then we will pass it (along with the current client connection) to the corresponding callback; otherwise, we will end the connection.

Note that even though our server is made asynchronous by the socket being set to non-blocking, the preceding readline() method is still a blocking function. The readline() function returns when it gets to a carriage return in its input data (the ' ' character in ASCII). This means that if the data sent by a client somehow does not contain a carriage return, then the readline() function will fail to return. However, since the server is still non-blocking, an error exception will be raised so that other clients will not be blocked.

Now, let's look at our new helper functions:

# Chapter18/example6.py

def connect(conn, cli_address):
sessions[conn] = Session(cli_address, conn.makefile())

gen = process_request(conn)
generators[conn] = gen
callback[conn] = gen.send(None) # Start the generator

def disconnect(conn):
gen = generators.pop(conn)
gen.close()
sessions[conn].file.close()
conn.close()

del sessions[conn]
del callback[conn]

The connect() function, which is to be called when a client connection has data that is ready to read, will initiate starting instructions at the beginning of a valid connection with a client. First, it initializes the namedtuple object associated with that specific client connection (we are still using the makefile() method to create the file objects here). The rest of the function is what we saw in the usage pattern of asynchronous generators, which we discussed earlier: we pass the client connection to process_request(), which is now an asynchronous generator; register it in the generators dictionary; have it call send(None) to initiate the generator; and store the return value to the callback dictionary, so that it can be called later (specifically, in the last part of the event loop in the reactor that we just saw).

The disconnect() function, on the other hand, facilitates various cleaning instructions when a connection with a client stops. It removes the generator associated with the client connection from the generators dictionary and closes the generator, the file object stored in the sessions dictionary, as well as the client connection itself. Finally, it deletes the keys that correspond to the client connection from the remaining dictionaries.

Let's turn our attention to the new process_request() function, which is now an asynchronous generator:

# Chapter18/example6.py

from operator import mul
from functools import reduce

###########################################################################
# User's Business Logic

async def process_request(conn):
print(f'Received connection from {sessions[conn].address}')
mode = 'sum'

try:
conn.sendall(b'<welcome: starting in sum mode> ')
while True:
line = await readline(conn)
if line == 'quit':
conn.sendall(b'connection closed ')
return
if line == 'sum':
conn.sendall(b'<switching to sum mode> ')
mode = 'sum'
continue
if line == 'product':
conn.sendall(b'<switching to product mode> ')
mode = 'product'
continue

print(f'{sessions[conn].address} --> {line}')
try:
nums = list(map(int, line.split(',')))
except ValueError:
conn.sendall(
b'ERROR. Enter only integers separated by commas ')
continue

if mode == 'sum':
conn.sendall(b'Sum of input integers: %a '
% str(sum(nums)))
else:
conn.sendall(b'Product of input integers: %a '
% str(reduce(mul, nums, 1)))
finally:
print(f'{sessions[conn].address} quit')

The logic that handles client data and performs the computation remains the same, and the only differences with this new function are the async keyword (placed in front of the def keyword) and the await keyword used with the new readline() function. These differences, in essence, convert our process_request() function into a non-blocking one, with the condition that the new readline() function is also non-blocking:

# Chapter18/example6.py

import types

@types.coroutine
def readline(conn):
def inner(conn, line):
gen = generators[conn]
try:
callback[conn] = gen.send(line) # Continue the generator
except StopIteration:
disconnect(conn)

line = yield inner
return line

Similar to what we saw in the previous example, we are importing the types module from Python and using the @types.coroutine decorator to make the readline() function a generator-based coroutine, which is non-blocking. Each time a callback (which takes in a client connection and a line of data) is called, the execution flow will go into the inner() function inside this coroutine and execute the instructions.

Specifically, it sends the line of data to the generator, which will enable the instructions in process_request() to handle it asynchronously and store the return value to the appropriate callback—unless the end of the generator has been reached, in which case the disconnect() function will be called.

Our last task is to test whether this server is actually capable of handling multiple clients at the same time. To do this, execute the following script first:

> python3 example6.py
Server up, running, and waiting for call on localhost 8080

Similar to what you saw earlier, open two additional Terminals and use Telnet into this running server with both:

> telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
<welcome: starting in sum mode>

As you can see, both clients are being handled correctly: both are able to connect, and both receive the welcome message. This is also illustrated by the server output, as follows:

> python3 example6.py
Server up, running, and waiting for call on localhost 8080
Received connection from ('127.0.0.1', 63855)
Received connection from ('127.0.0.1', 63856)

Further tests could involve sending messages to the server at the same time, which it can still handle. The server can also keep track of individual modes of calculation that are unique to individual clients (in other words, assuming each client has a separate mode of calculation). We have successfully built a non-blocking, concurrent server from scratch.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.136.18.218