Implementation of a TCP client

The TCP client is also implemented as a Cyclotron driver. Its behavior is similar to that of the server: it takes the similar Sink observables as input and returns similar Source observables. The following example shows the types used by this driver:

Sink = namedtuple('Sink', ['request'])
Source = namedtuple('Source', ['response'])

# Sink items
Connect = namedtuple('Connect', ['host', 'port'])
Write = namedtuple('Write', ['id', 'data'])

# Source items
Connection = namedtuple('Connection', ['id', 'observable'])
Data = namedtuple('Data', ['data'])

The only difference is the Listen sink item that is replaced with a Connect item, taking the same arguments. The meaning however is different: the Listen item on the TCP server is a request to bind a server to the provided host and port, while the Connect item of the client is a request to connect to the provided host and port.

The subscription to the Source observable is the following code, also very similar to the server driver:

def on_subscribe(observer):
async def tcp_client(host, port):
try:
reader, writer = await asyncio.open_connection(
host, port, loop=loop)
connection = Observable.create(
lambda o: on_connection_subscribe(o, reader, writer))
observer.on_next(Connection(
id=writer,
observable=connection))
except Exception as e:
loop.call_soon(observer.on_error(e))

async def write(writer, data):
writer.write(data)

def on_next(i):
if type(i) is Connect:
asyncio.ensure_future(tcp_client(i.host, i.port))
elif type(i) is Write:
asyncio.ensure_future(write(i.id, i.data))

sink.request.subscribe(
on_next=on_next,
on_completed=observer.on_completed,
on_error=observer.on_error
)

The handling of the Write item is exactly the same: a coroutine is scheduled to write data on the writer of the connection.

The Listen item schedules the execution of the tcp_client coroutine. The implementation of this coroutine is also similar to the client_connection coroutine of the server. It first tries to connect to the host and port provided in the Connect item. In the event of an error, the response observable is completed (in this case, the application will terminate but, for a client application, this behavior is usual). In the event of success, a Connection item is emitted on the response observable, with a new data observable in the observable field. The subscription function of this latter observable is a function that reads the data on the network link, as can be seen in the following example:

def on_connection_subscribe(observer, context):
async def handle_connection(observer, context):
while True:
try:
data = await context.reader.read(100)
if data == b'':
break
loop.call_soon(observer.on_next, Data(data=data))
except Exception as e:
loop.call_soon(observer.on_error(e))
break

loop.call_soon(observer.on_completed)
context.writer.close()

asyncio.ensure_future(handle_connection(observer, context))

This function schedules a coroutine that reads data on the link until the connection is interrupted. Each time some data is available, then a Data item is emitted on the data observable of the connection. Once again, this is very similar to the behavior of the TCP server driver.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.12.166.255