Implementation of the publisher

The publisher is implemented as a Cyclotron component, being the entry point of the application. This component implements the three layers of remote stream multiplexing in fewer than 30 lines of code, thanks to the ReactiveX operators. This component uses only one driver, the TCP server implemented in the previous example. Here is the function implementing this component:

def rmux_server(sources):
tcp_listen = Observable.just(tcp_server.Listen(
host='127.0.0.1', port='8080'
))

beat = (
sources.tcp_server.response
.flat_map(lambda connection: connection.observable
.map(lambda i: i.data.decode('utf-8'))
.let(unframe)
.map(lambda i: json.loads(i))
.flat_map(lambda subscription: create_observable[
subscription['name']]()
.materialize()
.map(lambda i: materialize_repr(i, subscription['id']))
)
.map(lambda i: json.dumps(i))
.let(frame)
.map(lambda j: tcp_server.Write(
id=connection.id, data=j.encode()))
)
)

tcp_sink = Observable.merge(tcp_listen, beat)
return Sink(
tcp_server=tcp_server.Sink(request=tcp_sink),
)

Let's start with the first and last blocks of the function. First, the tcp_listen observable allows us to start the TCP server on a localhost and port 8080. At the end of the function, this observable is merged with the publish observable, and the result is provided as the request field of the TCP server Sink object.

The publish observable is the implementation of the three steps (in both ways) of the multiplexing layers. It is composed of two nested flat_map operators. Each level of flat_map allows us to save some context information.

The first flat_map layer maps each connection item to its data observable. All the operations on a connection are done inside this operator. This allows direct access to the connection item, which is necessary to send back data on the correct connection.

Then the incoming data is decoded as a UTF-8 string. This step is necessary because the data received from the TCP server contains bytes. This data must be decoded to a string before it can be parsed as a JSON string. The next operator unframes the incoming data, by using the unframe function implemented in the previous part. After this map operator, a complete JSON line is emitted. So it can be deserialized with the Python JSON module.

Then comes the routing layer. The routing in this case is implemented with the flat_map operator. Each time a subscription item is received, then an observable is returned, depending on the name field provided in the subscription object. The create_observable variable is a dictionary containing an Observable factory for each supported name value, as can be seen in the following example:

def one_two_three_four():
return Observable.from_(['1', '2', '3', '4'])

def lets_go():
return Observable.just("let's go")

create_observable = {
'1234': one_two_three_four,
'heyho': lets_go,
}

After this step, the observable must be serialized so that its items are sent on the TCP connection. Three types of event have to be sent: next, error, and completed, corresponding to the life cycle of an observable. The materialize operator just does this. So it is used to translate the observable to items representing the observable. An additional step is required to translate the object created by the materialize operator to something that can be serialized to a JSON string. This is done in the materialize_repr function. This function just maps RxPY notification objects to dictionaries, as can be seen in the following example:

def materialize_repr(notification, id):
if type(notification) is OnNext:
return {
'what': 'on_next',
'item': notification.value,
'id': id,
}
elif type(notification) is OnError:
return {
'what': 'on_error',
'error': str(notification.exception),
'id': id,
}
elif type(notification) is OnCompleted:
return {
'what': 'on_completed',
'id': id,
}

Once this step is done, there is no more need to use the subscription item, so the next step is done outside the second flat_map level. In the next step, serialization is done, always with the Python JSON module. After that, framing is done, again with the function implemented in the previous part. Finally, the resulting item is mapped to a write request, using the connection id available via the first flat_map level.

To test this publisher implementation, install and start the server from a Terminal, as follows:

(venv-rx) $ python3 setup.py install
(venv-rx) $ rmux_server

Then, from another Terminal, connect to it via telnet, and send a subscribe JSON request, as follows:

$ telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
{"what":"subscribe", "id":"42", "name":"1234"}

The server should answer with five messages, representing the received observable, as per the following example:

{"what": "on_next", "item": "1", "id": "42"}
{"what": "on_next", "item": "2", "id": "42"}
{"what": "on_next", "item": "3", "id": "42"}
{"what": "on_next", "item": "4", "id": "42"}
{"what": "on_completed", "id": "42"}

You can also try using the "heyho" name to check that it also works, and verify, with several telnet connections at the same time, whether answers are sent to the client that requested the observable and not the others.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.142.199.184