In this chapter we look at case studies using the new Python features for async programming. We’ll be making use of several third-party libraries, and this is important to show because you will mostly be using libraries in your own projects.
The title of this section, 20 Asyncio Libraries… is a play on the title of a previous book I wrote, called 20 Python Libraries You Aren’t Using (But Should). Many of those libraries will also be useful in your asyncio-based applications, but in this chapter we’re going to be using libraries that have been designed specifically for the new async features in Python.
It is difficult to present asyncio-based code in short snippets. As you will have seen in all the previous code samples in the book, I’ve tried to make each example a complete, runnable program, because application lifetime management is a core consideration required for using async programming correctly.
For this reason, most of the case studies in this chapter will be somewhat larger, in terms of lines of code, than is usual for such a book. My goal in using this approach was to make the case studies more useful by giving you a “whole view” of an async program rather than leaving you having to figure out how detached fragments might fit together.
Sometimes the code samples in this chapter will compromise on code style in order to save space. I like PEP8 as much as the next Pythonista, but practicality beats purity!
Before looking at third-party libraries, let’s begin with the standard library. The Streams API is the high-level interface offered for async socket programming, and as the following case study will show, it’s pretty easy to use; however, application design remains complex due simply to the nature of the domain.
The following case study shows an implementation of a message broker, and first shows a naive design, followed by a more considered design. Neither should be considered production-ready, but my goal is to help you think about the various aspects of concurrent network programming that need to be taken into account when designing such applications.
A message queuing service is a message-oriented middleware or MOM deployed in a compute cloud using software as a service model. Service subscribers access queues and or topics to exchange data using point-to-point or publish and subscribe patterns.1
Wikipedia: “Message queuing service”
Recently I worked on a project that involved using ActiveMQ as a message broker for microservices intercommunication. At a basic level, such a broker (server):
maintains persistent socket connections to multiple clients.
receives messages from clients with a target “channel name.”
delivers those messages to all other clients subscribed to that same channel name.
I recall wondering how hard it might be to create such an application. As an added touch, ActiveMQ can perform different models of message distribution, and the two models are generally differentiated by the channel name:
Channel names with the prefix /topic, e.g., /topic/customer/registration, are managed with the publish-subscribe pattern (all channel subscribers get all messages).
Channel names with the prefix /queue are handled with the point-to-point model, in which messages on a channel are distributed between channel subscribers in a round-robin fashion: each subscriber gets a unique message.
In our case study, we build a toy message broker with these basic features. The first issue we must address is that TCP is not a message-based protocol: we just get streams of bytes on the wire. We need to create our own protocol for the structure of messages, and the simplest protocol is to prefix each message with a size header, followed by a message payload of that size. The following utility library provides read and write for such messages:
# msgproto.py
import asyncio
from asyncio import StreamReader, StreamWriter

async def read_msg(reader: StreamReader) -> bytes:
    # Raises asyncio.IncompleteReadError
    size_bytes = await reader.readexactly(4)
    size = int.from_bytes(size_bytes, byteorder='big')
    data = await reader.readexactly(size)
    return data

async def send_msg(writer: StreamWriter, data: bytes):
    writer.write(len(data).to_bytes(4, byteorder='big'))
    writer.write(data)
    await writer.drain()

def run_server(client, host='127.0.0.1', port=25000):
    loop = asyncio.get_event_loop()
    coro = asyncio.start_server(client, host, port)
    server = loop.run_until_complete(coro)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        print('Bye!')
    server.close()
    loop.run_until_complete(server.wait_closed())
    # Python 3.7+; replaces the removed asyncio.Task.all_tasks()
    tasks = asyncio.all_tasks(loop)
    for t in tasks:
        t.cancel()
    group = asyncio.gather(*tasks, return_exceptions=True)
    loop.run_until_complete(group)
    loop.close()
Get the first 4 bytes. This is the size prefix.
Those four bytes must be converted into an integer.
Now we know the payload size, read that off the stream.
Write is the inverse of read: first send the length of the data, encoded as 4 bytes.
Then send the data.
drain() suspends until the write buffer has emptied enough to accept more data, which applies flow control to the sender. Without drain(), data could pile up in the send buffer faster than it goes out on the wire.
It doesn’t belong here, but I also snuck in a boilerplate function to run a TCP server. The shutdown sequence was discussed in a previous section, and I’m including it here only to save space in the code samples that follow. Server shutdown will begin on SIGINT or Ctrl-C.
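The size-prefix framing can be tried out without any sockets. The following is a minimal sketch that works on plain bytes; the helper names encode_frame and decode_frames are hypothetical and are not part of msgproto.py:

```python
def encode_frame(payload: bytes) -> bytes:
    """Prefix the payload with its length as a 4-byte big-endian integer."""
    return len(payload).to_bytes(4, byteorder='big') + payload


def decode_frames(buffer: bytes):
    """Split a byte stream into complete payloads.

    Returns (payloads, remainder), where remainder holds the bytes of a
    frame that has not fully arrived yet.
    """
    payloads = []
    offset = 0
    while len(buffer) - offset >= 4:
        size = int.from_bytes(buffer[offset:offset + 4], byteorder='big')
        end = offset + 4 + size
        if end > len(buffer):
            break  # Incomplete frame: wait for more bytes.
        payloads.append(buffer[offset + 4:end])
        offset = end
    return payloads, buffer[offset:]


# Two complete frames followed by two stray bytes of a third header.
stream = encode_frame(b'/topic/foo') + encode_frame(b'hello')
payloads, rest = decode_frames(stream + b'\x00\x00')
print(payloads, rest)
```

In msgproto.py the same bookkeeping is delegated to readexactly(), which waits until the requested number of bytes has arrived.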
Now that we have a rudimentary message protocol, we can focus on the message broker application:
# mq_server.py
import asyncio
from asyncio import StreamReader, StreamWriter, gather
from collections import deque, defaultdict
from typing import Deque, DefaultDict
from msgproto import read_msg, send_msg, run_server

SUBSCRIBERS: DefaultDict[bytes, Deque] = defaultdict(deque)

async def client(reader: StreamReader, writer: StreamWriter):
    peername = writer.transport.get_extra_info('peername')
    subscribe_chan = await read_msg(reader)
    SUBSCRIBERS[subscribe_chan].append(writer)
    print(f'Remote {peername} subscribed to {subscribe_chan}')
    try:
        while True:
            channel_name = await read_msg(reader)
            data = await read_msg(reader)
            print(f'Sending to {channel_name}: {data[:19]}...')
            writers = SUBSCRIBERS[channel_name]
            if writers and channel_name.startswith(b'/queue'):
                writers.rotate()
                writers = [writers[0]]
            await gather(*[send_msg(w, data) for w in writers])
    except asyncio.CancelledError:
        print(f'Remote {peername} closing connection.')
        writer.close()
    except asyncio.IncompleteReadError:
        print(f'Remote {peername} disconnected')
    finally:
        print(f'Remote {peername} closed')
        SUBSCRIBERS[subscribe_chan].remove(writer)

if __name__ == '__main__':
    run_server(client)
Imports from our msgproto.py module.
A global collection of currently active subscribers. Every time a client connects, they must first send a channel name they’re subscribing to. A deque will hold all the subscribers for a particular channel.
The client() coroutine function will produce a long-lived coroutine for each new connection. Think of it as a callback for the TCP server started in run_server(). On this line, I’ve shown how the host and port of the remote peer can be obtained, e.g., for logging.
Our protocol for clients is the following:
On first connect, a client must send a message containing the channel to subscribe to (here, subscribe_chan).
Thereafter, for the life of the connection, a client sends a message to a channel by first sending a message containing the destination channel name, followed by a message containing the data. Our broker will send such data-messages to every client subscribed to that channel name.
Add the StreamWriter instance to the global collection of subscribers.
An infinite loop, waiting for data from this client. The first message from a client must be the destination channel name.
Next comes the actual data to distribute to the channel.
Get the deque of subscribers on the target channel.
Some special handling if the channel name begins with the magic word “/queue”: in this case, we send the data to only one of the subscribers, not all of them. This can be used for sharing work between a bunch of workers, rather than the usual pub-sub notification scheme where all subscribers on a channel get all the messages.
Here is why we use a deque and not a list: rotation of the deque is how we keep track of which client is next in line for “/queue” distribution. This seems expensive until you realize that a single deque rotation is an O(1) operation.
Target only whichever client is first; this changes after every rotation.
Create a list of coroutines for sending the message to each writer, and then unpack these into gather() so we can wait for all of the sending to complete.
Note: This line is a bad flaw in our program, but it may not be obvious why: though it may be true that all of the sending to each subscriber will happen concurrently, what happens if we have one very slow client? In this case the gather() will only finish when the slowest subscriber has received their data. We can’t receive any more data from the sending client until all these send_msg() coroutines finish. This slows down all message distribution to the speed of the slowest subscriber.
When leaving the client() coroutine, make sure to remove ourselves from the global SUBSCRIBERS collection. Unfortunately, this is an O(n) operation which can be a little expensive for very large n. A different data structure would fix this, but for now we console ourselves with the understanding that connections are intended to be long-lived, so there will be few disconnection events; that n is unlikely to be very large (say ~10,000 as a rough order-of-magnitude estimate); and that this code is at least very easy to understand!
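The round-robin selection used for “/queue” channels can be seen in isolation. This is a small sketch in which plain strings stand in for the StreamWriter instances; the function name next_subscriber is hypothetical:

```python
from collections import deque

# Hypothetical stand-ins for the StreamWriter instances in SUBSCRIBERS.
subscribers = deque(['writer-A', 'writer-B', 'writer-C'])


def next_subscriber(writers: deque):
    """Rotate by one step and return whoever is now at the front."""
    writers.rotate()
    return writers[0]


# Pick a target for six consecutive messages.
order = [next_subscriber(subscribers) for _ in range(6)]
print(order)
```

With three subscribers, each one is chosen once in every three messages, and the deque itself remembers whose turn is next.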
So that’s our server; now we need clients, and then we can show some output. For demonstration purposes we’ll make two kinds of clients: a “sender” and a “listener.” The server doesn’t differentiate: all clients are the same. The distinction between “sender” and “listener” behavior is only for educational purposes.
# mq_client_listen.py
import asyncio
import argparse, uuid
from msgproto import read_msg, send_msg

async def main(args):
    me = uuid.uuid4().hex[:8]
    print(f'Starting up {me}')
    reader, writer = await asyncio.open_connection(args.host, args.port)
    print(f'I am {writer.transport.get_extra_info("sockname")}')
    channel = args.listen.encode()
    await send_msg(writer, channel)
    try:
        while True:
            data = await read_msg(reader)
            if not data:
                print('Connection ended.')
                break
            print(f'Received by {me}: {data[:20]}')
    except asyncio.IncompleteReadError:
        print('Server closed.')

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--host', default='localhost')
    parser.add_argument('--port', default=25000)
    parser.add_argument('--listen', default='/topic/foo')
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(parser.parse_args()))
    except KeyboardInterrupt:
        print('Bye!')
    loop.close()
The uuid standard library module is a convenient way of creating an “identity” for this listener. If you start up multiple instances of these, each will have their own identity, and you’ll be able to track what is happening in the logs.
Open a connection to the server.
The channel to subscribe to is an input parameter, captured in args.listen. Encode it into bytes before sending.
By our protocol rules (as discussed in the broker code analysis previously), the first thing to do after connecting is to send the channel name to subscribe to.
This loop does nothing else but wait for data to appear on the socket.
The command-line arguments for this program make it easy to point to a host, a port, and a channel name to listen to.
The structure of the other client, the “sender” program, is similar to the listener module.
# mq_client_sender.py
import asyncio
import argparse, uuid
from itertools import count
from msgproto import send_msg

async def main(args):
    me = uuid.uuid4().hex[:8]
    print(f'Starting up {me}')
    reader, writer = await asyncio.open_connection(
        host=args.host, port=args.port)
    print(f'I am {writer.transport.get_extra_info("sockname")}')
    channel = b'/null'
    await send_msg(writer, channel)
    chan = args.channel.encode()
    for i in count():
        await asyncio.sleep(args.interval)
        data = b'X'*args.size or f'Msg {i} from {me}'.encode()
        try:
            await send_msg(writer, chan)
            await send_msg(writer, data)
        except ConnectionResetError:
            print('Connection ended.')
            break
    writer.close()

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--host', default='localhost')
    parser.add_argument('--port', default=25000, type=int)
    parser.add_argument('--channel', default='/topic/foo')
    parser.add_argument('--interval', default=1, type=float)
    parser.add_argument('--size', default=0, type=int)
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(parser.parse_args()))
    except KeyboardInterrupt:
        print('Bye!')
    loop.close()
As with the listener, claim an identity.
As with the listener, reach out and make a connection.
According to our protocol rules, the first thing to do after connecting to the server is to give the name of the channel to subscribe to; however, since we are a sender, we don’t really care about subscribing to any channels; nevertheless, the protocol requires it so just provide a null channel to subscribe to (we won’t actually listen for anything).
Send the channel to subscribe to.
The command-line parameter args.channel provides the channel to which we want to send messages. Note that it must be converted to bytes before sending.
Using itertools.count() is like a while True loop, except that you get an iteration variable to use. We use this in the debugging messages since it makes it a bit easier to track which message got sent from where.
The delay between sent messages is an input parameter, args.interval. The next line generates the message payload. It’s either a bytestring of specified size (args.size), or it’s a descriptive message. This flexibility is just for testing.
Send! Note that there are two messages here: the first is the destination channel name and the second is the payload.
As with the listener, there are a bunch of command-line options for tweaking the sender: “channel” determines the target channel to send to, while “interval” controls the delay between sends. The “size” parameter controls the size of each message payload.
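The payload expression in the sender relies on an empty bytestring being falsy. A small sketch of that logic, pulled out into a hypothetical helper named make_payload (the sender itself inlines the expression):

```python
def make_payload(size: int, i: int, me: str) -> bytes:
    """Return `size` filler bytes, or a descriptive message when size is 0."""
    # b'X' * 0 is the empty bytestring, which is falsy, so `or` falls
    # through to the descriptive message.
    return b'X' * size or f'Msg {i} from {me}'.encode()


print(make_payload(0, 3, '6b5a8e1d'))  # descriptive message
print(make_payload(4, 3, '6b5a8e1d'))  # four filler bytes
```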
We now have a broker, a listener, and a sender; it’s time to see some output. To produce the following code snippets, I have started up the server, then two listeners, and then a sender; and after a few messages have been sent, I’ve stopped the server with Ctrl-C:
$ python mq_server.py
Remote ('127.0.0.1', 55382) subscribed to b'/queue/blah'
Remote ('127.0.0.1', 55386) subscribed to b'/queue/blah'
Remote ('127.0.0.1', 55390) subscribed to b'/null'
Sending to b'/queue/blah': b'Msg 0 from 6b5a8e1d'...
Sending to b'/queue/blah': b'Msg 1 from 6b5a8e1d'...
Sending to b'/queue/blah': b'Msg 2 from 6b5a8e1d'...
Sending to b'/queue/blah': b'Msg 3 from 6b5a8e1d'...
Sending to b'/queue/blah': b'Msg 4 from 6b5a8e1d'...
Sending to b'/queue/blah': b'Msg 5 from 6b5a8e1d'...
^CBye!
Remote ('127.0.0.1', 55382) closing connection.
Remote ('127.0.0.1', 55382) closed
Remote ('127.0.0.1', 55390) closing connection.
Remote ('127.0.0.1', 55390) closed
Remote ('127.0.0.1', 55386) closing connection.
Remote ('127.0.0.1', 55386) closed

$ python mq_client_sender.py --channel /queue/blah
Starting up 6b5a8e1d
I am ('127.0.0.1', 55390)
Connection ended.

$ python mq_client_listen.py --listen /queue/blah
Starting up 9ae04690
I am ('127.0.0.1', 55382)
Received by 9ae04690: b'Msg 1 from 6b5a8e1d'
Received by 9ae04690: b'Msg 3 from 6b5a8e1d'
Received by 9ae04690: b'Msg 5 from 6b5a8e1d'
Server closed.

$ python mq_client_listen.py --listen /queue/blah
Starting up bd4e3baa
I am ('127.0.0.1', 55386)
Received by bd4e3baa: b'Msg 0 from 6b5a8e1d'
Received by bd4e3baa: b'Msg 2 from 6b5a8e1d'
Received by bd4e3baa: b'Msg 4 from 6b5a8e1d'
Server closed.
Our toy message broker works! The code is also pretty easy to understand, given such a complex problem domain, but unfortunately the design of the broker code itself is problematic.
The problem is that, for a particular client, we send messages to subscribers in the same coroutine where new messages are received. This means that if any subscriber is slow to consume what we’re sending, it might take a long time for that await gather(...) line to complete, and we cannot receive and process more messages while we wait.
Instead, we need to decouple the receiving of messages from the sending of messages. In the next case study, we refactor our code to do exactly that.
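The cost of waiting on gather() can be made visible with a small timing sketch. Here fake_send is a hypothetical stand-in for send_msg(), with asyncio.sleep() modelling how long each subscriber takes to accept the data; asyncio.run() assumes Python 3.7 or later:

```python
import asyncio
import time


async def fake_send(delay: float) -> float:
    """Hypothetical stand-in for send_msg(): the delay models a slow client."""
    await asyncio.sleep(delay)
    return delay


async def distribute(delays):
    """Send to all 'subscribers' concurrently and time the whole batch."""
    start = time.perf_counter()
    await asyncio.gather(*[fake_send(d) for d in delays])
    return time.perf_counter() - start


# Two fast subscribers and one slow one.
elapsed = asyncio.run(distribute([0.01, 0.01, 0.3]))
print(f'Batch took {elapsed:.2f}s')
```

Even though the sends overlap, the batch takes roughly as long as the slowest subscriber, and the receiving loop is stuck for that whole time.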
In this case study we change the design of our toy message broker. The “listener” and “sender” programs remain as is. The main goal for the new broker design is to decouple sending and receiving. The code is slightly longer, but not terribly so.
# mq_server_plus.py
import asyncio
from asyncio import StreamReader, StreamWriter, Queue
from collections import deque, defaultdict
from contextlib import suppress
from typing import Deque, DefaultDict, Dict
from msgproto import read_msg, send_msg, run_server

SUBSCRIBERS: DefaultDict[bytes, Deque] = defaultdict(deque)
SEND_QUEUES: DefaultDict[StreamWriter, Queue] = defaultdict(Queue)
CHAN_QUEUES: Dict[bytes, Queue] = {}

async def client(reader: StreamReader, writer: StreamWriter):
    peername = writer.transport.get_extra_info('peername')
    subscribe_chan = await read_msg(reader)
    SUBSCRIBERS[subscribe_chan].append(writer)
    loop = asyncio.get_event_loop()
    send_task = loop.create_task(
        send_client(writer, SEND_QUEUES[writer]))
    print(f'Remote {peername} subscribed to {subscribe_chan}')
    try:
        while True:
            channel_name = await read_msg(reader)
            data = await read_msg(reader)
            if channel_name not in CHAN_QUEUES:
                CHAN_QUEUES[channel_name] = Queue(maxsize=10)
                loop.create_task(chan_sender(channel_name))
            await CHAN_QUEUES[channel_name].put(data)
    except asyncio.CancelledError:
        print(f'Remote {peername} connection cancelled.')
    except asyncio.IncompleteReadError:
        print(f'Remote {peername} disconnected')
    finally:
        print(f'Remote {peername} closed')
        await SEND_QUEUES[writer].put(None)
        await send_task
        del SEND_QUEUES[writer]
        SUBSCRIBERS[subscribe_chan].remove(writer)

async def send_client(writer: StreamWriter, queue: Queue):
    while True:
        with suppress(asyncio.CancelledError):
            data = await queue.get()
            if not data:
                writer.close()
                break
            await send_msg(writer, data)

async def chan_sender(name: bytes):
    with suppress(asyncio.CancelledError):
        while True:
            writers = SUBSCRIBERS[name]
            if not writers:
                await asyncio.sleep(1)
                continue
            if name.startswith(b'/queue'):
                writers.rotate()
                writers = [writers[0]]
            msg = await CHAN_QUEUES[name].get()
            if not msg:
                break
            for writer in writers:
                if not SEND_QUEUES[writer].full():
                    print(f'Sending to {name}: {msg[:19]}...')
                    await SEND_QUEUES[writer].put(msg)

if __name__ == '__main__':
    run_server(client)
In the previous implementation, there were only SUBSCRIBERS; now there are SEND_QUEUES and CHAN_QUEUES as global collections. This is a consequence of completely decoupling the receiving and sending of data. SEND_QUEUES has one queue entry for each client connection: all data that must be sent to that client must be placed onto that queue. (If you peek ahead, the send_client() coroutine will pull data off SEND_QUEUES and send it.)
Up till this point in the client() coroutine function, the code is the same as the simple server: the subscribed channel name is received and we add the StreamWriter instance for the new client to the global SUBSCRIBERS collection.
This is new: we create a long-lived task that will do all the sending of data to this client. The task will run independently as a separate coroutine, and will pull messages off the supplied queue, SEND_QUEUES[writer], for sending.
Now we’re inside the loop where we receive data. Remember that we always receive two messages: one for the destination channel name, and one for the data. We’re going to create a new, dedicated Queue for every destination channel, and that’s what CHAN_QUEUES is for: when any client wants to push data to a channel, we’re going to put that data onto the appropriate queue and then go immediately back to listening for more data. This approach decouples the distribution of messages from the receiving of messages from this client.
If there isn’t already a queue for the target channel, make one.
Create a dedicated, long-lived task for that channel. The coroutine, chan_sender(), will be responsible for taking data off the channel queue and distributing that data to subscribers.
Place the newly received data onto the specific channel’s queue. Note that if the queue fills up, we’ll wait here until there’s space for the new data. By waiting here, we won’t be reading any new data off the socket, which means that the client will have to wait on sending new data into the socket on their side. This isn’t necessarily a bad thing, since it communicates so-called back-pressure to this client. (Alternatively, you could choose to drop messages here if the use-case is OK with that.)
When the connection is closed, it’s time to clean up! The long-lived task we created for sending data to this client, send_task, can be shut down by placing None onto its queue, SEND_QUEUES[writer] (check the code for send_client()). It’s important to use a value on the queue, rather than outright cancellation, because there may already be data on that queue and we want that data to be sent out before send_client() is ended.
Wait for that sender task to finish.
Remove the entry in the SEND_QUEUES collection (and in the next line we also remove the writer from the SUBSCRIBERS collection as before).
The send_client() coroutine function is very nearly a textbook example of pulling work off a queue. Note how the coroutine will exit if None is placed onto the queue. Note also how we suppress CancelledError inside the loop: this is because we want this task to only be closed by receiving a None on the queue. This way, all pending data on the queue can be sent out before shutdown.
chan_sender() is the distribution logic for a channel: it sends data from a dedicated channel Queue instance to all the subscribers on that channel. But what happens if there are no subscribers for this channel yet? We’ll just wait a bit and try again. (Note that the queue for this channel, i.e., CHAN_QUEUES[name], will keep filling up though.)
As before in our previous broker implementation, we do something special for channels whose name begins with “/queue”: we rotate the deque and send only to the first entry. This acts like a crude load-balancing system because each subscriber gets different messages off the same queue. For all other channels, all subscribers get all the messages.
We’ll wait here for data on the queue. On the next line, exit if None is received. Currently this isn’t triggered anywhere (so these chan_sender() coroutines live forever); but if logic were added to clean up these channel tasks after, say, some period of inactivity, that’s how it would be done.
Data has been received, so it’s time to send to subscribers. Note that we do not do the sending here: instead, we place the data onto each subscriber’s own send queue. This decoupling is necessary to make sure that a slow subscriber doesn’t slow down anyone else receiving data. And furthermore, if the subscriber is so slow that their send queue fills up, we don’t put that data on their queue, i.e., it is lost.
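The drop-when-full policy can be sketched on its own. In this example, names stand in for StreamWriter instances and the queue sizes are made up for illustration. Note that asyncio.Queue.full() can only return True for a queue created with a positive maxsize, so the sketch uses bounded queues; asyncio.run() assumes Python 3.7 or later:

```python
import asyncio


async def main():
    # Hypothetical per-subscriber send queues, keyed by a name instead of
    # a StreamWriter. maxsize=2 keeps the example small.
    send_queues = {
        'fast': asyncio.Queue(maxsize=2),
        'slow': asyncio.Queue(maxsize=2),
    }
    dropped = []

    async def fast_consumer():
        # Drains its queue promptly; None is the shutdown signal.
        received = []
        while True:
            msg = await send_queues['fast'].get()
            if msg is None:
                return received
            received.append(msg)

    consumer = asyncio.ensure_future(fast_consumer())

    for i in range(5):
        msg = f'msg {i}'.encode()
        for name, q in send_queues.items():
            if q.full():
                dropped.append((name, msg))  # Slow subscriber: message lost.
            else:
                await q.put(msg)
        await asyncio.sleep(0)  # Give the fast consumer a chance to run.

    await send_queues['fast'].put(None)
    fast_received = await consumer
    return fast_received, dropped


fast_received, dropped = asyncio.run(main())
print(len(fast_received), 'delivered to fast;', len(dropped), 'dropped for slow')
```

The 'slow' queue is never drained here, so it fills up and later messages for it are discarded, while the 'fast' subscriber still receives everything.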
The above design produces the same output as the earlier, simplistic implementation, but now we can be sure that a slow listener will not interfere with message distribution to other listeners.
These two case studies show a progression in thinking around the design of a message distribution system. A key aspect was the realization that sending and receiving data might be best handled in separate coroutines, depending on the use-case. In such instances, queues can be very useful for moving data between those different coroutines, and for providing buffering to decouple them.
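The queue hand-off between coroutines, including the None sentinel that send_client() uses for shutdown, reduces to the following minimal sketch. The producer and consumer functions are hypothetical and involve no sockets; asyncio.run() assumes Python 3.7 or later:

```python
import asyncio


async def producer(queue: asyncio.Queue, items):
    for item in items:
        await queue.put(item)   # Waits here if the queue is full.
    await queue.put(None)       # Sentinel: tells the consumer to stop.


async def consumer(queue: asyncio.Queue):
    seen = []
    while True:
        item = await queue.get()
        if item is None:
            break
        seen.append(item)
    return seen


async def main():
    queue = asyncio.Queue(maxsize=2)  # Small buffer to force hand-offs.
    _, seen = await asyncio.gather(
        producer(queue, [b'a', b'b', b'c', b'd']),
        consumer(queue),
    )
    return seen


result = asyncio.run(main())
print(result)
```

Because the sentinel travels through the same queue as the data, everything queued before it is processed before the consumer exits.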
The more important goal of these case studies was to show how the Streams API in asyncio makes it very easy to build socket-based applications.
The Twisted project dramatically predates the asyncio standard library, and has been flying the flag of async programming in Python for around 14 years now. The project provides not only the basic building blocks, like an event loop, but also primitives like deferreds that are a bit like the futures in asyncio. The design of asyncio has been heavily influenced by Twisted and the extensive experience of its leaders and maintainers.
Note that asyncio does not replace Twisted.2 Twisted includes high-quality implementations of a huge number of internet protocols, including not only the usual HTTP but also XMPP, NNTP, IMAP, SSH, IRC, and FTP (both servers and clients). And the list goes on: DNS? Check. SMTP? Check. POP3? Check.
At the code level, the main difference between Twisted and asyncio, apart from history and historical context, is that for a long time, Python lacked language support for coroutines, and this meant that Twisted and projects like it had to figure out ways of dealing with asynchronicity that worked with standard Python syntax.
For most of Twisted’s history, callbacks were the means by which async programming was done, with all the nonlinear complexity that entails. When it became possible to use generators as makeshift coroutines, it suddenly became possible to lay out code in Twisted in a linear fashion using its defer.inlineCallbacks decorator:
from twisted.internet import defer

@defer.inlineCallbacks
def f():
    yield
    defer.returnValue(123)

@defer.inlineCallbacks
def my_coro_func():
    value = yield f()
    assert value == 123
Ordinarily, Twisted requires creating instances of Deferred, and adding callbacks to that instance as the method of constructing async programs. A few years ago, the @inlineCallbacks decorator was added which repurposes generators as coroutines.
While @inlineCallbacks did allow you to write code that was linear in appearance (unlike callbacks), some hacks were required, such as this call to defer.returnValue(), which is how you have to return values from @inlineCallbacks coroutines.
Here we can see the yield that makes this function a generator. For @inlineCallbacks to work, there must be at least one yield present in the function being decorated.
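For comparison, here is a sketch of the same two functions written as native coroutines and run with asyncio rather than Twisted (asyncio.run() assumes Python 3.7 or later). A plain return takes the place of defer.returnValue(), and await takes the place of yield:

```python
import asyncio


async def f():
    # No generator tricks needed: a native coroutine can simply return.
    return 123


async def my_coro_func():
    value = await f()
    assert value == 123
    return value


result = asyncio.run(my_coro_func())
print(result)
```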
Since native coroutines appeared in Python 3.5, the Twisted team (and Amber Brown in particular) have been working to add support for running Twisted on the asyncio event loop.
This is an ongoing effort, and my goal in this section is not to convince you to create all your applications as Twisted-asyncio hybrids, but rather to make you aware that work is currently being done to provide significant interoperability between Twisted and asyncio.
For those of you with significant experience with Twisted, the following code example might be jarring:
from time import ctime
from twisted.internet import asyncioreactor
asyncioreactor.install()
from twisted.internet import reactor, defer, task

async def main():
    for i in range(5):
        print(f'{ctime()} Hello {i}')
        await task.deferLater(reactor, 1, lambda: None)

defer.ensureDeferred(main())
reactor.run()
This is how you tell Twisted to use the asyncio event loop as its main reactor. Note that this line must come before the reactor is imported from twisted.internet on the following line.
Anyone familiar with Twisted programming will recognize these imports. We don’t have space to cover them here, but in a nutshell, the reactor is the Twisted version of the asyncio loop, and defer and task are namespaces for tools to work with scheduling coroutines.
Seeing async def here, in a Twisted program, looks terribly out-of-place, but this is indeed what the new support for async/await gives us: the ability to use native coroutines directly in Twisted programs!
In the older @inlineCallbacks world, you would have used yield here, but now we can use await, the same as in asyncio code. The other part of this line, deferLater, is an alternative way to do the same thing as asyncio.sleep(1). We await a Deferred where, after 1 second, a do-nothing callback will fire.
ensureDeferred() is a Twisted version of scheduling a coroutine. This would be analogous to loop.create_task() or asyncio.ensure_future().
Running the reactor is the same as loop.run_forever() in asyncio.
Output:
$ python twisted_asyncio.py
Mon Oct 16 16:19:49 2017 Hello 0
Mon Oct 16 16:19:50 2017 Hello 1
Mon Oct 16 16:19:51 2017 Hello 2
Mon Oct 16 16:19:52 2017 Hello 3
Mon Oct 16 16:19:53 2017 Hello 4
There is much more to learn about Twisted, and in particular it is well worth your time to go through the list of implemented networking protocols in Twisted. There is still some work to be done, but the future looks very bright for interoperation between Twisted and asyncio.
The design of asyncio has been set up so that we can look forward to a future where it will be possible to incorporate code from many different async frameworks, such as Twisted and Tornado, into a single application, with all code running on the same event loop.
The Janus Queue (installed with pip install janus) provides a solution for communication between threads and coroutines. In the standard library, there are these kinds of queues:
queue.Queue: a “blocking” queue, commonly used for communication and buffering between threads.
asyncio.Queue: an async-compatible queue, commonly used for communication and buffering between coroutines.
Unfortunately, neither is useful for communication between threads and coroutines! This is where Janus comes in: it is a single Queue that exposes both APIs: a blocking one and an async one. In the following code sample, data is generated from inside a thread, placed on a queue, and then consumed from a coroutine.
import asyncio, time, random, janus

loop = asyncio.get_event_loop()
queue = janus.Queue(loop=loop)

async def main():
    while True:
        data = await queue.async_q.get()
        if data is None:
            break
        print(f'Got {data} off queue')
    print('Done.')

def data_source():
    for i in range(10):
        r = random.randint(0, 4)
        time.sleep(r)
        queue.sync_q.put(r)
    queue.sync_q.put(None)

loop.run_in_executor(None, data_source)
loop.run_until_complete(main())
loop.close()
Create a Janus queue. Note that just like asyncio.Queue, the Janus queue will be associated with a specific event loop. As usual, if you don’t provide the loop parameter, the standard get_event_loop() call will be used internally.
Our main() coroutine function simply waits for data on a queue. This line will suspend until there is data, exactly like asyncio.Queue. The queue object has two “faces”: this one is called async_q, which provides the async-compatible queue API.
Print a message.
Inside the data_source() function, a random int is generated, which is used both as a sleep duration as well as a data value. Note that the time.sleep() call is blocking, so this function must be executed in a thread.
Place the data onto the Janus queue. This shows the other “face” of the Janus queue: sync_q, which provides the standard, blocking Queue API.
Output:
Got 2 off queue
Got 4 off queue
Got 4 off queue
Got 2 off queue
Got 3 off queue
Got 4 off queue
Got 1 off queue
Got 1 off queue
Got 0 off queue
Got 4 off queue
Done.
If you can, it’s better to aim for having short executor jobs, and in these cases a queue (for communication) won’t be necessary. This isn’t always possible though, and in such situations the Janus queue can be the most convenient solution to buffer and distribute data between threads and coroutines.
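To make the “short executor jobs” advice concrete, here is a minimal sketch using only the standard library. The blocking_square() function is a hypothetical stand-in for a brief blocking call; because each job returns its result through the future that run_in_executor() hands back, no queue is needed.

```python
import asyncio
import time

def blocking_square(n: int) -> int:
    """Hypothetical short blocking job; time.sleep stands in for real work."""
    time.sleep(0.01)
    return n * n

async def main() -> list:
    loop = asyncio.get_running_loop()
    # Each call runs in the default thread pool. Awaiting the returned
    # futures hands the results straight back to the coroutine.
    jobs = [loop.run_in_executor(None, blocking_square, n) for n in range(5)]
    return await asyncio.gather(*jobs)

results = asyncio.run(main())
print(results)
```

A queue becomes attractive only when the thread is long-lived and produces a stream of values over time, which is the situation the Janus example covers.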
aiohttp brings all things HTTP to asyncio, including support for HTTP clients and servers, as well as websocket support. Let’s jump straight into code examples, starting with simplicity itself: “hello world.”
The following example demonstrates a minimal web server using aiohttp:
from aiohttp import web

async def hello(request):
    return web.Response(text="Hello, world")

app = web.Application()
app.router.add_get('/', hello)
web.run_app(app, port=8080)
An Application instance is created.
A route is created, with the target coroutine hello() given as the handler.
The web application is run.
Observe how there is no mention of loops, tasks, or futures in this code: the developers of the aiohttp framework have hidden all that away from us, leaving a very clean API. This is going to be common in most frameworks that build on top of asyncio, which has been designed to allow framework designers to choose only the bits they need, and encapsulate them in their preferred API.
aiohttp can be used both as a server and as a client library, like the very popular (but blocking!) requests library. I wanted to showcase aiohttp by using an example that incorporates both features.
In this case study, we’ll implement a website that does web scraping behind the scenes. The application will scrape two news websites, and combine the headlines into one page of results. Here is the strategy:
A browser client makes a web request to http://localhost:8080/news
Our web server receives the request, and then on the backend fetches HTML data from multiple news websites
Each page’s data is scraped for headlines
The headlines are sorted and formatted into the response HTML that we send back to the browser client
Figure 4-1 shows the output:
Web scraping has become quite difficult nowadays because many websites make heavy use of JavaScript to load their content. For example, if you try requests.get('http://edition.cnn.com'), you’re going to find that the response contains very little usable data! It has become increasingly necessary to execute JavaScript locally in order to obtain the actual content. The process of executing such JavaScript to produce the final, complete HTML output is called rendering.
To accomplish rendering, we use a neat project called Splash, which describes itself as a “JavaScript rendering service.” It can run in a docker container and provides an API for rendering other sites. Internally, it uses a (JavaScript-capable) WebKit engine to fully load and render a website. This is what we’ll use to obtain website data. Our aiohttp server will call this Splash API to obtain the page data.
To obtain and run the Splash container, run these commands in your shell:
$ docker pull scrapinghub/splash
$ docker run --rm -p 8050:8050 scrapinghub/splash
Our server backend will call the Splash API at http://localhost:8050.
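As a small aside, here is one way the request URL for Splash could be assembled. This is a sketch under assumptions: the helper name splash_render_url() is hypothetical, and it uses the same render.html endpoint and the url, timeout, and wait parameters that the case study’s news_fetch() coroutine uses. The case study interpolates the target URL directly; encoding it with urlencode() is a precaution for target URLs that carry their own query strings.

```python
from urllib.parse import urlencode

SPLASH = 'http://localhost:8050'  # the local Splash container

def splash_render_url(target: str, timeout: int = 60, wait: int = 1) -> str:
    """Build a render.html URL for Splash (hypothetical helper name)."""
    # urlencode() percent-encodes the target so that any '&' or '?' in it
    # cannot be confused with Splash's own query parameters.
    query = urlencode({'url': target, 'timeout': timeout, 'wait': wait})
    return f'{SPLASH}/render.html?{query}'

print(splash_render_url('http://edition.cnn.com'))
```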
from asyncio import get_event_loop, gather
from string import Template
from aiohttp import web, ClientSession
from bs4 import BeautifulSoup

async def news(request):
    sites = [
        ('http://edition.cnn.com', cnn_articles),
        ('http://www.aljazeera.com', aljazeera_articles),
    ]
    loop = get_event_loop()
    tasks = [loop.create_task(news_fetch(*s)) for s in sites]
    await gather(*tasks)

    items = {
        text: (
            f'<div class="box {kind}">'
            f'<span>'
            f'<a href="{href}">{text}</a>'
            f'</span>'
            f'</div>'
        )
        for task in tasks for href, text, kind in task.result()
    }
    content = ''.join(items[x] for x in sorted(items))

    page = Template(open('index.html').read())
    return web.Response(
        body=page.safe_substitute(body=content),
        content_type='text/html',
    )

async def news_fetch(url, postprocess):
    proxy_url = (
        f'http://localhost:8050/render.html?'
        f'url={url}&timeout=60&wait=1'
    )
    async with ClientSession() as session:
        async with session.get(proxy_url) as resp:
            data = await resp.read()
            data = data.decode('utf-8')
    return postprocess(url, data)

def cnn_articles(url, page_data):
    soup = BeautifulSoup(page_data, 'lxml')
    def match(tag):
        return (
            tag.text and tag.has_attr('href')
            and tag['href'].startswith('/')
            and tag['href'].endswith('.html')
            and tag.find(class_='cd__headline-text')
        )
    headlines = soup.find_all(match)
    return [(url + hl['href'], hl.text, 'cnn')
            for hl in headlines]

def aljazeera_articles(url, page_data):
    soup = BeautifulSoup(page_data, 'lxml')
    def match(tag):
        return (
            tag.text and tag.has_attr('href')
            and tag['href'].startswith('/news')
            and tag['href'].endswith('.html')
        )
    headlines = soup.find_all(match)
    return [(url + hl['href'], hl.text, 'aljazeera')
            for hl in headlines]

app = web.Application()
app.router.add_get('/news', news)
web.run_app(app, port=8080)
The news() function is the handler for the /news URL on our server. It returns the HTML page showing all the headlines.
Here, we have only two news websites to be scraped: CNN and Al Jazeera. More could easily be added, but then additional post-processors would also have to be added, just like the cnn_articles() and aljazeera_articles() functions, which are each customized to extract headline data.
For each news site, we create a task to fetch and process the HTML page data for its front page. Note that we unpack the tuple (*s) since the news_fetch() coroutine function takes both the URL and the post-processing function as parameters. Each news_fetch() will return a list of tuples as headline results, in the form (<article URL>, <article title>, <kind>).
All the tasks are gathered together into a single Future (gather() returns a future representing the state of all the tasks being gathered), and then we immediately await the completion of that future. This line will suspend until the future completes.
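The create-tasks-then-gather pattern can be tried without any network access. The sketch below is an illustration only: fake_fetch() is a hypothetical stand-in for news_fetch(), with asyncio.sleep() simulating the slow web call. Because both tasks wait concurrently, the total elapsed time is close to the slowest fetch rather than the sum of both.

```python
import asyncio
import time

async def fake_fetch(name: str, delay: float) -> str:
    """Stand-in for news_fetch(); asyncio.sleep simulates network latency."""
    await asyncio.sleep(delay)
    return f'{name} done'

async def main():
    start = time.perf_counter()
    tasks = [asyncio.ensure_future(fake_fetch(n, d))
             for n, d in [('cnn', 0.3), ('aljazeera', 0.2)]]
    await asyncio.gather(*tasks)   # suspends until every task has finished
    elapsed = time.perf_counter() - start
    # After gather() completes, each task's result() is available immediately.
    return [t.result() for t in tasks], elapsed

results, elapsed = asyncio.run(main())
print(results, f'{elapsed:.2f}s')
```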
Since all the news_fetch() tasks are now complete, we collect all of the results into a dictionary. Note how nested comprehensions are used to iterate over tasks, and then over the list of tuples returned by each task. We also use f-strings to substitute data directly, including even the “kind” of page, which will be used in CSS to color the div background.
In this dictionary, the key is the headline title, and the value is an HTML string for a div that will be displayed in our result page.
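The nested comprehension is easier to follow with some data in hand. In this sketch the results list is made up for illustration; it has the same shape as the values returned by the news_fetch() tasks. Note two side effects of keying the dictionary on the headline title: a title that appears on both sites is kept only once, and sorting the keys yields the headlines in alphabetical order.

```python
# Hypothetical results, shaped like the return values of two news_fetch() tasks.
results = [
    [('http://a/1.html', 'Zebra escapes', 'cnn'),
     ('http://a/2.html', 'Apples recalled', 'cnn')],
    [('http://b/9.html', 'Apples recalled', 'aljazeera')],
]

# Outer loop over per-site results, inner loop over each site's tuples.
items = {
    text: f'<div class="box {kind}"><a href="{href}">{text}</a></div>'
    for result in results
    for href, text, kind in result
}

# Sorting a dict iterates its keys, so this orders the divs by headline title.
content = ''.join(items[title] for title in sorted(items))
print(content)
```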
Our web server is going to return HTML. We’re loading HTML data from a local file called index.html. This file is presented in the appendix if you want to recreate the case study yourself.
We substitute the collected headline DIVs into the template and return the page to the browser client. This generates the page shown in Figure 4-1.
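For readers unfamiliar with string.Template, here is a minimal sketch of the substitution step. The template text below is a made-up stand-in for index.html. The safe_substitute() method fills in the placeholders it is given and leaves any others untouched, instead of raising an error as substitute() would.

```python
from string import Template

# A made-up stand-in for the contents of index.html.
page = Template('<html><body>$body$footer</body></html>')

# Only $body is supplied; $footer is left in place rather than raising KeyError.
html = page.safe_substitute(body='<div>headline</div>')
print(html)
```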
Here, inside the news_fetch() coroutine function, we have a tiny template for hitting the Splash API (which, for me, is running in a local docker container on port 8050). Here we demonstrate how aiohttp can be used as an HTTP client.
The standard way is to create a ClientSession() instance, and then use the get() method on the session instance to perform the REST call. In the next line, the response data is obtained. Note that because we’re always operating on coroutines, with async with and await, this coroutine will never block: we’ll be able to handle many thousands of these requests, even though this operation, i.e., news_fetch(), might be relatively slow since we’re doing web calls internally.
After the data is obtained, call the post-processing function. Recall from above that for CNN, it’ll be cnn_articles() and for Al Jazeera it’ll be aljazeera_articles().
We have space only for a brief look at the post-processing. After getting the page data, we use the Beautiful Soup 4 library for extracting headlines.
The match() function is the filter passed to find_all(), which returns all matching tags (I’ve manually checked the HTML source of these news websites to figure out which combination of filters extracts the best tags), and then we return a list of tuples, matching the format (<article URL>, <article title>, <kind>).
This is the analogous post-processor for Al Jazeera. The match() condition is slightly different but it is otherwise the same as the CNN one.
Generally, you’ll find that aiohttp has quite a simple API, and “stays out of your way” while you develop your applications.
In the next section, we’ll look at using ZeroMQ with asyncio, which has the curious effect of making socket programming quite enjoyable.
Programming is a science dressed up as art, because most of us don’t understand the physics of software and it’s rarely, if ever, taught. The physics of software is not algorithms, data structures, languages, and abstractions. These are just tools we make, use, and throw away. The real physics of software is the physics of people. Specifically, it’s about our limitations when it comes to complexity and our desire to work together to solve large problems in pieces. This is the science of programming: make building blocks that people can understand and use easily, and people will work together to solve the very largest problems.
Pieter Hintjens, ZeroMQ: Messaging for Many Applications
ZeroMQ (or even ØMQ!) is a popular language-agnostic library for networking applications: it provides “smart” sockets. When you create ZeroMQ sockets in code, they resemble regular sockets, with recognizable method names like recv() and send() and so on, but internally these sockets handle some of the more annoying and tedious tasks required for working with conventional sockets.
One of these features is management of message-passing, so you don’t have to invent your own protocol and count bytes on the wire to figure out when all the bytes for a particular message have arrived—you simply send whatever you consider to be a “message,” and the whole thing arrives on the other end intact!
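To appreciate what is being saved, here is a sketch of the kind of framing code you would otherwise write yourself on a plain TCP socket. To be clear, this is not ØMQ’s wire protocol: the 4-byte length prefix and the frame() and Deframer names are assumptions chosen for illustration. The point is that a byte stream can deliver a message in arbitrary pieces, so the receiver has to buffer and count bytes.

```python
import struct

def frame(payload: bytes) -> bytes:
    """Prefix a message with its length as a 4-byte big-endian integer."""
    return struct.pack('>I', len(payload)) + payload

class Deframer:
    """Reassemble complete messages from arbitrarily split chunks."""
    def __init__(self):
        self.buffer = b''

    def feed(self, chunk: bytes) -> list:
        self.buffer += chunk
        messages = []
        while len(self.buffer) >= 4:
            (size,) = struct.unpack('>I', self.buffer[:4])
            if len(self.buffer) < 4 + size:
                break  # the rest of this message has not arrived yet
            messages.append(self.buffer[4:4 + size])
            self.buffer = self.buffer[4 + size:]
        return messages

wire = frame(b'hello') + frame(b'world!')
d = Deframer()
first = d.feed(wire[:7])    # header plus 3 bytes: no complete message yet
second = d.feed(wire[7:])   # the remainder completes both messages
print(first, second)
```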
Another great feature is automatic reconnection logic. If the server goes down and comes back up later, the client ØMQ socket will automatically reconnect. And even better, messages your code sends into the socket will be buffered during the disconnected period, so they will all still be sent out when the server returns. These are some of the reasons why ØMQ is sometimes referred to as brokerless messaging: it provides some of the features of message broker software directly in the socket objects themselves.
ØMQ sockets are already implemented as asynchronous internally (so they can maintain many thousands of concurrent connections, even when used in threaded code), but this is hidden from us behind the ØMQ API; nevertheless, support for Asyncio has been added to the PyZMQ Python bindings for the ØMQ library, and in this section we’re going to look at several examples of how these smart sockets might be incorporated into your Python applications.
Here’s a head-scratcher: if ØMQ provides sockets that are already asynchronous, in a way that is usable with threading, what is the point of using ØMQ with asyncio? The answer is: cleaner code.
To demonstrate, let’s look at a tiny case study where you use multiple ØMQ sockets in the same application. First we’ll show the blocking version (this example is taken from the zguide, the official guide for ØMQ):
# poller.py
import zmq

context = zmq.Context()
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt_string(zmq.SUBSCRIBE, '')

poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)
poller.register(subscriber, zmq.POLLIN)

while True:
    try:
        socks = dict(poller.poll())
    except KeyboardInterrupt:
        break

    if receiver in socks:
        message = receiver.recv_json()
        print(f'Via PULL: {message}')

    if subscriber in socks:
        message = subscriber.recv_json()
        print(f'Via SUB: {message}')
ØMQ sockets have types! This is a PULL socket. You can think of it as a “receive-only” kind of socket that will be fed by some other “send-only” socket, which will be a PUSH type.
The SUB socket type is another kind of “receive-only” socket, and it will be fed by a PUB socket, which is send-only.
If you need to move data between multiple sockets in a threaded ØMQ application, you’re going to need a poller. This is because these sockets are not thread-safe, so you cannot recv() on different sockets in different threads.
It works similarly to the select() system call. The poller will unblock when there is data ready to be received on one of the registered sockets, and then it’s up to you to pull the data off and do something with it. The big if block is how you have to detect the correct socket.
Using a poller loop plus an explicit socket-selection block makes the code look a little clunky. Another option might be to .recv() on each socket in different threads, but now you have to deal with lots of potential problems around thread safety. For instance: ØMQ sockets are not thread-safe, and so the same socket must not be used from different threads. The code shown above is much safer because you don’t have to worry about any thread safety problems.
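If you have not met select()-style polling before, the following sketch shows the same shape of code using only the standard library: the selectors module and plain socket pairs stand in for zmq.Poller and the ØMQ sockets. The labels 'A' and 'B' are arbitrary tags attached at registration time, so the loop can tell which socket became readable.

```python
import selectors
import socket

sel = selectors.DefaultSelector()
a_send, a_recv = socket.socketpair()
b_send, b_recv = socket.socketpair()

# Register both receiving ends; 'data' is a free-form tag returned with events.
sel.register(a_recv, selectors.EVENT_READ, data='A')
sel.register(b_recv, selectors.EVENT_READ, data='B')

a_send.sendall(b'from a')
b_send.sendall(b'from b')

received = {}
for _ in range(10):                      # bounded loop, just for the demo
    if len(received) == 2:
        break
    # select() blocks until at least one registered socket is readable.
    for key, _events in sel.select(timeout=1):
        received[key.data] = key.fileobj.recv(1024)

sel.close()
for s in (a_send, a_recv, b_send, b_recv):
    s.close()
print(received)
```

The dispatch on key.data plays the same role as the big if block in poller.py: one loop has to know about every socket.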
Anyhow, before we continue the discussion, let’s show the server code, and a little output:
# poller_srv.py
import zmq, itertools, time

context = zmq.Context()
pusher = context.socket(zmq.PUSH)
pusher.bind("tcp://*:5557")

publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5556")

for i in itertools.count():
    time.sleep(1)
    pusher.send_json(i)
    publisher.send_json(i)
The server code is not important for the discussion, but briefly: there’s a PUSH socket and a PUB socket, as we said earlier, and a loop inside which data gets sent to both sockets every second. Here’s some output from poller.py (note: both programs must be running):
$ python poller.py
Via PULL: 0
Via SUB: 0
Via PULL: 1
Via SUB: 1
Via PULL: 2
Via SUB: 2
Via PULL: 3
Via SUB: 3
The code works. But our interest here is not whether the code runs, but rather whether asyncio has anything to offer for the structure of the poller.py code. The key thing to understand is that our asyncio code is going to run in a single thread, which means that it’s fine to handle different sockets in different coroutines, and indeed, this is exactly what we’ll do.
Of course, someone had to do the hard work to add support for coroutines into pyzmq (the Python client library for ØMQ) itself for this to work, so it wasn’t free! But now that the hard work is done, we can improve on our “traditional” code structure quite a lot:
For the code examples that follow, it is necessary to use pyzmq >= 17.0.0. At the time of writing, version 17 wasn’t released yet, so if necessary you will have to install the latest beta of pyzmq with a major version of 17.
# poller_aio.py
import asyncio
import zmq
from zmq.asyncio import Context

context = Context()

async def do_receiver():
    receiver = context.socket(zmq.PULL)
    receiver.connect("tcp://localhost:5557")
    while True:
        message = await receiver.recv_json()
        print(f'Via PULL: {message}')

async def do_subscriber():
    subscriber = context.socket(zmq.SUB)
    subscriber.connect("tcp://localhost:5556")
    subscriber.setsockopt_string(zmq.SUBSCRIBE, '')
    while True:
        message = await subscriber.recv_json()
        print(f'Via SUB: {message}')

loop = asyncio.get_event_loop()
loop.create_task(do_receiver())
loop.create_task(do_subscriber())
loop.run_forever()
This code sample does the same as before, except that now we’re taking advantage of coroutines to restructure everything. Now we can deal with each socket in isolation. We’ve created two coroutine functions, one for each socket, and this one is for the PULL socket.
We’re using the asyncio support in pyzmq, which means that all send() and recv() calls must use the await keyword. The Poller no longer appears anywhere, because it’s been integrated into the asyncio event loop itself.
This is the handler for the SUB socket. The structure is very similar to the PULL socket’s handler, but that need not have been the case. If more complex logic had been required, we’d have been able to easily add it here, fully encapsulated within the SUB-handler code only.
Again: the asyncio-compatible sockets require the await keyword to send and receive.
The extra lines required to start the asyncio event loop and create the tasks for each socket. I’ve cut a few corners here, and omitted all error-handling and cleanup, because I want to emphasize the impact on code layout further up.
The output is the same as before so it won’t be shown.
The use of coroutines has, in my opinion, a staggeringly positive effect on the code layout in these examples. In real production code with lots of ØMQ sockets, the coroutine handlers for each might as well even be in separate files, providing more opportunities for better code structure. And even for programs with a single read-write socket, it is very easy to use separate coroutines for read and write, if necessary.
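The one-coroutine-per-source layout can be tried without ØMQ installed. In this sketch, which is an illustration rather than pyzmq code, an asyncio.Queue stands in for each socket, and a None sentinel is an assumed convention for stopping a handler. Each handler owns exactly one source and knows nothing about the other, which is the structural benefit being described.

```python
import asyncio

async def handler(name: str, source: asyncio.Queue, log: list):
    """One coroutine per message source; a queue stands in for a socket."""
    while True:
        message = await source.get()
        if message is None:          # sentinel: stop this handler only
            break
        log.append(f'Via {name}: {message}')

async def main():
    pull, sub, log = asyncio.Queue(), asyncio.Queue(), []
    # No poller and no dispatch block: the event loop wakes whichever
    # handler has data waiting.
    handlers = asyncio.gather(handler('PULL', pull, log),
                              handler('SUB', sub, log))
    for i in range(2):
        await pull.put(i)
        await sub.put(i)
    await pull.put(None)
    await sub.put(None)
    await handlers
    return log

log = asyncio.run(main())
print(log)
```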
The improved code looks a lot like threaded code, and indeed, for the specific example shown above, the same refactor will work for threading: run blocking do_receiver() and do_subscriber() functions in separate threads. But do you really want to deal with even the potential for race conditions, especially as your application grows in features and complexity over time?
There is lots to explore here, and as I said before, these magic sockets are a lot of fun to play with! In the next case study we look at a more practical use of ØMQ than offered by the one above.
In the modern, containerized, microservice-based deployment practices of today, some things that used to be trivial, such as monitoring your apps’ CPU and memory usage, have become somewhat more complicated than just running top. To fill this void, several commercial products have emerged over the last few years, but it remains the case that cost can be prohibitive for small startup teams and hobbyists.
In this case study we’ll exploit ØMQ and asyncio to build a toy prototype for distributed application monitoring. Our design has three parts:
Application layer: This layer contains all our applications. Examples might be a “customers” microservice, a “bookings” microservice, an “emailer” microservice, and so on. We will add a ØMQ “transmitting” socket to each of our applications. This socket will send performance metrics to a central server.
Collection layer: The central server will expose a ØMQ socket to collect the data from all the running application instances. The server will also serve a web page to show performance graphs over time, and our server will live-stream the data as it comes in!
Visualization layer: This is the web page being served. We will display the collected data in a set of charts, and the charts will live-update in real time. To simplify the code samples, we will use the convenient Smoothie Charts JavaScript library, which provides all the necessary client-side features.
# backend-app.py
import argparse
import asyncio  # needed for asyncio.Task.all_tasks() below
from asyncio import get_event_loop, gather, sleep, CancelledError
from random import randint, uniform
from datetime import datetime as dt
from datetime import timezone as tz
from contextlib import suppress
import zmq, zmq.asyncio, psutil
from signal import SIGINT

# zmq.asyncio.install()
ctx = zmq.asyncio.Context()

async def stats_reporter(color: str):
    p = psutil.Process()
    sock = ctx.socket(zmq.PUB)
    sock.setsockopt(zmq.LINGER, 1)
    sock.connect('tcp://localhost:5555')
    with suppress(CancelledError):
        while True:
            await sock.send_json(dict(
                color=color,
                timestamp=dt.now(tz=tz.utc).isoformat(),
                cpu=p.cpu_percent(),
                mem=p.memory_full_info().rss / 1024 / 1024
            ))
            await sleep(1)
    sock.close()

async def main(args):
    leak = []
    with suppress(CancelledError):
        while True:
            sum(range(randint(1_000, 10_000_000)))
            await sleep(uniform(0, 1))
            leak += [0] * args.leak

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--color', type=str)
    parser.add_argument('--leak', type=int, default=0)
    args = parser.parse_args()
    loop = get_event_loop()
    loop.add_signal_handler(SIGINT, loop.call_soon, loop.stop)
    tasks = gather(main(args), stats_reporter(args.color))
    loop.run_forever()
    print('Leaving...')
    # Note: Task.all_tasks() was removed in Python 3.9;
    # use asyncio.all_tasks(loop) there.
    for t in asyncio.Task.all_tasks():
        t.cancel()
    loop.run_until_complete(tasks)
    ctx.term()
In versions of pyzmq below 17.0.0, it was necessary to use this explicit zmq.asyncio.install() command to enable Asyncio support. At the time of writing, version 17 is currently in beta but hopefully it will have a stable release by the time you read this.
This coroutine function will run as a long-lived coroutine, continually sending out data to the server process.
Create a ØMQ socket! There are different flavors of socket. This one is a PUB socket type, which allows one-way messages to be sent to another ØMQ socket. This socket has, as the ØMQ guide says, superpowers. It will automatically handle all reconnect and buffering logic for us.
Connect to the server.
Our shutdown sequence is driven by the SIGINT signal (Ctrl-C), which is handled further down. When the signal is received, we’ll cancel all the tasks. Here we handle the raised CancelledError with the handy suppress() context manager from the contextlib standard library module.
Iterate forever, sending out data to the server.
Since ØMQ knows how to work with complete messages, and not just chunks off a bytestream, it opens the door to a bunch of useful wrappers around the usual sock.send() idiom: here, we use one of those helper methods, send_json(), which will automatically serialize the argument into JSON. This allows us to use a dict() directly.
A reliable way to transmit datetime information is via the ISO 8601 format. This is especially true if you have to pass datetime data between software written in different languages, since the vast majority of language implementations will be able to work with this standard.
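As a quick illustration of the round trip, the sketch below serializes a timezone-aware datetime the same way stats_reporter() does and parses it back again. The fixed date is arbitrary, and datetime.fromisoformat() requires Python 3.7 or later.

```python
from datetime import datetime, timezone

sent = datetime(2018, 3, 1, 12, 30, 0, tzinfo=timezone.utc)
wire = sent.isoformat()                   # the string that goes into the JSON
received = datetime.fromisoformat(wire)   # Python 3.7+

print(wire)  # 2018-03-01T12:30:00+00:00
```

Because the UTC offset travels inside the string, the receiver recovers the same instant regardless of its own local timezone.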
To end up here, we must have received the CancelledError exception resulting from task cancellation. The ØMQ socket must be closed to allow program shutdown.
The main() function symbolizes the actual microservice application. Fake work is produced with this sum over random numbers, just to give us some non-zero data to view in the visualization layer a bit later.
We’re going to create multiple instances of this application, so it would be convenient to be able to distinguish between them (later, in the graphs) with a --color parameter.
When a SIGINT signal is received (e.g., pressing Ctrl-C), schedule a call to stop the loop.
Create and gather tasks for each of the coroutine functions.
Having received the shutdown signal, cancel the tasks. This will raise a CancelledError inside all of the coroutines represented in the tasks group. After cancellation, it is still necessary to run the tasks to completion, by allowing them the chance to handle the cancellation appropriately. For example, we must close the ØMQ socket in order to shut down at all.
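The cancel-then-run-to-completion sequence can be seen in isolation in the following sketch. It is a simplified stand-in for the listing, with assumptions stated plainly: worker() plays the role of stats_reporter(), and appending to the events list stands in for closing the socket. The key observation is that the cleanup line after the suppress() block only runs if the cancelled task is awaited afterward.

```python
import asyncio
from contextlib import suppress

events = []

async def worker():
    with suppress(asyncio.CancelledError):
        while True:
            await asyncio.sleep(3600)
    # Reached only after cancellation; stands in for sock.close().
    events.append('socket closed')

async def main():
    task = asyncio.ensure_future(worker())
    await asyncio.sleep(0)   # give the worker a chance to start
    task.cancel()            # schedules CancelledError inside worker()
    await task               # run the task to completion so cleanup happens
    events.append('shutdown complete')

asyncio.run(main())
print(events)
```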
Finally, the ØMQ context can be terminated.
The primary point of interest is the stats_reporter() function. This is what streams out metrics data (collected by the useful psutil library). The rest of the code can be assumed to be a typical microservice application.
Now we look at the server code where all the data will be collected and served to a web client.
# metric-server.py
import asyncio
from contextlib import suppress
import zmq
import zmq.asyncio
import aiohttp
from aiohttp import web
from aiohttp_sse import sse_response
from weakref import WeakSet
import json

# zmq.asyncio.install()
ctx = zmq.asyncio.Context()
connections = WeakSet()

async def collector():
    sock = ctx.socket(zmq.SUB)
    sock.setsockopt_string(zmq.SUBSCRIBE, '')
    sock.bind('tcp://*:5555')
    with suppress(asyncio.CancelledError):
        while True:
            data = await sock.recv_json()
            print(data)
            for q in connections:
                await q.put(data)
    sock.close()

async def feed(request):
    queue = asyncio.Queue()
    connections.add(queue)
    with suppress(asyncio.CancelledError):
        async with sse_response(request) as resp:
            while True:
                data = await queue.get()
                print('sending data:', data)
                resp.send(json.dumps(data))
    return resp

async def index(request):
    return aiohttp.web.FileResponse('./charts.html')

async def start_collector(app):
    app['collector'] = app.loop.create_task(collector())

async def stop_collector(app):
    print('Stopping collector...')
    app['collector'].cancel()
    await app['collector']
    ctx.term()

if __name__ == '__main__':
    app = web.Application()
    app.router.add_route('GET', '/', index)
    app.router.add_route('GET', '/feed', feed)
    app.on_startup.append(start_collector)
    app.on_cleanup.append(stop_collector)
    web.run_app(app, host='127.0.0.1', port=8088)
One half of this program will receive data from other applications, and the other half will provide data to browser clients via server-sent events (SSE).
We use a WeakSet() to keep track of all the currently connected web clients. Each connected client will have an associated Queue() instance, so this connections identifier is really a set of queues.
Recall that in the application layer, we used a zmq.PUB socket; here in the collection layer we use its partner, the zmq.SUB socket type. This ØMQ socket can only receive, not send.
For the zmq.SUB socket type, it is required to provide a subscription name, but for our goals we’ll just take everything that comes in, hence the empty topic name.
Here we bind the zmq.SUB socket. Think about that for a second! In “pubsub” configurations you usually have to make the pub end the server (bind()) and the sub end the client (connect()). ØMQ is different: either end can be the server. For our use-case this is important, because each of our application-layer instances will be connecting to the same collection server domain name and not the other way round.
The support for asyncio in pyzmq allows us to await on data from our connected apps. And not only that, but the incoming data will be automatically deserialized from JSON (yes, this means data is a dict()).
Recall that our connections set holds a queue for every connected web client? Now that data has been received, it’s time to send it to all the clients: the data is placed onto each queue.
The feed() coroutine function will create coroutines for each connected web client. Internally, server-sent events are used to push data to the web clients.
As described earlier, each web client will have its own queue instance, in order to receive data from the collector() coroutine. The queue instance is added to the connections set, but because connections is a weak set, the entry will automatically be removed from connections when the queue goes out of scope, i.e., when a web client disconnects. Weakrefs are really great for simplifying these kinds of bookkeeping tasks.
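The weak-set bookkeeping can be demonstrated with the standard library alone. In this sketch, client() is a hypothetical stand-in for feed() and broadcast() for collector(); there is no web server involved. Once the client coroutine finishes, nothing holds a strong reference to its queue any longer, so the entry vanishes from connections without any explicit removal. The gc.collect() call is only there to make the demonstration deterministic.

```python
import asyncio
import gc
from weakref import WeakSet

connections = WeakSet()

async def client(n_messages: int) -> list:
    """Stand-in for feed(): owns a queue for as long as it runs."""
    queue = asyncio.Queue()
    connections.add(queue)
    return [await queue.get() for _ in range(n_messages)]

async def broadcast(items):
    """Stand-in for collector(): copy every item onto every live queue."""
    for item in items:
        for q in list(connections):
            await q.put(item)
        await asyncio.sleep(0)   # let the clients consume

async def demo():
    c = asyncio.ensure_future(client(2))
    await asyncio.sleep(0)       # let the client register its queue
    registered = len(connections)
    await broadcast(['a', 'b'])
    received = await c
    del c
    gc.collect()
    return registered, received, len(connections)

result = asyncio.run(demo())
print(result)
```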
The aiohttp_sse package provides the sse_response() context manager. This gives us a scope inside which to push data to the web client.
We remain connected to the web client, and wait for data on this specific client’s queue.
As soon as the data comes in (inside collector()) it will be sent to the connected web client. Note that we reserialize the data dict here. An optimization to the code shown here would be to avoid deserializing JSON in collector(), and instead use sock.recv_string to avoid the serialization round-trip. Of course, in a real scenario you might want to deserialize in the collector anyway, and perform some validation on the data before sending to the browser client. So many choices!
The index() endpoint is the primary page-load, and here we serve a static file called charts.html.
The aiohttp library provides facilities for you to hook in additional long-lived coroutines you might need. With the collector() coroutine, we have exactly that situation, so we create a startup coroutine start_collector(), and a shutdown coroutine. These will be called during specific phases of aiohttp’s startup and shutdown sequence. Note that we add the collector task to the app itself, which implements a mapping protocol so that you can use it like a dict.
Here you can see that we obtain our collector() coroutine off the app identifier and call cancel() on that.
Finally, you can see where the custom startup and shutdown coroutines are hooked in: the app instance provides hooks to which your custom coroutines may be appended.
All that remains is the visualization layer. We’re using the Smoothie Charts library to generate scrolling charts, and the complete HTML for our main (and only!) web page, charts.html, is provided in the Appendix in its entirety. There is too much HTML, CSS, and JavaScript to present in this section, but I did want to highlight a few points about how the server-sent events are handled in JavaScript on the browser client.
<snip>
var evtSource = new EventSource("/feed");
evtSource.onmessage = function(e) {
    var obj = JSON.parse(e.data);
    if (!(obj.color in cpu)) {
        add_timeseries(cpu, cpu_chart, obj.color);
    }
    if (!(obj.color in mem)) {
        add_timeseries(mem, mem_chart, obj.color);
    }
    cpu[obj.color].append(
        Date.parse(obj.timestamp), obj.cpu);
    mem[obj.color].append(
        Date.parse(obj.timestamp), obj.mem);
};
<snip>
Create a new EventSource instance on the /feed URL. The browser will connect to /feed on our server, metric-server.py. Note that the browser will automatically try to reconnect if the connection is lost. Server-sent events are often overlooked, but there are many situations where the simplicity of SSE might be preferred over websockets.
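Part of that simplicity is the wire format: an event is just text over a long-lived HTTP response. The sketch below encodes one message the way the browser’s EventSource expects it, as a data: line terminated by a blank line. The sse_event() helper is hypothetical and shown for illustration only; in our server the aiohttp_sse package takes care of this.

```python
import json

def sse_event(data: dict) -> bytes:
    """Encode one server-sent event: a 'data:' line ended by a blank line."""
    return f'data: {json.dumps(data)}\n\n'.encode('utf-8')

event_bytes = sse_event({'color': 'red', 'cpu': 1.5})
print(event_bytes)
```

On the browser side, the text after data: is what arrives as e.data in the onmessage handler.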
The onmessage() event will fire every time the server sends data. Here the data is parsed as JSON.
Recall that the cpu identifier is a mapping of color to a TimeSeries() instance. Here, we obtain that time series and append data to it. We also obtain the timestamp and parse it to get the correct format required by the chart.
Now we get to run the code. To get the whole show moving, a bunch of command-line instructions are required:
$ python metric-server.py
======== Running on http://127.0.0.1:8088 ========
(Press CTRL+C to quit)
This starts our collector. The next step is to start up all the microservice instances. These will send their CPU and memory-usage metrics to the collector. Each will be identified by a different color, which is specified on the command line:
$ python backend-app.py --color red &
$ python backend-app.py --color blue --leak 10000 &
$ python backend-app.py --color green --leak 100000 &
Figure 4-2 shows our final product! You’ll have to take my word for it that the graphs really do animate. You’ll notice in the listing above that I added some memory leakage to blue, and a lot to green. I even had to restart the green service a few times to prevent it from climbing over 100 MB.
What is especially interesting about this project is this: any of the running instances in any part of this stack can be restarted, and no reconnect-handling code is necessary! The ØMQ sockets, along with the EventSource JavaScript instance in the browser, magically reconnect and pick up where they left off.
In the next section we turn our attention to databases, and how asyncio might be used to design a system for cache invalidation.
The asyncpg library provides client access to the PostgreSQL database, but differentiates itself from other asyncio-compatible Postgres client libraries with an emphasis on speed. asyncpg is authored by Yury Selivanov, one of the core asyncio Python developers, who is also the author of the uvloop project. In addition, asyncpg has no third-party dependencies, although Cython is required if you’re installing from source.
asyncpg achieves its speed by working directly against the PostgreSQL binary protocol, and other advantages to this low-level approach include support for prepared statements and scrollable cursors.
We’ll be looking at a case study using asyncpg for cache invalidation, but before that it will be useful to get a basic understanding of the API asyncpg provides. For all of the code in this section, we’ll need a running instance of PostgreSQL, and this is most easily done with Docker:
$ docker run -d --rm -p 55432:5432 postgres
Note that I’ve exposed port 55432 rather than the default, 5432, just in case you already have a running instance of the database on the default port. The code below gives a brief demonstration of how to use asyncpg to talk to PostgreSQL.
import asyncio
import asyncpg
import datetime
from util import Database

async def main():
    async with Database('test', owner=True) as conn:
        await demo(conn)

async def demo(conn: asyncpg.Connection):
    await conn.execute('''
        CREATE TABLE users(
            id serial PRIMARY KEY,
            name text,
            dob date
        )'''
    )

    pk = await conn.fetchval(
        'INSERT INTO users(name, dob) VALUES($1, $2) '
        'RETURNING id', 'Bob', datetime.date(1984, 3, 1)
    )

    async def get_row():
        return await conn.fetchrow(
            'SELECT * FROM users WHERE name = $1',
            'Bob'
        )
    print('After INSERT:', await get_row())

    await conn.execute(
        'UPDATE users SET dob = $1 WHERE id=1',
        datetime.date(1985, 3, 1)
    )
    print('After UPDATE:', await get_row())

    await conn.execute(
        'DELETE FROM users WHERE id=1'
    )
    print('After DELETE:', await get_row())

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
I’ve hidden some boilerplate away in a tiny util module to simplify things and keep the focus on the core message.
The Database class gives us a context manager that will create a new database for us (in this case named test) and will destroy that database when the context manager exits. This turns out to be very useful when experimenting with ideas in code. Because no state is carried over between experiments, you start from a clean database every time. Note that this is an async with context manager; we’ll see more about that later, but for now, the focal area of this demo is what happens inside the demo() coroutine.
The Database context manager has provided us with a Connection instance, which is immediately used to create a new table, users.
Insert a new record. While we could have used .execute() to do the insertion, the benefit of using fetchval() is that we can obtain the id of the newly inserted record, which we’ll store in the pk identifier.
Note: We use parameters ($1 and $2) for passing data to the SQL query. Never use string interpolation or concatenation to build queries, as this is a security risk!
In the remainder of this demo, we’re going to be manipulating data in our new table, so here we make a new utility coroutine function that fetches our record in the table. This will be called several times.
When retrieving data, it is far more useful to use the fetch-based methods, because these will return Record objects. asyncpg will automatically cast datatypes to the most appropriate types for Python.
We immediately use the get_row() helper to display our newly inserted record.
We modify data using the UPDATE command for SQL. It’s a tiny modification: the year value in the date of birth is changed by one year. As before, this is performed with the connection’s execute() method. The remainder of the code demo follows the same structure as seen so far, and a DELETE, followed by another print(), happens a few lines down.
This produces the following output:
$ python asyncpg-basic.py
After INSERT: <Record id=1 name='Bob' dob=datetime.date(1984, 3, 1)>
After UPDATE: <Record id=1 name='Bob' dob=datetime.date(1985, 3, 1)>
After DELETE: None
In the output, note how the date value we retrieve in our Record object has been converted to a Python date object: asyncpg has automatically converted the datatype from the SQL type to its Python counterpart. There is a large table of type conversions presented in the asyncpg documentation that describes all the type mappings that are already built into asyncpg.
The code above is very simple; perhaps even crudely so, if you’re used to the convenience of object-relational mappers (ORMs) like SQLAlchemy or the Django web framework’s built-in ORM. At the end of this chapter I mention several third-party libraries that provide access to ORMs or ORM-like features for asyncpg.
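The earlier warning about query parameters is worth seeing in action. The sketch below does not use asyncpg or PostgreSQL: so that it runs without a database server, it swaps in the standard library’s sqlite3 module, whose placeholder syntax is ? rather than $1. The principle is the same in both drivers, and the table and the hostile input are invented for the illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users(id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("INSERT INTO users(name) VALUES (?)", ("Bob",))
conn.execute("INSERT INTO users(name) VALUES (?)", ("Alice",))

# Invented attacker-controlled input.
hostile = "nobody' OR '1'='1"

# UNSAFE: the input becomes part of the SQL text and rewrites
# the WHERE clause so that it matches every row.
unsafe_sql = f"SELECT name FROM users WHERE name = '{hostile}'"
leaked = conn.execute(unsafe_sql).fetchall()

# SAFE: the value travels separately from the SQL text, so it is
# only ever compared as data and matches nothing.
safe = conn.execute(
    "SELECT name FROM users WHERE name = ?", (hostile,)).fetchall()

print("interpolated query returned", len(leaked), "rows")
print("parameterized query returned", len(safe), "rows")
```

With asyncpg the safe form is the one already shown in the demo: pass the values as extra arguments after the SQL string and refer to them as $1, $2, and so on.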
Let’s take a quick look at my boilerplate Database object in the util module; you may find it useful to make something similar for your own experiments:
# util.py
import argparse, asyncio, asyncpg
from asyncpg.pool import Pool

DSN = 'postgresql://{user}@{host}:{port}'
DSN_DB = DSN + '/{name}'
CREATE_DB = 'CREATE DATABASE {name}'
DROP_DB = 'DROP DATABASE {name}'

class Database:
    def __init__(self, name, owner=False, **kwargs):
        self.params = dict(
            user='postgres', host='localhost',
            port=55432, name=name)
        self.params.update(kwargs)
        self.pool: Pool = None
        self.owner = owner
        self.listeners = []

    async def connect(self) -> Pool:
        if self.owner:
            await self.server_command(
                CREATE_DB.format(**self.params))

        self.pool = await asyncpg.create_pool(
            DSN_DB.format(**self.params))
        return self.pool

    async def disconnect(self):
        """Destroy the database"""
        if self.pool:
            releases = [self.pool.release(conn)
                        for conn in self.listeners]
            await asyncio.gather(*releases)
            await self.pool.close()
        if self.owner:
            await self.server_command(
                DROP_DB.format(**self.params))

    async def __aenter__(self) -> Pool:
        return await self.connect()

    async def __aexit__(self, *exc):
        await self.disconnect()

    async def server_command(self, cmd):
        conn = await asyncpg.connect(
            DSN.format(**self.params))
        await conn.execute(cmd)
        await conn.close()

    async def add_listener(self, channel, callback):
        conn: asyncpg.Connection = await self.pool.acquire()
        await conn.add_listener(channel, callback)
        self.listeners.append(conn)

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--cmd', choices=['create', 'drop'])
    parser.add_argument('--name', type=str)
    args = parser.parse_args()
    loop = asyncio.get_event_loop()
    d = Database(args.name, owner=True)
    if args.cmd == 'create':
        loop.run_until_complete(d.connect())
    elif args.cmd == 'drop':
        loop.run_until_complete(d.disconnect())
    else:
        parser.print_help()
The Database class is just a fancy context manager for creating and deleting a database from a PostgreSQL instance. The database name is passed into the constructor.
(Note: The sequence of callouts in the code is intentionally different from this list.) This is an asynchronous context manager. Instead of the usual __enter__() and __exit__() methods, we have their __aenter__() and __aexit__() counterparts. Here, in the entering side, we’ll create the new database and return a connection to that new database.
server_command() is another helper method defined a few lines down. We use it to run the command for creating our new database.
A connection is made to the newly created database. Note that I’ve hard-coded several details about the connection: this is intentional, as I want to keep the code samples small. You could easily generalize this by turning details such as the username, hostname, and port into configurable fields.
In the exiting side of the context manager, we close the connection and…
…destroy the database.
For completeness, this is our utility method for running commands against the PostgreSQL server itself. It creates a connection for that purpose, runs the given command, and exits.
This is a surprise, and will be featured in the upcoming case study!
In the add_listener() method (the final point in the list above), we create a dedicated connection for each channel we want to listen on. This is very expensive since it means that a PostgreSQL worker will be completely tied up for every channel being listened to. A much better design would be to use one connection for multiple channels. Once you have worked through this example, try to modify the code to use a single connection for multiple channel listeners!
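If the __aenter__() and __aexit__() protocol is new to you, the following stripped-down sketch shows the same create-on-enter, destroy-on-exit lifecycle without needing a database. TempResource is a hypothetical stand-in invented for this illustration, with asyncio.sleep(0) as a placeholder for the real network calls; it assumes Python 3.7 or later for asyncio.run().

```python
import asyncio


class TempResource:
    """Hypothetical stand-in for the Database helper."""

    def __init__(self, name):
        self.name = name
        self.log = []

    async def __aenter__(self):
        await asyncio.sleep(0)   # placeholder for CREATE DATABASE + connect
        self.log.append(f"create {self.name}")
        return self

    async def __aexit__(self, *exc):
        await asyncio.sleep(0)   # placeholder for close + DROP DATABASE
        self.log.append(f"drop {self.name}")
        # Returning None (falsy) lets any exception propagate.


async def main():
    res = TempResource("test")
    try:
        async with res:
            res.log.append("use")
            raise RuntimeError("experiment failed")
    except RuntimeError:
        pass
    return res.log


log = asyncio.run(main())
print(log)
```

Note that the teardown step still runs even though the body raised an exception, which is exactly the property that makes this pattern handy for throwaway experiment databases.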
Now that we have an understanding of the basic building blocks of asyncpg, we can explore a really fun case study: using PostgreSQL’s built-in support for sending event notifications to perform cache invalidation!
There are two hard things in computer science: cache invalidation, naming things, and off-by-one errors.
Phil Karlton
It is common in web services and web applications that the persistence layer, i.e., the backing database (DB), becomes the performance bottleneck sooner than any other part of the stack. The application layer can usually be scaled horizontally, i.e., run more instances, whereas it’s trickier to do that with a database.
This is why it’s common practice to look at design options that can limit excessive interaction with the database. The most common option is to use caching to “remember” previously fetched database results and replay them when asked, thus avoiding subsequent calls to the DB for the same information.
However: what happens if one of your app instances writes new data to the database while another app instance is still returning the old, stale data from its internal cache? This is a classic cache invalidation problem, and these can be very difficult to resolve in a robust way.
Our attack strategy is as follows:
Each app instance has an in-memory cache of DB queries.
When one writes new data to the database, the database alerts all of the connected app instances of the new data.
Each app instance then updates its internal cache accordingly.
This case study will highlight how PostgreSQL’s built-in support for event updates, via the LISTEN and NOTIFY commands, can simply tell us when its data has changed.
asyncpg already has support for the LISTEN/NOTIFY API. This feature of PostgreSQL allows your app to subscribe to events on a named channel and also post events to named channels. It’s almost like PostgreSQL can become a lighter version of RabbitMQ or ActiveMQ!
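Before bringing in PostgreSQL, the three-step strategy above can be sketched entirely in memory. FakeDatabase and AppInstance are hypothetical classes invented for this illustration: the fake database simply calls every registered callback on each write, which is a crude stand-in for what LISTEN and NOTIFY will do for us in the real case study.

```python
import asyncio


class FakeDatabase:
    """In-memory stand-in for a database with NOTIFY-like fan-out."""

    def __init__(self, rows):
        self.rows = dict(rows)
        self.listeners = []            # one callback per app instance

    def listen(self, callback):
        self.listeners.append(callback)

    async def read(self, id):
        return self.rows.get(id)

    async def write(self, id, row):
        self.rows[id] = row
        for callback in self.listeners:    # step 2: alert every instance
            callback(id, row)


class AppInstance:
    def __init__(self, db):
        self.db = db
        self.cache = {}                # step 1: per-instance cache
        self.db_reads = 0
        db.listen(self.on_change)

    def on_change(self, id, row):
        self.cache[id] = row           # step 3: refresh the local cache

    async def get(self, id):
        if id not in self.cache:
            self.db_reads += 1
            self.cache[id] = await self.db.read(id)
        return self.cache[id]

    async def put(self, id, row):
        await self.db.write(id, row)


async def demo():
    db = FakeDatabase({29: {"fav_dish": "Gravy on Toast"}})
    a, b = AppInstance(db), AppInstance(db)
    first = await b.get(29)                          # cache miss on b
    await a.put(29, {"fav_dish": "SPAM on toast"})   # write through a
    second = await b.get(29)                         # b is already fresh
    return first, second, b.db_reads


first, second, reads = asyncio.run(demo())
print(first, second, reads)
```

Instance b reads from the fake database only once, yet it returns the new value after instance a writes: that is the behavior we want from the real system.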
This case study has more moving parts than usual, and that makes it awkward to present in the usual linear format. Instead, we’ll begin by looking at the final product, and work backwards towards the underlying implementation.
Our app provides a JSON-based API server for managing the favorite dishes of patrons at our robotic restaurant. The backing database will have only one table, patron, with only two fields: name and fav_dish. Our API will allow the usual set of four operations: create, read, update, and delete (CRUD).
Here is what it looks like to interact with our API using curl, and to create a new entry in our database:
curl -d '{"name": "Carol", "fav_dish": "SPAM Bruschetta"}' \
    -H "Content-Type: application/json" \
    -X POST \
    http://localhost:8000/patron
Output:
{"msg":"ok","id":37}
The -d parameter is for data,[5] the -H is for the HTTP headers, the -X is for the HTTP request method (alternatives are GET, DELETE, and PUT, and a few others), and the URL is for our API server. We’ll get to the code for that shortly.
In the output, we see that the creation was “ok,” and the id being returned is the primary key of the new record in the database.
In these next few shell snippets, we run through the other three operations: read, update, and delete.
curl -X GET http://localhost:8000/patron/37
Output:
{"id":37,"name":"Carol","fav_dish":"SPAM Bruschetta"}
Reading the data is pretty straightforward, and note that the id of the desired record must be supplied in the URL.
curl -d '{"name": "Eric", "fav_dish": "SPAM Bruschetta"}' \
    -H "Content-Type: application/json" \
    -X PUT \
    http://localhost:8000/patron/37
curl -X GET http://localhost:8000/patron/37
Output:
{"msg":"ok"}
{"id":37,"name":"Eric","fav_dish":"SPAM Bruschetta"}
Updating a resource, as shown above, is very similar to creating one, except for two key differences:
The HTTP request method (-X) is PUT, not POST.
The URL now requires the id field.
We have also issued another GET immediately after, to verify that the change was applied. Finally, deletion:
curl -X DELETE http://localhost:8000/patron/37
curl -X GET http://localhost:8000/patron/37
Output:
{"msg":"ok"}
null
This example above also shows that null is returned when you try to get a record that doesn’t exist.
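If you would rather drive the API from Python than from curl, the four requests above map onto the standard library as in the following sketch. The helper patron_request() is hypothetical and written only for this illustration. It builds the request objects without sending them, so it runs even when no server is listening; with the API server up, each one could be passed to urllib.request.urlopen().

```python
import json
from urllib.request import Request

BASE = "http://localhost:8000"   # same address as the curl examples


def patron_request(method, id=None, body=None) -> Request:
    """Build (but do not send) the request the matching curl call issues."""
    url = f"{BASE}/patron" if id is None else f"{BASE}/patron/{id}"
    data = json.dumps(body).encode() if body is not None else None
    headers = {"Content-Type": "application/json"} if body is not None else {}
    return Request(url, data=data, headers=headers, method=method)


create = patron_request(
    "POST", body={"name": "Carol", "fav_dish": "SPAM Bruschetta"})
read = patron_request("GET", id=37)
update = patron_request(
    "PUT", id=37, body={"name": "Eric", "fav_dish": "SPAM Bruschetta"})
delete = patron_request("DELETE", id=37)

for req in (create, read, update, delete):
    print(req.get_method(), req.full_url)
```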
So far this all looks quite ordinary; but our objective is not only to make a CRUD API—we want to look at cache invalidation, so let’s turn our attention toward the cache. Now that we have a basic understanding of our app’s API, we can look at the application logs to see timing data for each request: this will tell us which requests are cached, and which hit the DB.
When the server is first started up, the cache is empty; it’s a memory cache after all. We’re going to start up our server, and then in a separate shell run two GET requests in quick succession:
curl -X GET http://localhost:8000/patron/29
curl -X GET http://localhost:8000/patron/29
Output:
{"id":29,"name":"John Cleese","fav_dish":"Gravy on Toast"}
{"id":29,"name":"John Cleese","fav_dish":"Gravy on Toast"}
We expect that the first time we retrieve our record above, there’s going to be a cache miss, and the second time, a hit. We can see evidence of this in the log for the API server itself:
$ python sanic_demo.py
2017-09-29 16:20:33 - (sanic)[DEBUG]:
<snip: Sanic’s ASCII-art startup banner, “Gotta go fast!”>
2017-09-29 16:20:33 (sanic): Goin' Fast @ http://0.0.0.0:8000
2017-09-29 16:20:33 (sanic): Starting worker [10366]
2017-09-29 16:25:27 (perf): id=37 Cache miss
2017-09-29 16:25:27 (perf): get Elapsed: 4.26 ms
2017-09-29 16:25:27 (perf): get Elapsed: 0.04 ms
Everything up to this line is the default sanic startup log message.
As described, the first GET results in a cache miss because the server has only just started.
This is from our first curl -X GET. I’ve added some timing functionality to the API endpoints. Here the handler for the GET request took ~4 ms.
The second GET returns data from the cache, and the much faster timing data, ~100x, indicates that the data is now being returned from the cache.
So far, nothing unusual. Many web apps use caching in this way.
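The “Elapsed” lines in the log come from a small timing wrapper around each endpoint. The author’s own aelapsed() lives in the appendix; the sketch below is not that implementation, just one plausible shape for such a decorator. The names timed(), timings, and _cache are invented here, and asyncio.sleep() stands in for the database round trip.

```python
import asyncio
import functools
import time


def timed(results: list):
    """Hypothetical timing decorator for coroutine functions."""
    def decorator(corofn):
        @functools.wraps(corofn)
        async def wrapper(*args, **kwargs):
            t0 = time.perf_counter()
            try:
                return await corofn(*args, **kwargs)
            finally:
                elapsed_ms = (time.perf_counter() - t0) * 1e3
                results.append((corofn.__name__, elapsed_ms))
        return wrapper
    return decorator


timings = []
_cache = {}


@timed(timings)
async def get(id):
    if id not in _cache:
        await asyncio.sleep(0.01)    # pretend database round trip
        _cache[id] = {"id": id}
    return _cache[id]


async def main():
    await get(29)    # miss: pays the simulated round trip
    await get(29)    # hit: served straight from the dict


asyncio.run(main())
for name, ms in timings:
    print(f"{name} Elapsed: {ms:.2f} ms")
```

The important detail is that the wrapper is itself a coroutine function and awaits the wrapped call; a plain synchronous decorator would only measure how long it takes to create the coroutine object, not to run it.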
Let’s start up a second app instance on port 8001 (the first instance was on port 8000):
$ python sanic_demo.py --port 8001
<snip>
2017-10-02 08:09:56 - (sanic): Goin' Fast @ http://0.0.0.0:8001
2017-10-02 08:09:56 - (sanic): Starting worker [385]
Both instances, of course, connect to the same database. Now, with both API server instances running, let’s modify the data for patron John who lacks, clearly, sufficient Spam in their diet. We do this by performing an UPDATE against, say, the first app instance at port 8000:
curl -d '{"name": "John Cleese", "fav_dish": "SPAM on toast"}' \
    -H "Content-Type: application/json" \
    -X PUT \
    http://localhost:8000/patron/29
{"msg":"ok"}
Immediately after this update event on only one app instance, both API servers, 8000 and 8001, report the following event in their logs:
2017-10-02 08:35:49 - (perf)[INFO]: Got DB event:
{
    "table": "patron",
    "id": 29,
    "type": "UPDATE",
    "data": {
        "old": {
            "id": 29,
            "name": "John Cleese",
            "fav_dish": "Gravy on Toast"
        },
        "new": {
            "id": 29,
            "name": "John Cleese",
            "fav_dish": "SPAM on toast"
        },
        "diff": {
            "fav_dish": "SPAM on toast"
        }
    }
}
The database has reported the update event back to the app instances! We haven’t even done any requests against app instance 8001 yet. Does this mean that the new data is already cached there?
To check, we now do a GET on the second server at 8001, and the timing info shows that we do indeed obtain the data directly from the cache, even though no GET was previously called on this app instance:
curl -X GET http://localhost:8001/patron/29
{"id":29,"name":"John Cleese","fav_dish":"SPAM on toast"}
2017-10-02 08:46:45 - (perf)[INFO]: get Elapsed: 0.04 ms
The punchline is: when the database changes, all connected app instances get notified, allowing them to update their caches.
The elaborate introduction in the preceding example was necessary to explain what we’re trying to achieve. With that out of the way, we can now look at the asyncpg code implementation required to make our cache invalidation actually work.
The basic design for the code is the following:
A simple web API using the new, asyncio-compatible Sanic web framework.
The data will be stored in a backend PostgreSQL instance, but the API will be served via multiple instances of the web API app servers.
The app servers will cache data from the database.
The app servers will subscribe, via asyncpg, to events on specific tables in the DB, and will receive update notifications when the data in the DB table has been changed. This allows the app servers to update their individual in-memory caches.
# sanic_demo.py
import argparse
from sanic import Sanic
from sanic.views import HTTPMethodView
from sanic.response import json
from util import Database
from perf import aelapsed, aprofiler
import model

app = Sanic()

@aelapsed
async def new_patron(request):
    data = request.json
    id = await model.add_patron(app.pool, data)
    return json(dict(msg='ok', id=id))

class PatronAPI(HTTPMethodView, metaclass=aprofiler):
    async def get(self, request, id):
        data = await model.get_patron(app.pool, id)
        return json(data)

    async def put(self, request, id):
        data = request.json
        ok = await model.update_patron(app.pool, id, data)
        return json(dict(msg='ok' if ok else 'bad'))

    async def delete(self, request, id):
        ok = await model.delete_patron(app.pool, id)
        return json(dict(msg='ok' if ok else 'bad'))

@app.listener('before_server_start')
async def db_connect(app, loop):
    app.db = Database('restaurant', owner=False)
    app.pool = await app.db.connect()
    await model.create_table_if_missing(app.pool)
    await app.db.add_listener('chan_patron', model.db_event)

@app.listener('after_server_stop')
async def db_disconnect(app, loop):
    await app.db.disconnect()

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--port', type=int, default=8000)
    args = parser.parse_args()
    app.add_route(
        new_patron, '/patron', methods=['POST'])
    app.add_route(
        PatronAPI.as_view(), '/patron/<id:int>')
    app.run(host="0.0.0.0", port=args.port)
The Database utility helper, as described earlier. This will provide the methods required to connect to the database.
Two more tools I’ve cobbled together to log out the elapsed time of each API endpoint. We used this in the previous discussion to detect when a GET was being returned from the cache. The implementations for aelapsed() and aprofiler() are not important for this case study, but you can obtain them in the appendix.
The main Sanic app instance is created.
This coroutine function is for creating new patron entries. In an add_route() call towards the bottom of the code, new_patron() is associated with the endpoint /patron, and only for the POST HTTP method. The @aelapsed decorator is not part of the Sanic API: it’s my own invention, merely to log out timings for each call.
Sanic provides immediate deserialization of received JSON data using the .json attribute on the request object.
The model module, which we imported above, is the model for our patron table in the database. We’ll go through that in more detail in the next code listing. But for now, just understand that all the database queries and SQL are in this model module. Here we are passing the connection pool for the database, and this is the same pattern for all the interaction with the database model in this function and in the PatronAPI class further down.
A new primary key, id, will be created, and this is returned to the caller as JSON.
While creation is handled in the new_patron() function, all other interactions are handled in this class-based view, which is a convenience provided by Sanic. All the methods in this class are associated with the same URL, /patron/<id:int>, which you can see further below in the add_route() call near the bottom. Note that the id URL parameter will be passed to each of the methods, and this parameter is required for all three endpoints.
You can safely ignore the metaclass argument: all it does is wrap each method with the @aelapsed decorator so that timings will be printed in the logs. This is not part of the Sanic API, and is again my own invention for logging timing data.
As before, model interaction is performed inside the model module.
If the model reports failure for doing the update, we modify the response data. I’ve included this for readers who have not yet seen Python’s version of the “ternary operator.”
The @app.listener decorators are hooks provided by Sanic to give you a place to add extra actions during the startup and shutdown sequence. This one, before_server_start, is invoked before the API server is started up. This seems like a good place to initialize our database connection.
We use our Database helper to create a connection to our PostgreSQL instance. The DB we’re connecting to is restaurant.
Obtain a connection pool to our database.
Use our model (for the patron table) to create the table if missing.
Use our Database helper to create a dedicated listener for database events on the channel chan_patron. The callback function for these events is model.db_event(), which we’ll go through in the next listing. The callback will be called every time the database updates the channel.
after_server_stop is the hook for tasks that must happen during shutdown. Here we disconnect from our database.
This add_route() sends POST requests for the /patron URL to the new_patron() coroutine function.
This add_route() sends all requests for the /patron/<id:int> URL to the PatronAPI class-based view. The method names in that class determine which one is called. So a GET HTTP request will call the PatronAPI.get() method, and so on.
The code above contains all the HTTP handling for our server, as well as startup and shutdown tasks like setting up a connection pool to the database, and also, crucially, setting up a db-event listener on the chan_patron channel on the DB server.
Now we’ll go through our model for the patron table in the database:
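The metaclass argument on PatronAPI may look like magic, so here is a minimal sketch of the general technique: a metaclass that wraps every coroutine function found in a class body. This is not the author’s aprofiler (that is in the appendix); WrapCoroutines, logged(), and View are hypothetical names, and the wrapper merely records the method name where the real one logs timings.

```python
import asyncio
import functools
import inspect

calls = []


def logged(corofn):
    """Hypothetical wrapper; the real decorator logs timings instead."""
    @functools.wraps(corofn)
    async def wrapper(*args, **kwargs):
        calls.append(corofn.__name__)
        return await corofn(*args, **kwargs)
    return wrapper


class WrapCoroutines(type):
    """Metaclass that decorates each coroutine function in the class body."""

    def __new__(mcls, name, bases, namespace):
        for attr, value in list(namespace.items()):
            if inspect.iscoroutinefunction(value):
                namespace[attr] = logged(value)
        return super().__new__(mcls, name, bases, namespace)


class View(metaclass=WrapCoroutines):
    async def get(self, id):
        return {"id": id}

    async def delete(self, id):
        return True

    def helper(self):              # plain methods are left alone
        return "untouched"


async def main():
    v = View()
    result = await v.get(1)
    await v.delete(1)
    v.helper()
    return result


result = asyncio.run(main())
print(calls, result)
```

The appeal of the metaclass approach is that new endpoint methods are instrumented automatically; the cost is that the wrapping is invisible at the method definition, which is why the author flags it explicitly.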
# model.py
import logging
from json import loads, dumps
from triggers import (
    create_notify_trigger, add_table_triggers)
from boltons.cacheutils import LRU

logger = logging.getLogger('perf')

CREATE_TABLE = ('CREATE TABLE IF NOT EXISTS patron('
                'id serial PRIMARY KEY, name text, '
                'fav_dish text)')
INSERT = ('INSERT INTO patron(name, fav_dish) '
          'VALUES ($1, $2) RETURNING id')
SELECT = 'SELECT * FROM patron WHERE id = $1'
UPDATE = 'UPDATE patron SET name=$1, fav_dish=$2 WHERE id=$3'
DELETE = 'DELETE FROM patron WHERE id=$1'
EXISTS = "SELECT to_regclass('patron')"

CACHE = LRU(max_size=65536)

async def add_patron(conn, data: dict) -> int:
    return await conn.fetchval(
        INSERT, data['name'], data['fav_dish'])

async def update_patron(conn, id: int, data: dict) -> bool:
    result = await conn.execute(
        UPDATE, data['name'], data['fav_dish'], id)
    return result == 'UPDATE 1'

async def delete_patron(conn, id: int):
    result = await conn.execute(DELETE, id)
    return result == 'DELETE 1'

async def get_patron(conn, id: int) -> dict:
    if id not in CACHE:
        logger.info(f'id={id} Cache miss')
        record = await conn.fetchrow(SELECT, id)
        CACHE[id] = record and dict(record.items())
    return CACHE[id]

def db_event(conn, pid, channel, payload):
    event = loads(payload)
    logger.info('Got DB event:' + dumps(event, indent=4))
    id = event['id']
    if event['type'] == 'INSERT':
        CACHE[id] = event['data']
    elif event['type'] == 'UPDATE':
        CACHE[id] = event['data']['new']
    elif event['type'] == 'DELETE':
        CACHE[id] = None

async def create_table_if_missing(conn):
    if not await conn.fetchval(EXISTS):
        await conn.fetchval(CREATE_TABLE)
        await create_notify_trigger(
            conn, channel='chan_patron')
        await add_table_triggers(
            conn, table='patron')
You have to add triggers to the database to be able to get notifications when data changes. I’ve created these handy helpers to create the trigger function itself (with create_notify_trigger), and also to add the trigger to a specific table (with add_table_triggers). The SQL required to do this is somewhat out of scope for this book, but it’s still crucial to understanding how this case study works. I’ve included the annotated code for these triggers in the appendix.
The third-party boltons package provides a bunch of useful tools, not least of which is the LRU cache, a more versatile option than the @lru_cache decorator in the functools standard library module.[6]
This block of text holds all the SQL for the standard CRUD operations. Note that we’re using native PostgreSQL syntax for the parameters: $1, $2, and so on. There is nothing novel here, and it won’t be discussed further.
Create the cache for this app instance.
This add_patron() coroutine function is what we called from the Sanic module inside the new_patron() endpoint for adding new patrons. Inside the function, we use the fetchval() method to insert new data. Why “fetchval” and not “execute”? Because “fetchval” returns the primary key of the newly inserted record![7]
Update an existing record. When this succeeds, PostgreSQL will return UPDATE 1, so we use that as a check to verify that the update succeeded.
Deletion is very similar to updating.
This is the “read” operation. This is the only part of our CRUD interface that cares about the cache. Think about that for a second: we don’t update the cache when doing insert, update, or delete. This is because we rely on the async notification from the database (via the installed triggers) to update the cache if any data is changed.
Of course, we do still want to use the cache after the first GET.
The db_event() function is the callback that asyncpg will make when there are events on our DB notification channel, chan_patron. This specific parameter list is required by asyncpg. conn is the connection on which the event was sent, pid is the process ID of the PostgreSQL instance that sent the event, channel is the name of the channel (and in this case will be chan_patron), and payload is the data being sent on the channel.
Deserialize the JSON data to a dict.
The cache population is generally quite straightforward but note that “update” events contain both new and old data, so we need to make sure we cache the new data only.
This is a small utility function I’ve made to easily recreate a table if missing. This is really useful if you need to do this frequently, such as when writing the code samples for this book!
This is also where the database notification triggers are created and added to our patron table. See the appendix for an annotated listing of these functions.
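To see the event handling in isolation, the sketch below feeds the UPDATE payload from the earlier log output into a handler with the same branching as db_event(), using a plain dict in place of the LRU cache. The function apply_event() is a hypothetical name for this illustration, and the shape of the DELETE payload (with "data" set to null) is an assumption of mine, since the log output only showed an UPDATE event.

```python
import json

cache = {}


def apply_event(payload: str) -> None:
    """Apply one notification payload to the local cache."""
    event = json.loads(payload)
    id, kind = event["id"], event["type"]
    if kind == "INSERT":
        cache[id] = event["data"]
    elif kind == "UPDATE":
        cache[id] = event["data"]["new"]   # never the stale "old" row
    elif kind == "DELETE":
        cache[id] = None                   # remembered as "known missing"


# Payload copied from the log output shown earlier.
update = json.dumps({
    "table": "patron", "id": 29, "type": "UPDATE",
    "data": {
        "old": {"id": 29, "name": "John Cleese",
                "fav_dish": "Gravy on Toast"},
        "new": {"id": 29, "name": "John Cleese",
                "fav_dish": "SPAM on toast"},
        "diff": {"fav_dish": "SPAM on toast"},
    },
})
apply_event(update)
after_update = cache[29]["fav_dish"]

# Assumed payload shape for a DELETE event.
apply_event(json.dumps(
    {"table": "patron", "id": 29, "type": "DELETE", "data": None}))
print(after_update, cache)
```

Storing None on deletion, rather than removing the key, means a later GET for that id is still a cache hit and returns null without touching the database.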
That brings us to the end of this case study. We’ve seen how Sanic makes it very simple to create an API server, and we’ve seen how to use asyncpg for performing queries via a connection pool, as well as using PostgreSQL’s async notification features to receive callbacks over a dedicated, long-lived database connection.
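One design detail from the model module deserves a closing note: the cache had to behave like a mapping, because the notification callback writes entries into it directly and get_patron() tests membership with in. The functools.lru_cache decorator offers no way to insert a value from outside the wrapped function, which is why a dict-like LRU was used. The following sketch shows the idea with an OrderedDict; TinyLRU is a hypothetical teaching class, not the boltons implementation.

```python
from collections import OrderedDict


class TinyLRU:
    """Minimal dict-like LRU sketch."""

    def __init__(self, max_size):
        self.max_size = max_size
        self._data = OrderedDict()

    def __contains__(self, key):
        return key in self._data

    def __getitem__(self, key):
        self._data.move_to_end(key)          # mark as most recently used
        return self._data[key]

    def __setitem__(self, key, value):
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self.max_size:
            self._data.popitem(last=False)   # evict least recently used


cache = TinyLRU(max_size=2)
cache[1] = "a"
cache[2] = "b"
cache[1]            # touch 1, so 2 is now the oldest entry
cache[3] = "c"      # exceeds max_size and evicts 2
print(1 in cache, 2 in cache, 3 in cache)
```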
Many people prefer to use object-relational mappers (ORMs) to work with databases, and in this area, SQLAlchemy is the leader. There is growing support for using SQLAlchemy together with asyncpg in third-party libraries like asyncpgsa and GINO. Another popular ORM, peewee, is given support for asyncio through the aiopeewee package.
There are many other libraries for asyncio not covered in this book. To find out more, you can check out the aio-libs project which manages nearly forty different libraries, and also the Awesome asyncio project which bookmarks a large number of other projects for asyncio.
One of the libraries from the links above bears special mention: aiofiles. If you recall from our earlier discussions, we said that to achieve high concurrency in asyncio, it is vitally important that the loop never “block.” In this context, our focus on blocking operations has been exclusively network-based I/O, but it turns out that disk access is also a blocking operation that will impact your performance at very high concurrency levels. The solution to this is aiofiles, which provides a convenient wrapper for performing disk access in a thread. This works because Python releases the GIL[8] during file operations so your main thread (running the asyncio loop) is unaffected.
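The underlying idea can be sketched with the standard library alone. The code below is not the aiofiles API; it simply hands an ordinary blocking read to the event loop’s default thread pool with run_in_executor(), which is the general technique described above. The function names and the temporary file are invented for the illustration, and asyncio.run() assumes Python 3.7 or later.

```python
import asyncio
import os
import tempfile


def read_text(path: str) -> str:
    """Ordinary blocking file read; meant to run in a worker thread."""
    with open(path, encoding="utf-8") as f:
        return f.read()


async def main() -> str:
    # Create a throwaway file so the example is self-contained.
    with tempfile.NamedTemporaryFile(
            "w", suffix=".txt", delete=False, encoding="utf-8") as f:
        f.write("hello from disk")
        path = f.name
    try:
        loop = asyncio.get_running_loop()
        # None selects the loop's default ThreadPoolExecutor. While the
        # worker thread waits on the disk, the loop can run other tasks.
        return await loop.run_in_executor(None, read_text, path)
    finally:
        os.remove(path)


content = asyncio.run(main())
print(content)
```

A library like aiofiles packages this pattern behind a file-like interface so that you do not have to write the executor plumbing yourself.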
The most important domain for asyncio is going to be network programming. For this reason it’s not a bad idea to learn a little about socket programming, and even after all these years, Gordon McMillan’s Socket Programming HOWTO, included with the standard Python documentation, is one of the best introductions you’ll find.
I learned asyncio from a very wide variety of sources, many of which have already been mentioned in earlier sections. People learn differently from different sources, and of my learning materials not yet mentioned, these were very useful:
By far the best YouTube talk on asyncio I came across was Robert Smallshire’s Getting To Grips With Asyncio presented at NDC London in January 2017. The talk may be somewhat advanced for a beginner, but it really does give a clear description of how asyncio is designed.
Nikolay Novik’s slides presented at PyCon UA 2016: Building Apps With Asyncio. The information is dense, but there is a lot of practical experience captured in these slides.
Endless sessions in the Python REPL, trying things out and “seeing what happens!”
Finally, I encourage you to continue learning, and if a concept doesn’t “stick,” keep looking for new sources until you find an explanation that works for you.
1 https://en.wikipedia.org/wiki/Message_queuing_service
2 https://glyph.twistedmatrix.com/2014/05/the-report-of-our-death.html
3 http://zeromq.org/whitepapers:brokerless
4 Actually, you can as long as the sockets being used in different threads are created, used, and destroyed entirely in their own threads. It is possible but hard to do, and many people struggle to get this right. This is why the recommendation to use a single thread and a polling mechanism is so strong.
5 The recipe for this dish, and recipes for other fine SPAM-based fare, can be found here.
6 Obtain boltons with pip install boltons.
7 You also need the RETURNING id part of the SQL though!
8 global interpreter lock