In this chapter we’ll start to make events more fundamental to the internal structure of our application. We’ll move from the current state in Figure 9-1 where events are an optional side-effect…
…to the situation in Figure 9-2 where everything goes via the message bus, and our app has been transformed fundamentally into a message-processor.
You can find our code for this chapter at github.com/cosmicpython/code/tree/chapter_09_all_messagebus.
git clone https://github.com/cosmicpython/code.git
cd code
git checkout chapter_09_all_messagebus
# or, if you want to code along, check out the previous chapter:
git checkout chapter_08_events_and_message_bus
Rich Hickey talks about “situated software,” meaning software that runs for extended periods of time, managing some real-world process. Examples include warehouse-management systems, logistics schedulers, and payroll systems.
This software is tricky to write because unexpected things happen all the time in the real world of physical objects and unreliable humans. For example:
During a stock-take, we discover that three SPRINGY-MATTRESSes have been water damaged by a leaky roof.
A consignment of RELIABLE-FORKs is missing the required documentation and is held in customs for several weeks. Three RELIABLE-FORKs subsequently fail safety testing, and are destroyed.
A global shortage of sequins means we’re unable to manufacture our next batch of SPARKLY-BOOKCASE.
In all of these situations, we learn about the need to change batch quantities when they’re already in the system. Perhaps someone made a mistake on the number in the manifest, or perhaps some sofas fell off a truck. Following a conversation with the business,1 we model the situation as in Figure 9-3:
[ditaa, batch_changed_events_flow_diagram]
+----------+     /----\     +------------+       +--------------------+
| Batch    | --> |RULE| --> | Deallocate | ----> | AllocationRequired |
| Quantity |     \----/     +------------+-+     +--------------------+-+
| Changed  |                  | Deallocate | ----> | AllocationRequired |
+----------+                  +------------+-+     +--------------------+-+
                                | Deallocate | ----> | AllocationRequired |
                                +------------+       +--------------------+
An event we’ll call BatchQuantityChanged should lead us to change the quantity on the batch, yes, but also to apply a business rule: if the new quantity drops to less than the total already allocated, we need to deallocate those orders from that batch. Then each one will require a new allocation, which we can capture as an event called AllocationRequired.
Perhaps you’re already anticipating that our internal message bus and events can help implement this requirement. We could define a service called change_batch_quantity that knows how to adjust batch quantities and also how to deallocate any excess order lines, and then each deallocation can emit an AllocationRequired event that can be forwarded on to the existing allocate service, in separate transactions. Once again, our message bus helps us to enforce the single responsibility principle, and it allows us to make choices about transactions and data integrity.
But before we jump in, think about where we’re headed. There are two kinds of flows through our system:
API calls that are handled by a service-layer function,
Internal events (which might be raised as a side-effect of a service-layer function) and their handlers (which in turn call service-layer functions).
Wouldn’t it be easier if everything was an event handler? If we rethink our API calls as capturing events, then the service-layer functions can be event handlers too, and we no longer need to make a distinction between internal and external event handlers:
We could imagine services.allocate() as being the handler for an AllocationRequired event, and it could emit Allocated events as its output.
services.add_batch() could be the handler for a BatchCreated event.2
Our new requirement will fit the same pattern:
An event called BatchQuantityChanged can invoke a handler called change_batch_quantity().
And the new AllocationRequired events that it may raise can be passed on to services.allocate() too, so there is no conceptual difference between a brand-new allocation coming from the API and a reallocation that’s internally triggered by a deallocation.
Does that all sound like a bit much? Let’s work toward it gradually. We’ll follow the Preparatory Refactoring workflow, AKA “make the change easy; then make the easy change”:
We’ll start by refactoring our service layer into event handlers. We can get used to the idea of events being the way we describe inputs to the system. In particular, the existing services.allocate() function will become the handler for an event called AllocationRequired.
Then we’ll build an end-to-end test that puts BatchQuantityChanged events into the system and looks for Allocated events coming out.
And then our actual implementation will be conceptually very simple: a new handler for BatchQuantityChanged events, whose implementation will emit AllocationRequired events, which in turn will be handled by the exact same handler for allocations that the API uses.
Along the way we’ll make a small tweak to the messagebus and UoW, moving the responsibility for putting new events onto the messagebus into the messagebus itself.
We start by defining the two events that capture our current API inputs, AllocationRequired and BatchCreated:
BatchCreated and AllocationRequired events (src/allocation/domain/events.py)
@dataclass
class BatchCreated(Event):
    ref: str
    sku: str
    qty: int
    eta: Optional[date] = None

...

@dataclass
class AllocationRequired(Event):
    orderid: str
    sku: str
    qty: int
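Because these events are plain dataclasses, they compare by value, which makes them pleasant to assert on in tests. A minimal sketch of that property, using a stand-in Event base class (the SKU and order values below are invented for illustration):

```python
from dataclasses import dataclass
from datetime import date
from typing import Optional


class Event:
    # Minimal stand-in base class for this sketch
    pass


@dataclass
class BatchCreated(Event):
    ref: str
    sku: str
    qty: int
    eta: Optional[date] = None  # eta is optional, defaulting to None


@dataclass
class AllocationRequired(Event):
    orderid: str
    sku: str
    qty: int


# Dataclasses give us value equality for free: two events built from
# the same data are equal, which keeps test assertions simple.
e1 = AllocationRequired("o1", "RED-CHAIR", 10)
e2 = AllocationRequired("o1", "RED-CHAIR", 10)
assert e1 == e2
assert BatchCreated("b1", "RED-CHAIR", 100).eta is None
```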
Then we rename services.py to handlers.py, add in the existing message handler for send_out_of_stock_notification, and, most importantly, change all the handlers so that they have the same inputs: an event and a UoW:
Handlers and services are the same thing (src/allocation/service_layer/handlers.py)
def add_batch(
        event: events.BatchCreated, uow: unit_of_work.AbstractUnitOfWork
):
    with uow:
        product = uow.products.get(sku=event.sku)
        ...

def allocate(
        event: events.AllocationRequired, uow: unit_of_work.AbstractUnitOfWork
) -> str:
    line = OrderLine(event.orderid, event.sku, event.qty)
    ...

def send_out_of_stock_notification(
        event: events.OutOfStock, uow: unit_of_work.AbstractUnitOfWork,
):
    email.send(
        '[email protected]',
        f'Out of stock for {event.sku}',
    )
The change might be clearer as a diff:
Changing from services to handlers (src/allocation/service_layer/handlers.py)
 def add_batch(
-        ref: str, sku: str, qty: int, eta: Optional[date],
-        uow: unit_of_work.AbstractUnitOfWork
+        event: events.BatchCreated, uow: unit_of_work.AbstractUnitOfWork
 ):
     with uow:
-        product = uow.products.get(sku=sku)
+        product = uow.products.get(sku=event.sku)
     ...

 def allocate(
-        orderid: str, sku: str, qty: int,
-        uow: unit_of_work.AbstractUnitOfWork
+        event: events.AllocationRequired, uow: unit_of_work.AbstractUnitOfWork
 ) -> str:
-    line = OrderLine(orderid, sku, qty)
+    line = OrderLine(event.orderid, event.sku, event.qty)
     ...
+
+def send_out_of_stock_notification(
+        event: events.OutOfStock, uow: unit_of_work.AbstractUnitOfWork,
+):
+    email.send(
 ...
Along the way, we’ve replaced our service layer’s API, which was a scattering of primitives, with some well-defined objects (see sidebar).
Our event handlers now need a UoW. In addition, as our message bus becomes more central to our application, it makes sense to put it explicitly in charge of collecting and processing new events. There was a bit of a circular dependency between UoW and message bus until now, so this will make it one-way:
Handle takes a UoW and manages a queue (src/allocation/service_layer/messagebus.py)
def handle(event: events.Event, uow: unit_of_work.AbstractUnitOfWork):
    queue = [event]
    while queue:
        event = queue.pop(0)
        for handler in HANDLERS[type(event)]:
            handler(event, uow=uow)
            queue.extend(uow.collect_new_events())
The message bus now gets passed the UoW each time it starts up.
When we begin handling our first event, we start a queue.
We pop events from the front of the queue and invoke their handlers.
The message bus passes the UoW down to each handler.
After each handler finishes, we collect any new events that have been generated and add them to the queue.
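To see that queue-draining behaviour in isolation, here’s a minimal, runnable sketch. The Greeted and FollowUpRequired events, the FakeUoW, and the handlers are all invented for illustration; the point is that an event raised by one handler gets collected and processed by the same loop:

```python
class Event:
    pass


class Greeted(Event):
    pass


class FollowUpRequired(Event):
    pass


class FakeUoW:
    """Toy UoW: just a bucket of new events for the bus to collect."""

    def __init__(self):
        self.new_events = []

    def collect_new_events(self):
        while self.new_events:
            yield self.new_events.pop(0)


log = []


def greet(event, uow):
    log.append("greet")
    # Handler raises a new event as a side effect
    uow.new_events.append(FollowUpRequired())


def follow_up(event, uow):
    log.append("follow_up")


HANDLERS = {Greeted: [greet], FollowUpRequired: [follow_up]}


def handle(event, uow):
    # Same shape as the message bus above: drain a queue of events
    queue = [event]
    while queue:
        event = queue.pop(0)
        for handler in HANDLERS[type(event)]:
            handler(event, uow=uow)
            queue.extend(uow.collect_new_events())


handle(Greeted(), FakeUoW())
assert log == ["greet", "follow_up"]  # the raised event was processed too
```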
In unit_of_work.py, publish_events() becomes a less active method, collect_new_events():
UoW no longer puts events directly on the bus (src/allocation/service_layer/unit_of_work.py)
-from . import messagebus
-
class AbstractUnitOfWork(abc.ABC):
@@ -23,13 +21,11 @@ class AbstractUnitOfWork(abc.ABC):
def commit(self):
self._commit()
- self.publish_events()
- def publish_events(self):
+ def collect_new_events(self):
for product in self.products.seen:
while product.events:
- event = product.events.pop(0)
- messagebus.handle(event)
+ yield product.events.pop(0)
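The generator version is easy to exercise on its own. A toy sketch (FakeProduct and FakeUoW are invented names) showing that collect_new_events() drains each seen product’s event list rather than copying it:

```python
class FakeProduct:
    def __init__(self):
        self.events = []  # events raised by the domain model


class FakeUoW:
    def __init__(self, seen):
        self.seen = seen  # products this UoW has loaded

    def collect_new_events(self):
        # Same shape as the real method: yield events, don't dispatch them
        for product in self.seen:
            while product.events:
                yield product.events.pop(0)


p1, p2 = FakeProduct(), FakeProduct()
p1.events.extend(["event-a", "event-b"])
p2.events.append("event-c")

uow = FakeUoW(seen=[p1, p2])
collected = list(uow.collect_new_events())

assert collected == ["event-a", "event-b", "event-c"]
assert p1.events == [] and p2.events == []  # events are drained, not copied
```

Because the UoW no longer calls messagebus.handle() itself, the dependency between the two modules now runs in only one direction.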
Handler Tests use Events (tests/unit/test_handlers.py)
class TestAddBatch:

    def test_for_new_product(self):
        uow = FakeUnitOfWork()
        messagebus.handle(
            events.BatchCreated("b1", "CRUNCHY-ARMCHAIR", 100, None), uow
        )
        assert uow.products.get("CRUNCHY-ARMCHAIR") is not None
        assert uow.committed

...

class TestAllocate:

    def test_returns_allocation(self):
        uow = FakeUnitOfWork()
        messagebus.handle(
            events.BatchCreated("b1", "COMPLICATED-LAMP", 100, None), uow
        )
        result = messagebus.handle(
            events.AllocationRequired("o1", "COMPLICATED-LAMP", 10), uow
        )
        assert result == "b1"
Our API and our service layer currently want to know the allocated batch reference when they invoke our allocate() handler. This means we need to put a temporary hack on our message bus to let it return results:
Messagebus returns results (src/allocation/service_layer/messagebus.py)
 def handle(event: events.Event, uow: unit_of_work.AbstractUnitOfWork):
+    results = []
     queue = [event]
     while queue:
         event = queue.pop(0)
         for handler in HANDLERS[type(event)]:
-            handler(event, uow=uow)
+            r = handler(event, uow=uow)
+            results.append(r)
             queue.extend(uow.collect_new_events())
+    return results
We need this hack because we’re mixing the read and write responsibilities in our system. We’ll come back to fix this wart in Chapter 12.
Flask changing to messagebus as a diff (src/allocation/entrypoints/flask_app.py)
@app.route("/allocate", methods=['POST'])
def allocate_endpoint():
try:
- batchref = services.allocate(
- request.json['orderid'],
- request.json['sku'],
- request.json['qty'],
- unit_of_work.SqlAlchemyUnitOfWork(),
+ event = events.AllocationRequired(
+ request.json['orderid'], request.json['sku'], request.json['qty'],
)
+ results = messagebus.handle(event, unit_of_work.SqlAlchemyUnitOfWork())
+ batchref = results.pop(0)
except InvalidSku as e:
Instead of calling the service layer with a bunch of primitives extracted from the request JSON…
…we instantiate an event…
…and pass it to the message bus.
And we should be back to a fully functional application, but one that’s now fully event-driven:
What used to be service-layer functions are now event handlers…
…as are the functions we invoke for handling internal events raised by our domain model.
We use events as our data structure for capturing inputs to the system, as well as for the handoff of internal work packages.
The entire app is now best described as a message processor (or an event processor, if you prefer; we’ll talk about the distinction in the next chapter).
We’re done with our refactoring phase. Let’s see if we really have “made the change easy”. Let’s implement our new requirement: we’ll receive as our inputs some new BatchQuantityChanged events, pass them to a handler, which in turn might emit some AllocationRequired events, and those in turn will go back to our existing handler for allocation, to be reallocated.
[plantuml, reallocation_sequence_diagram, config=plantuml.cfg]
@startuml
API -> MessageBus : BatchQuantityChanged event

group BatchQuantityChanged Handler + Unit of Work 1
    MessageBus -> Domain_Model : change batch quantity
    Domain_Model -> MessageBus : emit AllocationRequired event(s)
end

group AllocationRequired Handler + Unit of Work 2 (or more)
    MessageBus -> Domain_Model : allocate
end
@enduml
Whenever you split things out like this across two units of work, you now have two database transactions, so you are opening yourself up to integrity issues: something could happen that means the first completes but the second does not. You’ll need to think about whether this is acceptable, and whether you need to notice when it happens and do something about it. See the “Footguns” section in the Epilogue for more discussion.
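To make the failure mode concrete, here’s a toy illustration (all names are invented): each handler commits its own unit of work, so if the second handler raises, the first one’s changes are already durable.

```python
committed = []


class FakeUoW:
    """Toy UoW that records which transactions were committed."""

    def __init__(self, name):
        self.name = name

    def __enter__(self):
        return self

    def __exit__(self, *args):
        pass  # no automatic rollback of already-committed work

    def commit(self):
        committed.append(self.name)


def change_quantity(uow):
    with uow:
        uow.commit()  # transaction 1 succeeds and commits


def reallocate(uow):
    with uow:
        raise RuntimeError("no stock anywhere")  # transaction 2 fails


change_quantity(FakeUoW("uow1"))
try:
    reallocate(FakeUoW("uow2"))
except RuntimeError:
    pass

# The first change survives the second's failure: the system is now in
# a state where a deallocation happened but no reallocation followed.
assert committed == ["uow1"]
```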
The event that tells us a batch quantity has changed is very simple; it just needs a batch reference and a new quantity:
New event (src/allocation/domain/events.py)
@dataclass
class BatchQuantityChanged(Event):
    ref: str
    qty: int
Following the lessons learned in Chapter 4, we can operate in “high gear,” and write our unit tests at the highest possible level of abstraction, in terms of events. Here’s what they might look like:
Handler tests for change_batch_quantity (tests/unit/test_handlers.py)
class TestChangeBatchQuantity:

    def test_changes_available_quantity(self):
        uow = FakeUnitOfWork()
        messagebus.handle(
            events.BatchCreated("batch1", "ADORABLE-SETTEE", 100, None), uow
        )
        [batch] = uow.products.get(sku="ADORABLE-SETTEE").batches
        assert batch.available_quantity == 100

        messagebus.handle(events.BatchQuantityChanged("batch1", 50), uow)

        assert batch.available_quantity == 50

    def test_reallocates_if_necessary(self):
        uow = FakeUnitOfWork()
        messagebus.handle(
            events.BatchCreated("batch1", "INDIFFERENT-TABLE", 50, None), uow
        )
        messagebus.handle(
            events.BatchCreated("batch2", "INDIFFERENT-TABLE", 50, date.today()), uow
        )
        messagebus.handle(
            events.AllocationRequired("order1", "INDIFFERENT-TABLE", 20), uow
        )
        messagebus.handle(
            events.AllocationRequired("order2", "INDIFFERENT-TABLE", 20), uow
        )
        [batch1, batch2] = uow.products.get(sku="INDIFFERENT-TABLE").batches
        assert batch1.available_quantity == 10
        assert batch2.available_quantity == 50

        messagebus.handle(events.BatchQuantityChanged("batch1", 25), uow)

        # order1 or order2 will be deallocated, so we'll have 25 - 20
        assert batch1.available_quantity == 5
        # and 20 will be reallocated to the next batch
        assert batch2.available_quantity == 30
The simple case would be trivially easy to implement: we just modify a quantity.
But if we try to change the quantity so that there’s less than has been allocated, we’ll need to deallocate at least one order, and we expect to reallocate it to a new batch.
Handler delegates to model layer (src/allocation/service_layer/handlers.py)
def change_batch_quantity(
        event: events.BatchQuantityChanged, uow: unit_of_work.AbstractUnitOfWork
):
    with uow:
        product = uow.products.get_by_batchref(batchref=event.ref)
        product.change_batch_quantity(ref=event.ref, qty=event.qty)
        uow.commit()
We realize we’ll need a new query type on our repository:
A new query type on our repository (src/allocation/adapters/repository.py)
class AbstractRepository(abc.ABC):
    ...

    def get(self, sku) -> model.Product:
        ...

    def get_by_batchref(self, batchref) -> model.Product:
        product = self._get_by_batchref(batchref)
        if product:
            self.seen.add(product)
        return product

    @abc.abstractmethod
    def _add(self, product: model.Product):
        raise NotImplementedError

    @abc.abstractmethod
    def _get(self, sku) -> model.Product:
        raise NotImplementedError

    @abc.abstractmethod
    def _get_by_batchref(self, batchref) -> model.Product:
        raise NotImplementedError

    ...

class SqlAlchemyRepository(AbstractRepository):
    ...

    def _get(self, sku):
        return self.session.query(model.Product).filter_by(sku=sku).first()

    def _get_by_batchref(self, batchref):
        return self.session.query(model.Product).join(model.Batch).filter(
            orm.batches.c.reference == batchref,
        ).first()
And on our FakeRepository too:
Updating the fake repo too (tests/unit/test_handlers.py)
class FakeRepository(repository.AbstractRepository):
    ...

    def _get(self, sku):
        return next((p for p in self._products if p.sku == sku), None)

    def _get_by_batchref(self, batchref):
        return next((
            p for p in self._products for b in p.batches
            if b.reference == batchref
        ), None)
We’re adding a query to our repository to make this use case easier to implement. So long as our query is returning a single aggregate, we’re not bending any rules. If you find yourself writing complex queries on your repositories, you might want to think about a different design. In particular, a method like get_most_popular_products, or find_products_by_order_id would definitely trigger our spidey sense. Chapter 11 and the Epilogue have some tips on managing complex queries.
We add the new method to the model, which does the quantity change and deallocation(s) inline, and publishes a new event. We also modify the existing allocate function to publish an event.
Our model evolves to capture the new requirement (src/allocation/domain/model.py)
class Product:
    ...

    def change_batch_quantity(self, ref: str, qty: int):
        batch = next(b for b in self.batches if b.reference == ref)
        batch._purchased_quantity = qty
        while batch.available_quantity < 0:
            line = batch.deallocate_one()
            self.events.append(
                events.AllocationRequired(line.orderid, line.sku, line.qty)
            )
...

class Batch:
    ...

    def deallocate_one(self) -> OrderLine:
        return self._allocations.pop()
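As a sanity check on the deallocation loop, here’s a stripped-down, runnable sketch. Batch and Product here are toy versions we’ve invented for illustration (for instance, _allocations is a plain list of quantities so the pop order is predictable, and events are recorded as tuples); the real model is richer.

```python
class Batch:
    def __init__(self, reference, purchased_quantity):
        self.reference = reference
        self._purchased_quantity = purchased_quantity
        self._allocations = []  # allocated order-line quantities, newest last

    @property
    def available_quantity(self):
        return self._purchased_quantity - sum(self._allocations)

    def deallocate_one(self):
        return self._allocations.pop()


class Product:
    def __init__(self, batches):
        self.batches = batches
        self.events = []

    def change_batch_quantity(self, ref, qty):
        batch = next(b for b in self.batches if b.reference == ref)
        batch._purchased_quantity = qty
        # Keep deallocating until the batch is no longer over-allocated,
        # recording an event for each line that needs reallocating.
        while batch.available_quantity < 0:
            line_qty = batch.deallocate_one()
            self.events.append(("AllocationRequired", line_qty))


batch = Batch("batch1", purchased_quantity=50)
batch._allocations.extend([20, 20])  # two order lines of 20 allocated
product = Product([batch])

product.change_batch_quantity("batch1", 25)  # drop below what's allocated

assert batch.available_quantity == 5  # 25 - 20: one line survived
assert product.events == [("AllocationRequired", 20)]  # one line to reallocate
```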
We wire up our new handler:
The messagebus grows (src/allocation/service_layer/messagebus.py)
HANDLERS = {
    events.BatchCreated: [handlers.add_batch],
    events.BatchQuantityChanged: [handlers.change_batch_quantity],
    events.AllocationRequired: [handlers.allocate],
    events.OutOfStock: [handlers.send_out_of_stock_notification],
}  # type: Dict[Type[events.Event], List[Callable]]
And our new requirement is fully implemented.
Events are simple dataclasses that define the data structures for inputs, and internal messages within our system. This is quite powerful from a DDD standpoint, since events often translate really well into business language (cf event storming).
Handlers are the way we react to events. They can call down to our model, or they can call out to external services. We can define multiple handlers for a single event if we want to. Handlers can also raise other events. This allows us to be very granular about what a handler does, and really stick to the SRP.
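For example, fanning a single event out to two handlers takes nothing more than a second entry in the list. A toy sketch with invented names (send_email and post_to_slack just record what they would do):

```python
class OutOfStock:
    def __init__(self, sku):
        self.sku = sku


notifications = []


def send_email(event):
    notifications.append(f"email: out of stock for {event.sku}")


def post_to_slack(event):
    notifications.append(f"slack: out of stock for {event.sku}")


HANDLERS = {
    OutOfStock: [send_email, post_to_slack],  # two handlers, one event type
}


def handle(event):
    # Each registered handler runs, in list order
    for handler in HANDLERS[type(event)]:
        handler(event)


handle(OutOfStock("SPARKLY-BOOKCASE"))
assert notifications == [
    "email: out of stock for SPARKLY-BOOKCASE",
    "slack: out of stock for SPARKLY-BOOKCASE",
]
```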
Our ongoing objective with these architectural patterns is to try and have the complexity of our application grow more slowly than its size. Here we’ve added quite a complicated use case (change quantity, deallocate, start new transaction, reallocate, publish external notification), but architecturally, there’s been no cost in terms of complexity. We’ve added new events, new handlers, and a new external adapter (for email), all of which are existing categories of things in our architecture that we understand and know how to reason about, and that are easy to explain to newcomers. Our moving parts each have one job, they’re connected to each other in well-defined ways, and there are no unexpected side-effects.
Now, you may be wondering: where are those BatchQuantityChanged events actually going to come from? The answer is coming up in a couple of chapters’ time. But first, let’s talk about events versus commands.
1 Event-based modelling is so popular that a practice called event storming has been developed for facilitating event-based requirements gathering and domain model elaboration.
2 If you’ve done a bit of reading around event-driven architectures, you may be thinking, “Some of these events sound more like commands!” Bear with us! We’re trying to introduce one concept at a time. In the next chapter, we’ll introduce the distinction between commands and events.