Chapter 4: Event Sourcing in a Pinch

by Christopher Pitt

Common Language

A Safety Net

Storing State vs. Storing Behavior

Storing State

// State-based approach: describe the *current* state through ORM-style
// objects and save it. Product and Outlet are not defined in this excerpt.
$product = new Product();
$product->title = "Chocolate";
$product->cents_per_serving = 499;
$product->save();

$outlet = new Outlet();
$outlet->location = "Pismo Beach";
$outlet->save();

// Attach the product to the outlet together with its stock level.
// NOTE(review): sync() is presumably an ORM pivot helper keyed by product id — confirm.
$outlet->products()->sync([
    $product->id => [
        "servings_in_stock" => 24,
    ],
]);
  • When did we start selling "Chocolate"? Many Object-Relational Mapping (ORM) libraries add fields like created_at and updated_at, but those only go so far in telling us what we want to know.
  • How did we get that much stock? Did we get a delivery? Did we give some away?
  • What happens to our analytics when we no longer want to sell "Chocolate", or when we want to move all stock to another outlet? Do we add a boolean field (to the products table) to indicate that the product is no longer sold but should remain in the analytics? Or perhaps we should add a timestamp, so we know when that all happened...

Storing Behavior

// First batch of behavior: the product and outlet come into existence
// and the outlet receives its initial stock.
$events = [
    new ProductInvented("Chocolate"),
    new ProductPriced("Chocolate", 499),
    new OutletOpened("Pismo Beach"),
    new OutletStocked("Pismo Beach", 24, "Chocolate"),
];

store($events);

// Second batch, collected in a fresh list: stock is given away and the
// product is discontinued at the outlet.
$events = [
    new OutletStockGivenAway("Pismo Beach", 2, "Chocolate"),
    new OutletDiscontinuedProduct("Pismo Beach", "Chocolate"),
];

store($events);
// date() takes a *format* string, so date("-1 WEEK") does not produce a
// point in time. Resolve the relative expression with strtotime() first and
// format it the same way Event timestamps are formatted ("Y-m-d H:i:s").
// NOTE(review): Product::at() is not defined in this excerpt; it is assumed
// to accept a timestamp string in that format — confirm.
$lastWeek = Product::at(
    "Chocolate",
    date("Y-m-d H:i:s", strtotime("-1 week"))
);
$yesterday = Product::at(
    "Chocolate",
    date("Y-m-d H:i:s", strtotime("-1 day"))
);

printf(
    "Chocolate increased, from %s to %s, in one week",
    $lastWeek->cents_per_serving,
    $yesterday->cents_per_serving
);

So Which Is It?

Making Events

Avoiding Jargon

/**
 * Base class shared by all events.
 *
 * Every event remembers the moment it was constructed and must be able to
 * describe itself as an associative array via payload().
 */
abstract class Event
{
    /**
     * Creation time as a string formatted "Y-m-d H:i:s" (the value
     * produced by date() in the constructor, not a DateTimeImmutable).
     *
     * @var string
     */
    private $date;

    /**
     * Stamps the event with the current date and time.
     *
     * Protected so that only concrete event classes can trigger it
     * (they call parent::__construct() from their own constructors).
     */
    protected function __construct()
    {
        $this->date = date("Y-m-d H:i:s");
    }

    /**
     * @return string the stored timestamp, formatted "Y-m-d H:i:s"
     */
    public function date(): string
    {
        return $this->date;
    }

    /**
     * @return array associative array describing the event's data
     */
    abstract public function payload(): array;
}
/**
 * Event stating that a product with a given name was invented.
 */
final class ProductInvented extends Event
{
    /**
     * Name of the invented product.
     *
     * @var string
     */
    private $name;

    public function __construct(string $name)
    {
        $this->name = $name;

        // The parent constructor records the creation timestamp.
        parent::__construct();
    }

    /**
     * @return array keys: "name", "date"
     */
    public function payload(): array
    {
        $payload = [];
        $payload["name"] = $this->name;
        $payload["date"] = $this->date();

        return $payload;
    }
}
/**
 * Event stating that a product was given a price, expressed in cents.
 */
final class ProductPriced extends Event
{
    /**
     * Name of the product being priced.
     *
     * @var string
     */
    private $product;

    /**
     * Price in cents.
     *
     * @var int
     */
    private $cents;

    public function __construct(string $product, int $cents)
    {
        $this->product = $product;
        $this->cents = $cents;

        // The parent constructor records the creation timestamp.
        parent::__construct();
    }

    /**
     * @return array keys: "product", "cents", "date"
     */
    public function payload(): array
    {
        $payload = [];
        $payload["product"] = $this->product;
        $payload["cents"] = $this->cents;
        $payload["date"] = $this->date();

        return $payload;
    }
}
/**
 * Event stating that an outlet with a given name was opened.
 */
final class OutletOpened extends Event
{
    /**
     * Name of the opened outlet.
     *
     * @var string
     */
    private $name;

    public function __construct(string $name)
    {
        $this->name = $name;

        // The parent constructor records the creation timestamp.
        parent::__construct();
    }

    /**
     * @return array keys: "name", "date"
     */
    public function payload(): array
    {
        $payload = [];
        $payload["name"] = $this->name;
        $payload["date"] = $this->date();

        return $payload;
    }
}
/**
 * Event stating that an outlet received a number of servings of a product.
 */
final class OutletStocked extends Event
{
    /**
     * Name of the outlet that was stocked.
     *
     * @var string
     */
    private $outlet;

    /**
     * Number of servings added.
     *
     * @var int
     */
    private $servings;

    /**
     * Name of the product that was stocked.
     *
     * @var string
     */
    private $product;

    // The typesetting line-wrap marker that used to sit inside this
    // signature is not PHP and has been removed.
    public function __construct(
        string $outlet,
        int $servings,
        string $product
    ) {
        parent::__construct();

        $this->outlet = $outlet;
        $this->servings = $servings;
        $this->product = $product;
    }

    /**
     * @return array keys: "outlet", "servings", "product", "date"
     */
    public function payload(): array
    {
        return [
            "outlet" => $this->outlet,
            "servings" => $this->servings,
            "product" => $this->product,
            "date" => $this->date(),
        ];
    }
}

Omitting These Classes

Storing Events

Using PDO

// Open an in-memory SQLite database and ask PDO to report errors by
// throwing exceptions; the error mode is passed as a constructor option.
$connection = new PDO(
    "sqlite::memory:",
    null,
    null,
    [PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION]
);

SQL Ahead

// Create the table if it is missing.
$statement = $connection->prepare("
    CREATE TABLE IF NOT EXISTS product (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT
    )
");

$statement->execute();

// Insert one product, binding the name instead of interpolating it.
$statement = $connection->prepare(
    "INSERT INTO product (name) VALUES (:name)"
);

$statement->bindValue("name", "Chocolate");
$statement->execute();

// PDOStatement::execute() returns a bool, not the statement, so the
// fetch call cannot be chained onto it: keep the statement in a variable,
// execute it, then fetch from it.
$statement = $connection->prepare("SELECT * FROM product");
$statement->execute();
$row = $statement->fetch(PDO::FETCH_ASSOC);

$statement = $connection->prepare("SELECT * FROM product");
$statement->execute();
$rows = $statement->fetchAll(PDO::FETCH_ASSOC);

Adding Helper Functions

/**
 * Opens a PDO connection for the given DSN with the error mode set to
 * PDO::ERRMODE_EXCEPTION.
 *
 * @param string $dsn data source name, e.g. "sqlite::memory:"
 *
 * @return PDO
 */
function connect(string $dsn): PDO
{
    return new PDO(
        $dsn,
        null,
        null,
        [PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION]
    );
}

/**
 * Prepares $query, binds every entry of $bindings and executes it.
 *
 * The typesetting line-wrap marker that used to sit inside the signature
 * is not PHP and has been removed.
 *
 * @param PDO    $connection
 * @param string $query    SQL with optional named placeholders
 * @param array  $bindings placeholder name => value
 *
 * @return array two elements: [0] the prepared statement,
 *               [1] the value returned by PDOStatement::execute()
 */
function execute(
    PDO $connection,
    string $query,
    array $bindings = []
): array {
    $statement = $connection->prepare($query);

    foreach ($bindings as $key => $value) {
        $statement->bindValue($key, $value);
    }

    $result = $statement->execute();

    return [$statement, $result];
}

/**
 * Runs $query and returns every resulting row as an associative array.
 *
 * The typesetting line-wrap marker that used to sit inside the signature
 * is not PHP and has been removed.
 *
 * @param PDO    $connection
 * @param string $query    SQL with optional named placeholders
 * @param array  $bindings placeholder name => value
 *
 * @return array list of associative rows
 */
function rows(
    PDO $connection,
    string $query,
    array $bindings = []
): array {
    $executed = execute($connection, $query, $bindings);

    /** @var PDOStatement $statement */
    $statement = $executed[0];

    return $statement->fetchAll(PDO::FETCH_ASSOC);
}

/**
 * Runs $query and returns the first resulting row as an associative array.
 *
 * PDOStatement::fetch() returns false when there is no row, which would
 * violate the declared array return type and raise a TypeError before a
 * caller could test the result. An empty array is returned instead, so
 * callers can keep using a plain falsiness check such as `if (!$row)`.
 *
 * The typesetting line-wrap marker that used to sit inside the signature
 * is not PHP and has been removed.
 *
 * @param PDO    $connection
 * @param string $query    SQL with optional named placeholders
 * @param array  $bindings placeholder name => value
 *
 * @return array the first row, or [] when the query matched nothing
 */
function row(
    PDO $connection,
    string $query,
    array $bindings = []
): array {
    $executed = execute($connection, $query, $bindings);

    /** @var PDOStatement $statement */
    $statement = $executed[0];

    $found = $statement->fetch(PDO::FETCH_ASSOC);

    return $found === false ? [] : $found;
}
// The same create / insert / select sequence, written with the helpers.
$connection = connect("sqlite::memory:");

// The SQL literal used to contain a typesetting line-wrap marker, which
// is not valid SQL; it is now assembled from clean pieces.
execute(
    $connection,
    "CREATE TABLE IF NOT EXISTS product "
        . "(id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT)"
);

execute(
    $connection,
    "INSERT INTO product (name) VALUES (:name)",
    ["name" => "Chocolate"]
);

// All rows of the table.
$rows = rows(
    $connection,
    "SELECT * FROM product"
);

// A single row selected by name.
$row = row(
    $connection,
    "SELECT * FROM product WHERE name = :name",
    ["name" => "Chocolate"]
);
// Hand-written stand-in for PDO that also plays the role of the statement:
// prepare() hands back the fake itself, so execute() and fetchAll() are
// called on it. It turns invalid as soon as any query other than
// "SELECT * FROM product" is prepared.
$fake = new class("sqlite::memory:") extends PDO
{
    private $valid = true;

    public function prepare($statement, $options = null)
    {
        // Once invalid, always invalid.
        $this->valid = $this->valid
            && $statement === "SELECT * FROM product";

        return $this;
    }

    public function execute()
    {
        return null;
    }

    public function fetchAll()
    {
        if ($this->valid) {
            return [];
        }

        throw new Exception();
    }
};

assert(connect("sqlite::memory:") instanceof PDO);
assert(is_array(rows($fake, "SELECT * FROM product")));

Storing Events

// The events to be stored, in the order they happened.
$events = [
    new ProductInvented("Chocolate"),
    new ProductPriced("Chocolate", 499),
    new OutletOpened("Pismo Beach"),
    new OutletStocked("Pismo Beach", 24, "Chocolate"),
];
// Schema for the event store. There are two kinds of tables:
//   - "product" and "outlet" contain nothing but an auto-incrementing id;
//   - each "event_*" table holds the data of one event type plus its date.

// Id source for products.
execute($connection, "
    CREATE TABLE IF NOT EXISTS product (
        id INTEGER PRIMARY KEY AUTOINCREMENT
    )
");

// ProductInvented events: product id, product name, event date.
execute($connection, "
    CREATE TABLE IF NOT EXISTS event_product_invented (
        id INT,
        name TEXT,
        date TEXT
    )
");

// ProductPriced events: product id, price in cents, event date.
execute($connection, "
    CREATE TABLE IF NOT EXISTS event_product_priced (
        product INT,
        cents INT,
        date TEXT
    )
");

// Id source for outlets.
execute($connection, "
    CREATE TABLE IF NOT EXISTS outlet (
        id INTEGER PRIMARY KEY AUTOINCREMENT
    )
");

// OutletOpened events: outlet id, outlet name, event date.
execute($connection, "
    CREATE TABLE IF NOT EXISTS event_outlet_opened (
        id INT,
        name TEXT,
        date TEXT
    )
");

// OutletStocked events: outlet id, servings, product id, event date.
execute($connection, "
    CREATE TABLE IF NOT EXISTS event_outlet_stocked (
        outlet INT,
        servings INT,
        product INT,
        date TEXT
    )
");
/**
 * Stores a list of events, one after another, in the given order.
 *
 * @param PDO   $connection
 * @param array $events list of Event instances
 */
function store(PDO $connection, array $events)
{
    foreach ($events as $current) {
        storeOne($connection, $current);
    }
}

/**
 * Stores a single event by dispatching on its concrete class.
 *
 * The payload keys read here are the ones each event's payload() returns:
 * ProductPriced exposes "product" (not "name"), and OutletStocked exposes
 * "outlet" and "product" (not "outlet_id" / "product_id"). Reading the
 * wrong keys would pass null to the typed lookup helpers.
 *
 * @param PDO   $connection
 * @param Event $event
 *
 * @throws InvalidArgumentException when a referenced product or outlet
 *                                  name cannot be resolved to an id
 */
function storeOne(PDO $connection, Event $event)
{
    $payload = $event->payload();

    if ($event instanceof ProductInvented) {
        inventProduct(
            $connection,
            newProductId($connection),
            $payload["name"],
            $payload["date"]
        );
    }

    if ($event instanceof ProductPriced) {
        priceProduct(
            $connection,
            productIdFromName($connection, $payload["product"]),
            $payload["cents"],
            $payload["date"]
        );
    }

    if ($event instanceof OutletOpened) {
        openOutlet(
            $connection,
            newOutletId($connection),
            $payload["name"],
            $payload["date"]
        );
    }

    if ($event instanceof OutletStocked) {
        stockOutlet(
            $connection,
            outletIdFromName($connection, $payload["outlet"]),
            $payload["servings"],
            productIdFromName($connection, $payload["product"]),
            $payload["date"]
        );
    }
}
/**
 * Reserves a fresh product id by inserting a row into the product table.
 *
 * DEFAULT VALUES is used so the insert keeps working when the table has
 * columns besides the id (an earlier example creates product with a name
 * column). PDO::lastInsertId() returns a string, so it is cast to satisfy
 * the int return type even under strict typing.
 *
 * @param PDO $connection
 *
 * @return int id of the inserted row
 */
function newProductId(PDO $connection): int
{
    execute(
        $connection,
        "INSERT INTO product DEFAULT VALUES"
    );

    return (int) $connection->lastInsertId();
}

/**
 * Writes one row into event_product_invented.
 *
 * Typesetting line-wrap markers that used to sit inside the signature and
 * the SQL literal have been removed; they are neither PHP nor SQL.
 *
 * @param PDO    $connection
 * @param int    $id   product id
 * @param string $name product name
 * @param string $date event date
 */
function inventProduct(
    PDO $connection,
    int $id,
    string $name,
    string $date
) {
    execute(
        $connection,
        "INSERT INTO event_product_invented (id, name, date) "
            . "VALUES (:id, :name, :date)",
        ["id" => $id, "name" => $name, "date" => $date]
    );
}

/**
 * Looks up a product id by name in event_product_invented.
 *
 * The typesetting line-wrap marker that used to sit inside the SQL
 * literal has been removed. The id is cast explicitly because the driver
 * may return column values as strings.
 *
 * @param PDO    $connection
 * @param string $name product name
 *
 * @return int
 *
 * @throws InvalidArgumentException when no product has that name
 */
function productIdFromName(PDO $connection, string $name): int
{
    $row = row(
        $connection,
        "SELECT * FROM event_product_invented WHERE name = :name",
        ["name" => $name]
    );

    if (!$row) {
        throw new InvalidArgumentException("Product not found");
    }

    return (int) $row["id"];
}

/**
 * Writes one row into event_product_priced.
 *
 * Typesetting line-wrap markers that used to sit inside the signature,
 * the SQL literal and the bindings array have been removed.
 *
 * @param PDO    $connection
 * @param int    $product product id
 * @param int    $cents   price in cents
 * @param string $date    event date
 */
function priceProduct(
    PDO $connection,
    int $product,
    int $cents,
    string $date
) {
    execute(
        $connection,
        "INSERT INTO event_product_priced (product, cents, date) "
            . "VALUES (:product, :cents, :date)",
        ["product" => $product, "cents" => $cents, "date" => $date]
    );
}

/**
 * Reserves a fresh outlet id by inserting a row into the outlet table.
 *
 * DEFAULT VALUES keeps the insert independent of how many columns the
 * table has. PDO::lastInsertId() returns a string, so it is cast to
 * satisfy the int return type even under strict typing.
 *
 * @param PDO $connection
 *
 * @return int id of the inserted row
 */
function newOutletId(PDO $connection): int
{
    execute(
        $connection,
        "INSERT INTO outlet DEFAULT VALUES"
    );

    return (int) $connection->lastInsertId();
}

/**
 * Writes one row into event_outlet_opened.
 *
 * Typesetting line-wrap markers that used to sit inside the signature and
 * the SQL literal have been removed; they are neither PHP nor SQL.
 *
 * @param PDO    $connection
 * @param int    $id   outlet id
 * @param string $name outlet name
 * @param string $date event date
 */
function openOutlet(
    PDO $connection,
    int $id,
    string $name,
    string $date
) {
    execute(
        $connection,
        "INSERT INTO event_outlet_opened (id, name, date) "
            . "VALUES (:id, :name, :date)",
        ["id" => $id, "name" => $name, "date" => $date]
    );
}

/**
 * Looks up an outlet id by name in event_outlet_opened.
 *
 * The typesetting line-wrap marker that used to sit inside the SQL
 * literal has been removed. The id is cast explicitly because the driver
 * may return column values as strings.
 *
 * @param PDO    $connection
 * @param string $name outlet name
 *
 * @return int
 *
 * @throws InvalidArgumentException when no outlet has that name
 */
function outletIdFromName(PDO $connection, string $name): int
{
    $row = row(
        $connection,
        "SELECT * FROM event_outlet_opened WHERE name = :name",
        ["name" => $name]
    );

    if (!$row) {
        throw new InvalidArgumentException("Outlet not found");
    }

    return (int) $row["id"];
}

/**
 * Writes one row into event_outlet_stocked.
 *
 * The column list uses "outlet" and "product", which are the columns the
 * table is created with and the keys OutletStocked::from() reads back;
 * "outlet_id" / "product_id" do not exist in that table. Typesetting
 * line-wrap markers in the signature, SQL and bindings were removed too.
 *
 * @param PDO    $connection
 * @param int    $outlet   outlet id
 * @param int    $servings number of servings stocked
 * @param int    $product  product id
 * @param string $date     event date
 */
function stockOutlet(
    PDO $connection,
    int $outlet,
    int $servings,
    int $product,
    string $date
) {
    execute(
        $connection,
        "INSERT INTO event_outlet_stocked "
            . "(outlet, servings, product, date) "
            . "VALUES (:outlet, :servings, :product, :date)",
        [
            "outlet" => $outlet,
            "servings" => $servings,
            "product" => $product,
            "date" => $date,
        ]
    );
}

Naming Pattern

// Store a single event, then read it back to confirm it was persisted.
store($connection, [
    new ProductInvented("Cheesecake"),
]);

$row = row(
    $connection,
    "SELECT * FROM event_product_invented WHERE name = :name",
    ["name" => "Cheesecake"]
);

// row() is declared to return an array, so a null check can never fail;
// assert on the stored data instead.
assert(isset($row["name"]) && $row["name"] === "Cheesecake");

Projecting Events

// date() takes a *format* string, so the relative expression is resolved
// with strtotime() and then formatted like Event timestamps.
// NOTE(review): Product::at() / Product::latest() are not defined in this
// excerpt; the accepted date format is assumed — confirm.
Product::at("Chocolate", date("Y-m-d H:i:s", strtotime("-1 week")));
// → ["id" => 1, "name" => "Chocolate", ...]
Product::latest();
// → [["id" => 1, "name" => "Chocolate", ...], ...]

Product::latest("Chocolate");
// → ["id" => 1, "name" => "Chocolate", ...]
/**
 * Loads every stored event and returns them ordered by date.
 *
 * Each event table is read in full, each row is turned into an event via
 * the static from() factory of the matching class, and the combined list
 * is sorted on the timestamps reported by date().
 *
 * @param PDO $connection
 *
 * @return array list of Event instances
 */
function fetch(PDO $connection): array {
    // Event class => table its rows are stored in.
    $sources = [
        ProductInvented::class => "event_product_invented",
        ProductPriced::class => "event_product_priced",
        OutletOpened::class => "event_outlet_opened",
        OutletStocked::class => "event_outlet_stocked",
    ];

    $events = [];

    foreach ($sources as $class => $table) {
        $records = rows($connection, "SELECT * FROM {$table}");

        foreach ($records as $record) {
            $events[] = $class::from($connection, $record);
        }
    }

    usort($events, function(Event $left, Event $right) {
        $leftTime = strtotime($left->date());
        $rightTime = strtotime($right->date());

        return $leftTime - $rightTime;
    });

    return $events;
}
  1. We define a list of event tables to get rows from.
  2. We fetch the rows for each type/table, and convert the resulting associative arrays to instances of the events.
  3. We use the date of each event to sort the events into chronological order.
abstract class Event
{
    // ...snip

    /**
     * Returns a clone of this event carrying the given date; the event
     * the method is called on is not modified.
     *
     * @param string $date
     *
     * @return self
     */
    public function withDate(string $date): self
    {
        $copy = clone $this;
        $copy->date = $date;

        return $copy;
    }

    /**
     * Builds an event from one row of stored data.
     *
     * @param PDO   $connection
     * @param array $data associative row
     */
    abstract public static function from(PDO $connection, array $data);
}
final class ProductInvented extends Event
{
    // ...snip

    /**
     * Rebuilds the event from a stored row with "name" and "date" keys.
     * The connection is not needed here.
     */
    public static function from(PDO $connection, array $data)
    {
        $name = $data["name"];
        $date = $data["date"];

        return (new static($name))->withDate($date);
    }
}
final class ProductPriced extends Event
{
    // ...snip

    /**
     * Rebuilds the event from a stored row with "product", "cents" and
     * "date" keys. The row stores the product id, so it is translated
     * back into the product name first.
     */
    public static function from(PDO $connection, array $data)
    {
        $productName = productNameFromId($connection, $data["product"]);
        $event = new static($productName, $data["cents"]);

        return $event->withDate($data["date"]);
    }
}
final class OutletOpened extends Event
{
    // ...snip

    /**
     * Rebuilds the event from a stored row with "name" and "date" keys.
     * The connection is not needed here.
     */
    public static function from(PDO $connection, array $data)
    {
        $name = $data["name"];
        $date = $data["date"];

        return (new static($name))->withDate($date);
    }
}
final class OutletStocked extends Event
{
    // ...snip

    /**
     * Rebuilds the event from a stored row with "outlet", "servings",
     * "product" and "date" keys. The row stores ids, so the outlet and
     * product are translated back into their names first.
     */
    public static function from(PDO $connection, array $data)
    {
        $outletName = outletNameFromId($connection, $data["outlet"]);
        $servings = $data["servings"];
        $productName = productNameFromId($connection, $data["product"]);

        $event = new static($outletName, $servings, $productName);

        return $event->withDate($data["date"]);
    }
}
/**
 * Looks up a product name by id in event_product_invented.
 *
 * @param PDO $connection
 * @param int $id product id
 *
 * @return string
 *
 * @throws InvalidArgumentException when no product has that id
 */
function productNameFromId(PDO $connection, int $id): string {
    $match = row(
        $connection,
        "SELECT * FROM event_product_invented WHERE id = :id",
        ["id" => $id]
    );

    if ($match) {
        return $match["name"];
    }

    throw new InvalidArgumentException("Product not found");
}

/**
 * Looks up an outlet name by id in event_outlet_opened.
 *
 * @param PDO $connection
 * @param int $id outlet id
 *
 * @return string
 *
 * @throws InvalidArgumentException when no outlet has that id
 */
function outletNameFromId(PDO $connection, int $id): string {
    $match = row(
        $connection,
        "SELECT * FROM event_outlet_opened WHERE id = :id",
        ["id" => $id]
    );

    if ($match) {
        return $match["name"];
    }

    throw new InvalidArgumentException("Outlet not found");
}
$events = [];

$events[] = new ProductInvented("Chocolate");
$events[] = new ProductPriced("Chocolate", 499);
$events[] = new OutletOpened("Pismo Beach");
$events[] = new OutletStocked("Pismo Beach", 24, "Chocolate");

store($connection, $events); // ← events stored in database
$stored = fetch($connection); // ← events loaded from database

// The event classes shown keep their data in private properties, which
// json_encode() does not serialise: every event would encode as {} and
// the check would only compare how many events there are. Comparing the
// payloads checks the actual data.
$payloads = function (array $list): array {
    return array_map(function (Event $event) {
        return $event->payload();
    }, $list);
};

assert(json_encode($payloads($events)) === json_encode($payloads($stored)));
/**
 * Folds a list of events into the projected entities.
 *
 * Starts from empty "products" and "outlets" lists and applies the
 * events in the given order through projectOne().
 *
 * @param PDO   $connection
 * @param array $events list of Event instances
 *
 * @return array keys: "products", "outlets"
 */
function project(PDO $connection, array $events): array {
    $initial = [
        "products" => [],
        "outlets" => [],
    ];

    return array_reduce(
        $events,
        function (array $carry, $event) use ($connection) {
            return projectOne($connection, $carry, $event);
        },
        $initial
    );
}

/**
 * Applies a single event to the projected entities by dispatching on the
 * event's concrete class. Events of any other class leave the entities
 * unchanged.
 *
 * The typesetting line-wrap marker that used to sit inside the signature
 * is not PHP and has been removed.
 *
 * @param PDO   $connection
 * @param array $entities keys: "products", "outlets"
 * @param Event $event
 *
 * @return array the updated entities
 */
function projectOne(
    PDO $connection,
    array $entities,
    Event $event
): array {
    if ($event instanceof ProductInvented) {
        $entities = projectProductInvented(
            $connection, $entities, $event
        );
    }

    if ($event instanceof ProductPriced) {
        $entities = projectProductPriced(
            $connection, $entities, $event
        );
    }

    if ($event instanceof OutletOpened) {
        $entities = projectOutletOpened(
            $connection, $entities, $event
        );
    }

    if ($event instanceof OutletStocked) {
        $entities = projectOutletStocked(
            $connection, $entities, $event
        );
    }

    return $entities;
}
/**
 * Appends a product (id and name) to the projected products.
 *
 * The id is looked up by name through productIdFromName(). The
 * typesetting line-wrap marker that used to sit inside the signature is
 * not PHP and has been removed.
 *
 * @param PDO             $connection
 * @param array           $entities keys: "products", "outlets"
 * @param ProductInvented $event
 *
 * @return array the updated entities
 */
function projectProductInvented(
    PDO $connection,
    array $entities,
    ProductInvented $event
): array {
    $payload = $event->payload();

    $entities["products"][] = [
        "id" => productIdFromName($connection, $payload["name"]),
        "name" => $payload["name"],
    ];

    return $entities;
}

/**
 * Sets the "price" of every projected product whose name matches the
 * event's product. Products with other names are left untouched.
 *
 * Typesetting line-wrap markers that used to sit inside the signature
 * and the assignment are not PHP and have been removed.
 *
 * @param PDO           $connection not used by this projection
 * @param array         $entities   keys: "products", "outlets"
 * @param ProductPriced $event
 *
 * @return array the updated entities
 */
function projectProductPriced(
    PDO $connection,
    array $entities,
    ProductPriced $event
): array {
    $payload = $event->payload();

    foreach ($entities["products"] as $i => $product) {
        if ($product["name"] === $payload["product"]) {
            $entities["products"][$i]["price"] = $payload["cents"];
        }
    }

    return $entities;
}

/**
 * Appends an outlet (id, name and an empty stock list) to the projected
 * outlets.
 *
 * The id is looked up by name through outletIdFromName(). The
 * typesetting line-wrap marker that used to sit inside the signature is
 * not PHP and has been removed.
 *
 * @param PDO          $connection
 * @param array        $entities keys: "products", "outlets"
 * @param OutletOpened $event
 *
 * @return array the updated entities
 */
function projectOutletOpened(
    PDO $connection,
    array $entities,
    OutletOpened $event
): array {
    $payload = $event->payload();

    $entities["outlets"][] = [
        "id" => outletIdFromName($connection, $payload["name"]),
        "name" => $payload["name"],
        "stock" => [],
    ];

    return $entities;
}

/**
 * Adds a stock entry (product and servings) to every projected outlet
 * whose name matches the event's outlet, for every projected product
 * whose name matches the event's product.
 *
 * The product is stored by value. Binding the entry to the loop variable
 * by reference would let each later loop iteration overwrite it, so the
 * entry would end up holding the last product iterated rather than the
 * matching one. As a consequence the entry is a snapshot: changes applied
 * to the product afterwards are not reflected in it.
 *
 * The typesetting line-wrap marker that used to sit inside the signature
 * is not PHP and has been removed.
 *
 * @param PDO           $connection not used by this projection
 * @param array         $entities   keys: "products", "outlets"
 * @param OutletStocked $event
 *
 * @return array the updated entities
 */
function projectOutletStocked(
    PDO $connection,
    array $entities,
    OutletStocked $event
): array {
    $payload = $event->payload();

    foreach ($entities["outlets"] as $i => $outlet) {
        if ($outlet["name"] !== $payload["outlet"]) {
            continue;
        }

        foreach ($entities["products"] as $product) {
            if ($product["name"] !== $payload["product"]) {
                continue;
            }

            $entities["outlets"][$i]["stock"][] = [
                "product" => $product,
                "servings" => $payload["servings"],
            ];
        }
    }

    return $entities;
}

Projecting Specifically

Summary

..................Content has been hidden....................

You can't read the whole page of this ebook. Please log in to view the full page.
Reset
3.141.28.116