Implementation of a TCP server

The asyncio module contains implementations of a TCP server and a TCP client, and these are the ones used in this project. Both components are implemented as drivers because they perform side effects. The TCP server driver operates on the following data types:

from collections import namedtuple

Sink = namedtuple('Sink', ['request'])
Source = namedtuple('Source', ['response'])

# Sink items
Listen = namedtuple('Listen', ['host', 'port'])
Write = namedtuple('Write', ['id', 'data'])

# Source items
Connection = namedtuple('Connection', ['id', 'observable'])
Data = namedtuple('Data', ['data'])

Sink and Source for the driver are composed of only one observable each: a request observable for Sink, and a response observable for Source.

Two Sink items are needed. The first one, named Listen, is the request to start the server. This item contains two fields: the host and port used to bind the server. The second sink item, named Write, is used for writing data on an established connection. It contains two fields. The first one is the id of the connection where the data must be written, and the second one is the data field, holding the bytes to write on the network link.

Two Source items are emitted. The Connection item is emitted each time a new client connects to the server. This item contains two fields. The first one is an id identifying the connection. This id must be provided back in the Write items. The second field, observable, is an observable that emits an item each time data is available on the TCP link. So the response observable is a higher-order observable emitting one item each time a new client connects to the server. The second source item, named Data, is the item emitted on the observable of the Connection item. The Data item contains a data field holding the bytes received on the network link.
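As a minimal sketch of how these items fit together, the following stand-alone snippet redefines the four item types and builds a Write item that reuses the id carried by a Connection item. The echo helper, the example host and port, and the placeholder id string are hypothetical and only serve the illustration; in the driver, the id is whatever object the driver placed in the Connection item.

```python
from collections import namedtuple

# Sink items (requests sent to the driver)
Listen = namedtuple('Listen', ['host', 'port'])
Write = namedtuple('Write', ['id', 'data'])

# Source items (responses emitted by the driver)
Connection = namedtuple('Connection', ['id', 'observable'])
Data = namedtuple('Data', ['data'])


def echo(connection, item):
    """Hypothetical helper: build the Write request that echoes a Data item."""
    return Write(id=connection.id, data=item.data)


# Request that would start the server (host and port are example values).
listen_request = Listen(host='localhost', port=8080)

# Placeholder objects standing in for what the driver would emit.
connection = Connection(id='connection-1', observable=None)
reply = echo(connection, Data(data=b'ping'))
```

The point of the sketch is that the application never creates connection ids itself: it only copies the id received in a Connection item into the Write items it sends back.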

The subscription callback for the response observable looks like this:

    def on_subscribe(observer):
        async def listen(host, port, handler):
            try:
                # The loop argument was removed in Python 3.10; omit it there.
                await asyncio.start_server(
                    handler, host, port, loop=loop)
            except Exception as e:
                # Pass the callback and its argument separately to call_soon.
                loop.call_soon(observer.on_error, e)

        async def write(writer, data):
            writer.write(data)

        def on_next(i):
            if type(i) is Listen:
                asyncio.ensure_future(listen(
                    i.host, i.port, client_connected))
            elif type(i) is Write:
                asyncio.ensure_future(write(i.id, i.data))

        sink.request.subscribe(
            on_next=on_next,
            on_completed=observer.on_completed,
            on_error=observer.on_error
        )

Starting from the end, the request observable is subscribed with error and completion handlers that forward these events to the response observer. The on_next handler accepts the two sink items defined previously.

The Listen item schedules the execution of the listen coroutine. This coroutine starts a TCP server bound to the host and port provided in the Listen item. If the server cannot be started, the error is forwarded to the response observer. The handler of this server is the client_connected coroutine, which will be detailed later.
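The error path of such a coroutine can be sketched in isolation. The snippet below is an illustration under stated assumptions, not the driver code: RecordingObserver is a hypothetical stand-in for an Rx observer, and failing_start is a hypothetical coroutine that simulates a server failing to bind. It shows that loop.call_soon takes the callback and its arguments separately, and that the callback only runs once control returns to the event loop.

```python
import asyncio


class RecordingObserver:
    """Hypothetical stand-in for an Rx observer; it only records errors."""

    def __init__(self):
        self.errors = []

    def on_error(self, error):
        self.errors.append(error)


async def failing_start():
    # Simulates a server that cannot bind to its port (assumption).
    raise OSError('address already in use')


async def listen(loop, observer, start):
    try:
        await start()
    except Exception as e:
        # The callable and its argument are passed separately, so that
        # on_error is invoked later by the event loop, not right here.
        loop.call_soon(observer.on_error, e)


async def main():
    loop = asyncio.get_running_loop()
    observer = RecordingObserver()
    await listen(loop, observer, failing_start)
    before = list(observer.errors)  # the callback has not run yet
    await asyncio.sleep(0)          # yield so the loop runs the callback
    return before, observer.errors


before, after = asyncio.run(main())
```

Writing loop.call_soon(observer.on_error(e)) instead would call on_error immediately and then hand its return value to call_soon, which is not what is intended.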

The Write item schedules the execution of the write coroutine. Wrapping the write call of writer is necessary because it is not a coroutine. However, this write method (writer is an object of the StreamWriter class) is not blocking: it buffers the data and sends it asynchronously. So the data is not necessarily sent when the write method returns. The id field used in the Write item is a StreamWriter object.
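The buffering behavior of StreamWriter can be observed with a small loopback experiment. This is a sketch, not the driver code: the handler name, the loopback address, and the use of port 0 (which lets the operating system pick a free port) are choices made for the example. It also awaits drain(), which the listing above does not do, to wait until the stream is ready for more data.

```python
import asyncio


async def handler(reader, writer):
    # write() only appends the bytes to the transport buffer and returns.
    writer.write(b'hello')
    # drain() waits until it is appropriate to resume writing; it returns
    # immediately when the buffer is below its high-water mark.
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def main():
    # Port 0 asks the operating system for any free port (example choice).
    server = await asyncio.start_server(handler, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]

    reader, writer = await asyncio.open_connection('127.0.0.1', port)
    received = await reader.read()  # no size given: read until EOF
    writer.close()
    await writer.wait_closed()

    server.close()
    return received


received = asyncio.run(main())
```

Whether to await drain() after each write is a design choice: skipping it keeps the write path simple, while awaiting it applies back pressure when a client reads more slowly than the server writes.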

Finally, let's look at the client_connected handler, which is called each time a client connects to the server. It is implemented as a nested function of on_subscribe, as follows:

    async def client_connected(reader, writer):
        def on_connection_subscribe(observer, reader, writer):
            async def handle_connection(observer, reader, writer):
                while True:
                    try:
                        data = await reader.read(100)
                        if data == b'':
                            break
                        loop.call_soon(observer.on_next, Data(data=data))
                    except Exception as e:
                        # Pass the callback and its argument separately.
                        loop.call_soon(observer.on_error, e)
                        break

                loop.call_soon(observer.on_completed)
                writer.close()

            asyncio.ensure_future(handle_connection(
                observer, reader, writer))

        connection = Observable.create(
            lambda o: on_connection_subscribe(o, reader, writer))
        observer.on_next(Connection(
            id=writer,
            observable=connection))

Let's read the code starting from the last block. First, an observable is created. Its subscription function is a lambda that forwards the observer, together with the reader and writer of the connection, to on_connection_subscribe. Then a Connection item is emitted on the observer of the response observable.

The on_connection_subscribe function just schedules the execution of the handle_connection coroutine. This coroutine reads data from the connection in a loop until the client closes the connection (in this case the read call returns an empty bytes object). Each time some data is received, it is emitted on the data observable of the connection. When the connection is closed, the data observable is completed and writer is closed.
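The read loop can be tried without a network connection. The following sketch assumes a hypothetical read_until_eof helper and feeds a StreamReader by hand, which is only a testing convenience; in the driver, the reader is provided by asyncio.start_server.

```python
import asyncio


async def read_until_eof(reader, on_data, chunk_size=100):
    """Call on_data for every chunk until the peer closes the stream."""
    while True:
        data = await reader.read(chunk_size)
        if data == b'':  # read() returns b'' once EOF is reached
            break
        on_data(data)


async def main():
    # Feeding the reader manually replaces a real client (assumption).
    reader = asyncio.StreamReader()
    reader.feed_data(b'x' * 250)
    reader.feed_eof()

    chunks = []
    await read_until_eof(reader, chunks.append)
    return chunks


chunks = asyncio.run(main())
chunk_sizes = [len(c) for c in chunks]
```

With 250 bytes available and a read size of 100, the data is delivered as three chunks of 100, 100, and 50 bytes, which shows that a Data item does not correspond to a message boundary: the consumer has to reassemble messages itself.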
