An implementation of line-based framing

Let's see how line-based framing can be implemented via two lettable functions (functions that can be used with the let operator). The following is the code for the framing operator:

def frame(datagram):
def on_subscribe(observer):
def on_next(i):
if ' ' in i:
observer.on_error(ValueError('
newline must be escaped'))
observer.on_next(i + ' ')

datagram.subscribe(
on_next=on_next,
on_error=observer.on_error,
on_completed=observer.on_completed,
)
return Observable.create(on_subscribe)

It takes an observable of datagrams as input, and returns an observable of data that can be sent on the wire. The frame function follows the now classical pattern on input/output observable functions. The datagram observable is subscribed when observer subscribes to the output observable. Each time an item is received, then a newline character is added at the end. Note that this could have been done directly with the map operator, as in the following example:

def frame(datagram):
return datagram.map(lambda i: i + ' ')

However, an additional check is done here to ensure that the incoming input does not contain a newline. If this is the case, then an error is notified.

The unframe operator is more complex, because, for each received item, it can emit zero, one, or several items. The following is its code:

def unframe(data):
def on_subscribe(observer):
acc = ''

def on_next(i):
nonlocal acc
lines = i.split(' ')
lines[0] = acc + lines[0]
acc = lines[-1]
datagrams = lines[0:-1]
for datagram in datagrams:
observer.on_next(datagram)

data.subscribe(
on_next=on_next,
on_error=observer.on_error,
on_completed=observer.on_completed,
)
return Observable.create(on_subscribe)

The pattern is still the same. The acc variable is an accumulator initialized to an empty string. This accumulator stores any remaining characters from the previous items, so that complete lines can be recomposed from multiple items. The on_next function splits the received string, based on the newline character. Depending on the way the data is received on the network, there can be zero, one, or more lines in the item. After splitting the item into multiple lines, the first line is prefixed with any remaining data from the previous item. Then the accumulator is updated with this as the content of the last partial line. Note that, if the received item ends with a newline character, then the last entry of the list returned by the split function is an empty string. Finally, all full lines are sent on the output observable.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.191.254.44