Using generators for asynchronous handlers

So how can generators help with writing asynchronous code? The key is that a generator can be suspended at a point of our choosing and resumed later when something happens, such as the completion of an asynchronous action. A generator can therefore trigger an asynchronous action with a yield expression and be resumed once that action has completed.
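The suspend/resume mechanism can be seen in isolation with a toy generator. This is only an illustrative sketch: the echo function and its variable names are made up for this example and are not part of the state machine.

```python
def echo():
    """Toy generator: pauses at each yield and resumes when send() is called."""
    received = []
    while len(received) < 2:
        # Execution stops here until the caller sends a value.
        value = yield "waiting"
        received.append(value)
    return received


gen = echo()
first = next(gen)        # run up to the first yield
second = gen.send("a")   # resume, store "a", pause at the next yield
try:
    gen.send("b")        # resume, store "b", then the generator returns
    result = None
except StopIteration as stop:
    result = stop.value  # the return value travels in StopIteration
```

Each call to send resumes the generator exactly where it stopped, and the sent value becomes the result of the yield expression.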

Let's rewrite the state machine as a generator. The asynchronous operations are the read calls on the transport channel. So, each time the state machine needs to get some data, it has to yield so that more data can be read. Here is the new code of the state machine:

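def parser(read_next):
    while True:
        # Wait for the sync word that starts a packet.
        sync = yield read_next
        if sync != 42:
            print("error!")
            return

        # Wait for the number of payload words.
        size = yield read_next

        # Wait for each payload word in turn.
        while size > 0:
            data = yield read_next
            print("payload: {}".format(data))
            size -= 1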

The most visible change is that the code is much smaller. Instead of a class whose four methods handle the individual states, the implementation is now a single function whose flow closely resembles synchronous code. To understand what is going on, read the code as if the yield read_next expression were a call to a read_next function returning the next integer from the transport channel. The function is an infinite loop composed of three parts:

  1. The first part reads the sync word; if its value is not 42, it prints an error and the parser stops
  2. The second part reads the size of the payload
  3. The last part reads all the words of the payload and prints them

Once this is done, the loop starts over and reads the sync word of the next packet. This code is much more natural to read and write than a class-based state machine, for several reasons:

  • The code flow follows the application logic, so you no longer have to remember where the implementation of some earlier step lives.
  • All state lives in local variables, which are declared and initialized where they are needed rather than in a class constructor that may be out of sight.
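To make the comparison with synchronous code concrete, here is a minimal sketch of what a blocking version could look like. It assumes a hypothetical read_next callable that returns the next word; the name parse_sync, and returning the payload words in a list instead of printing them, are choices made only for this illustration.

```python
def parse_sync(read_next):
    """Blocking counterpart of the parser generator.

    Here read_next is a plain callable, so each `yield read_next`
    of the generator version becomes a `read_next()` call.
    """
    payloads = []
    while True:
        sync = read_next()
        if sync != 42:
            # Invalid sync word: stop parsing.
            return payloads
        size = read_next()
        while size > 0:
            payloads.append(read_next())
            size -= 1


# Feed the function from an in-memory sequence of words.
words = iter([42, 3, 33, 44, 24, 43, 4])
payloads = parse_sync(lambda: next(words))
```

The two versions have the same shape. The difference is that the blocking version cannot give control back to its caller while it waits for data, whereas the generator does so at every yield.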

Now that the state machine is implemented, it must be fed data. Another generator is used to simulate the availability of data on the channel:

def socket():
    yield 42
    yield 3
    yield 33
    yield 44
    yield 24
    yield 43
    yield 4

This generator yields the same values as the previous implementation, but it is now a simple function instead of a class. The missing piece is a component that orchestrates these two generators and feeds the parser generator with the data returned by the socket generator. A first attempt could look like this:

s = socket()
p = parser(s)

next(p)
word = next(s)
p.send(word)
word = next(s)
p.send(word)
word = next(s)
p.send(word)
word = next(s)
p.send(word)
word = next(s)
p.send(word)
word = next(s)
p.send(word)

First, the socket and parser generators are instantiated. Then the parser is primed with a first call to the next function, which runs it up to its first yield. After that, the first word is read from the socket by calling the next function on the socket generator, and the result is stored in the word variable. This data is forwarded to the parser generator by calling its send method. Finally, this next/send sequence is repeated several times to alternately read some data and feed it to the parser. If you run this code, you will get the following output (the last send call also raises an uncaught StopIteration exception, because the parser returns after printing the error):

payload: 33
payload: 44
payload: 24
error!

The good news is that it works! The bad news is that the code managing the generators is highly repetitive, ignores the values yielded by the parser, and breaks as soon as one of the generators changes. This part should keep working when the data returned by the socket generator changes, and also when the definition of the state machine changes. So, let's reimplement it as a loop:

s = socket()
p = parser(s)

try:
    c = next(p)
    while True:
        data = next(c)
        c = p.send(data)

except StopIteration:
    print("Done")

When executed, it provides the following output:

payload: 33
payload: 44
payload: 24
error!
Done

This is much better. First, the completion of either generator is correctly handled by catching the StopIteration exception. Second, because the next/send sequence is replaced with an infinite loop that reads from whatever the parser yields, the program still works if one of the generators changes.
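Because the loop makes no assumption about the parser or the data source, it can be wrapped in a reusable function. The following is a sketch under a few assumptions: the run helper is a hypothetical name, and the parser is modified to collect payload words in a list and to return a status string instead of printing, so that the result can be inspected.

```python
def parser(read_next, payloads):
    """Variant of the parser that stores payload words instead of printing them."""
    while True:
        sync = yield read_next
        if sync != 42:
            return "error"
        size = yield read_next
        while size > 0:
            data = yield read_next
            payloads.append(data)
            size -= 1


def socket():
    """Simulated channel yielding the same words as in the text."""
    yield from (42, 3, 33, 44, 24, 43, 4)


def run(coroutine):
    """Drive `coroutine` until it, or the source it yields, is exhausted."""
    try:
        source = next(coroutine)  # prime: run up to the first yield
        while True:
            # Read one word from the yielded source and resume the coroutine.
            source = coroutine.send(next(source))
    except StopIteration as stop:
        # Return value of whichever generator finished first.
        return stop.value


payloads = []
status = run(parser(socket(), payloads))
```

Here status receives the value returned by the parser through StopIteration. If the data source ran out first, it would be None.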

By implementing these two generators and the loop managing them, we have basically implemented an asynchronous framework with an event loop. The only thing that is missing is a system call to select or poll after sending, so that the event loop will actually wait for data to be available on a real socket before iterating.
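As a rough illustration of that last step, the sketch below drives the parser from a real socket and waits for readability with the standard selectors module. Several things are assumptions made for this example: each word is encoded as a single byte, a local socket pair stands in for a network connection, only one socket is registered, and the parser collects payload words in a list. Note that socket here is the standard library module, not the socket generator defined earlier.

```python
import selectors
import socket


def parser(channel, payloads):
    """Same state machine as in the text, collecting payload words in a list."""
    while True:
        sync = yield channel
        if sync != 42:
            return
        size = yield channel
        while size > 0:
            data = yield channel
            payloads.append(data)
            size -= 1


def run(coroutine, selector):
    """Event loop: wait until the registered socket is readable, then resume."""
    try:
        channel = next(coroutine)
        while True:
            selector.select()        # block until data is available
            data = channel.recv(1)   # one byte is one word in this sketch
            if not data:             # peer closed the connection
                break
            channel = coroutine.send(data[0])
    except StopIteration:
        pass


# A connected pair of sockets stands in for a real network peer.
reader, writer = socket.socketpair()
writer.sendall(bytes([42, 3, 33, 44, 24, 43, 4]))
writer.close()

payloads = []
with selectors.DefaultSelector() as selector:
    selector.register(reader, selectors.EVENT_READ)
    run(parser(reader, payloads), selector)
reader.close()
```

A loop serving several connections would register each socket together with its parser and resume only the parsers whose sockets are reported as readable.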
