Implementation of the subscriber

The subscriber is implemented as a component. It uses two drivers: the tcp_client and stdout. Its implementation is as follows:

def rmux_client(sources):
    response = sources.tcp_client.response.share()
    tcp_connect = Observable.just(tcp_client.Connect(
        host='127.0.0.1', port='8080'
    ))

    create_observable = (
        response
        .flat_map(lambda connection:
            Observable.just({
                'what': 'subscribe',
                'id': 42,
                'name': '1234'})
            .map(lambda i: json.dumps(i))
            .let(frame)
            .map(lambda j: tcp_client.Write(
                id=connection.id, data=j.encode()))
        )
    )

    console = (
        response
        .flat_map(lambda connection: connection.observable
            .map(lambda i: i.data.decode('utf-8'))
            .let(unframe)
            .map(lambda i: json.loads(i))
            .group_by(lambda i: i['id'])
            .flat_map(lambda subscription: subscription
                .map(notification)
                .dematerialize()
            )
        )
        .map(lambda i: "item: {} ".format(i))
    )

    tcp_sink = Observable.merge(tcp_connect, create_observable)
    return Sink(
        tcp_client=tcp_client.Sink(request=tcp_sink),
        stdout=stdout.Sink(data=console),
    )

Let's start reading it from the first and last blocks. The first step turns the response observable into a hot observable with the share operator, so that it can be subscribed to twice (once to send requests and once to handle incoming messages) without triggering a new subscription on the driver each time. Then a Connect item is emitted in a dedicated observable.
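The effect of sharing can be illustrated without RxPY. The following is only a hand-rolled sketch, not the RxPY implementation: SharedSource and fake_driver_response are hypothetical names invented for this illustration, and unsubscription is deliberately left out. It shows two observers receiving the same Connection item while the driver side is subscribed only once.

```python
class SharedSource:
    """Hand-rolled illustration of sharing: many observers, one upstream subscription."""

    def __init__(self, subscribe_upstream):
        self._subscribe_upstream = subscribe_upstream
        self._observers = []

    def subscribe(self, on_next):
        self._observers.append(on_next)
        if len(self._observers) == 1:
            # Only the first observer triggers a subscription on the upstream source.
            self._subscribe_upstream(self._dispatch)

    def _dispatch(self, item):
        # Every item coming from upstream is forwarded to all observers.
        for on_next in list(self._observers):
            on_next(item)


upstream_subscriptions = []  # one entry per subscription seen by the fake driver


def fake_driver_response(emit):
    # Hypothetical stand-in for the driver: it only records the emit callback.
    upstream_subscriptions.append(emit)


response = SharedSource(fake_driver_response)
requests, console = [], []
response.subscribe(requests.append)  # plays the role of create_observable
response.subscribe(console.append)   # plays the role of console

# The fake driver now emits a single Connection item.
upstream_subscriptions[0]("connection-1")
print(len(upstream_subscriptions), requests, console)
```

Without sharing, each of the two subscriptions would reach the driver separately, which is what the share call in rmux_client avoids.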

In the last block, two observables are merged to form the TCP client request observable: tcp_connect contains the connection request, and create_observable contains the subscription requests. Finally, a Sink object is returned with one observable for each driver.

The create_observable observable emits items containing subscription requests for remote observables. It is driven by incoming Connection items on the response observable. In the previous code example, only one item is emitted on the response observable, leading to the emission of one subscription request. First, the flat_map operator is used to save the connection context. Then a subscription dictionary is created. The last three steps correspond to the multiplexing stack, and are used in the same way as on the producer: serialization to a JSON string, framing, and sending a Write item to the TCP driver.
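The payload that ends up in the Write item can be sketched in plain Python. This is a minimal sketch under one assumption: the frame operator is not shown in this section, so frame_line is a hypothetical stand-in that simply terminates each message with a newline. The helper build_subscribe_request is also a name introduced here for illustration only.

```python
import json


def frame_line(message):
    # Hypothetical stand-in for the frame operator: newline-terminated frames.
    # Assumes the message itself contains no newline, which holds for the
    # default output of json.dumps.
    return message + "\n"


def build_subscribe_request(stream_id, name):
    # Same three steps as in create_observable: dictionary, JSON string, frame.
    request = {"what": "subscribe", "id": stream_id, "name": name}
    serialized = json.dumps(request)
    framed = frame_line(serialized)
    # The TCP driver expects bytes, hence the final encode().
    return framed.encode()


payload = build_subscribe_request(42, "1234")
print(payload)
```

In rmux_client, the equivalent bytes are wrapped in a tcp_client.Write item together with the id of the connection they must be sent on.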

The console observable is the handler of incoming messages. In this subscriber implementation, received items are just printed on the console. The first four operators are now familiar: saving the connection context with flat_map, decoding bytes to a UTF-8 string, unframing, and deserialization from JSON to a Python dictionary. Then the group_by operator is used, with the observable id as a key: this groups all messages of the same remote observable into a dedicated observable. In other words, messages are demultiplexed from a single observable to several observables. Combining this operator with the following flat_map allows us to operate on the items of a single observable instead of dealing with interleaved items of several observables. So, inside the inner flat_map, all received items belong to a single observable. These items can be converted into RxPY notification objects (OnNext, OnCompleted, and OnError) as can be seen in the following example:

def notification(obj):
    if obj['what'] == 'on_next':
        return OnNext(obj['item'])
    elif obj['what'] == 'on_completed':
        return OnCompleted()
    elif obj['what'] == 'on_error':
        return OnError(obj['error'])

From that point, the dematerialize operator is used to recreate an observable from these messages.

After all these steps, the outer flat_map operator emits the items of an observable that has been carried over the network link. In this implementation, received items are simply printed, one per line.
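The whole receive path can be mimicked in plain Python to see what each stage contributes. This is only a sketch under stated assumptions: unframe_lines is a hypothetical stand-in for the unframe operator and assumes one JSON message per newline-terminated line, and the input is buffered in full and assumed to contain only complete frames, whereas the real operators work on items as they arrive. The demultiplex and replay helpers are names invented for this illustration. The message keys (what, id, item, error) are the ones used by the notification function.

```python
import json
from collections import defaultdict


def unframe_lines(data):
    # Hypothetical stand-in for unframe: one JSON message per line.
    text = data.decode("utf-8")
    return [line for line in text.split("\n") if line]


def demultiplex(chunks):
    # Group decoded messages by remote observable id, as group_by does.
    streams = defaultdict(list)
    for line in unframe_lines(b"".join(chunks)):
        message = json.loads(line)
        streams[message["id"]].append(message)
    return dict(streams)


def replay(messages):
    # Rebuild one stream from its notifications, in the spirit of dematerialize.
    items = []
    for message in messages:
        if message["what"] == "on_next":
            items.append(message["item"])
        elif message["what"] == "on_completed":
            return items, "completed"
        elif message["what"] == "on_error":
            return items, "error: {}".format(message["error"])
    return items, "still open"


# TCP chunks do not respect message boundaries: the second message is split.
chunks = [
    b'{"what": "on_next", "id": 42, "item": 1}\n{"what": "on_n',
    b'ext", "id": 42, "item": 2}\n{"what": "on_next", "id": 7, "item": "a"}\n',
    b'{"what": "on_completed", "id": 42}\n',
]
results = {key: replay(messages) for key, messages in demultiplex(chunks).items()}
print(results)
```

Messages for ids 42 and 7 arrive interleaved on the same connection, yet each id ends up with its own sequence of items and its own completion state, which is the role played by group_by and dematerialize in the console observable.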

This client can now be used instead of telnet, as shown in the following example:

(venv-rx) $ rmux_client
item: 1
item: 2
item: 3
item: 4

The whole code of the publisher and the subscriber is available in the GitHub repository (https://github.com/PacktPublishing/Hands-On-Reactive-Programming-with-Python) for this book, in the sub-folder rmux.
