Every seasoned developer has some first-hand experience of how difficult it can be to deal with problems such as deadlocks and race conditions. These are the hard problems that can arise in concurrent programs that involve shared mutable state (“shared,” that is, between processes that execute concurrently).
This is why, throughout this book, you’ve seen many examples of how to solve problems without resorting to shared mutable state. Indeed, my recommendation is to avoid shared mutable state whenever possible, and FP provides an excellent paradigm for doing so.
In this chapter, you’ll see why it’s not always possible to avoid shared mutable state, and what strategies there are to synchronize access to shared mutable state. We’ll then concentrate on one of these strategies: agent-based concurrency, a style of concurrent programming that relies on message-passing between agents that “own” some state that they access in a single-threaded way. Programming with agents is popular with F# programmers, but you’ll see that it’s perfectly doable in C#.
It’s generally possible to avoid shared mutable state when designing parallel algorithms. For instance, if you have a computationally intensive problem that you’d like to parallelize, you can usually break the data set or the tasks down in such a way that several threads compute an intermediate result independently. Hence, these threads can do their work without the need to share any state. Another thread may compute the final result by combining all the intermediate results.
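To make this concrete, here is a minimal C# sketch of that pattern. It assumes the computation is a simple sum over an array: each task sums its own slice and returns a partial result, and a final step combines them. `ParallelSumAsync` and the chunking scheme are illustrative choices, not a prescribed API.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper: each task sums its own slice of the (read-only) input and
// returns a partial result, so no state is shared or mutated across threads.
async Task<long> ParallelSumAsync(int[] data, int chunks)
{
   int chunkSize = (data.Length + chunks - 1) / chunks;   // ceiling division

   Task<long>[] partials = Enumerable.Range(0, chunks)
      .Select(c => Task.Run(() =>
         data.Skip(c * chunkSize).Take(chunkSize).Sum(x => (long)x)))
      .ToArray();

   // A single step combines the intermediate results once they're all available.
   long[] results = await Task.WhenAll(partials);
   return results.Sum();
}

int[] numbers = Enumerable.Range(1, 1_000_000).ToArray();
long total = await ParallelSumAsync(numbers, chunks: 4);
Console.WriteLine(total); // 500000500000
```

Each task only reads the input array, which is never modified, and returns its own value. As a result, no synchronization is needed beyond awaiting the tasks.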
The problem, however, is that avoiding shared mutable state isn’t always possible. Although it can generally be achieved in the case of parallel computations, it’s much more difficult if the source of concurrency is multithreading. For example, imagine a multithreaded application, such as a server handling requests on multiple threads, that needs to do the following:
Keep an application-wide counter so that unique, sequential account numbers can be generated.
Represent real-world entities like items for sale, trades, contracts, and so on, ensuring that you don’t sell the same (unique, real-world) item twice if two concurrent requests to buy it are received.
In such scenarios, it’s essentially a requirement to share mutable state between the many threads that the server application uses to more efficiently handle requests. To prevent concurrent access from leading to data inconsistencies, you need to ensure the state can’t be accessed (or, at least, updated) concurrently by different threads. That is, you need to synchronize access to shared mutable state.
In mainstream programming, this synchronization is usually achieved using locks. Locks define critical sections of the code that can only be entered by one thread at a time. When one thread enters a critical section, it blocks other threads from entering it. Functional programmers tend to avoid using locks and resort, instead, to alternative techniques:
Compare-and-swap (CAS)—CAS allows you to atomically read and update a single value, which can be done in C# using the Interlocked.CompareExchange methods.
Software transactional memory (STM)—STM allows you to update mutable state within transactions, which, much like database transactions, guarantee that all the updates in a transaction take effect atomically and in isolation from other transactions running concurrently.
Message-passing concurrency—The idea of this approach is that you set up lightweight processes that have exclusive ownership of some mutable state. Communication between processes is via message-passing, and processes handle messages sequentially, hence preventing concurrent access to their state. There are two main embodiments of this approach: the actor model, which is designed for distributed systems, and agents, which run within a single process.
CAS only allows you to deal with a single value, so it provides an effective solution for a very limited number of scenarios.
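To show what CAS looks like in practice, the following sketch uses Interlocked.CompareExchange in a retry loop to apply an arbitrary function to a single `int` atomically. It assumes the account-number counter scenario from earlier, and `AtomicUpdate` is a hypothetical helper name. Note that it protects only that one value, which is exactly the limitation just described.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: atomically replaces the value at `location` with f(current),
// retrying whenever another thread changed it between our read and our write.
int AtomicUpdate(ref int location, Func<int, int> f)
{
   while (true)
   {
      int current = Volatile.Read(ref location);
      int updated = f(current);
      // Writes `updated` only if `location` still holds `current`; returns the value
      // that was actually there, so equality means our write succeeded.
      if (Interlocked.CompareExchange(ref location, updated, current) == current)
         return updated;
   }
}

// Issue 1,000 sequential "account numbers" from many threads concurrently.
int lastAccountNumber = 0;
var issued = new ConcurrentBag<int>();
Parallel.For(0, 1000, _ => issued.Add(AtomicUpdate(ref lastAccountNumber, n => n + 1)));

Console.WriteLine(issued.Distinct().Count()); // 1000
```

For a plain increment, Interlocked.Increment does the same job in a single call. The loop is only needed when the update is an arbitrary function of the current value.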
STM is an important paradigm for in-process concurrency, and it’s particularly popular among Clojure and Haskell developers because these languages come with a compelling and battle-tested implementation of STM. If you want to explore this paradigm in C#, language-ext contains an implementation of Atom and Ref, the primitives allowing you to atomically update data shared between threads.
In the rest of this chapter, I’ll concentrate on message-passing concurrency, especially agent-based concurrency. Later, you’ll see in more detail how agents and actors differ. Let’s begin by looking at message-passing concurrency as a programming model.
You can think of an agent (or actor; the fundamental idea is the same) as a process that has exclusive ownership of some mutable state. Communication between actors is via message passing so that the state can never be accessed from outside of the actor. Furthermore, incoming messages are processed sequentially so that concurrent state updates can never take place.
Figure 19.1 illustrates an agent: a process that runs in a loop. It has an inbox in which messages are queued, and it has some state. When a message is dequeued and processed, the agent typically does some of the following: it performs side effects, sends messages to other agents, creates other agents, and computes its new state.
The new state will be used as the current state at the next iteration, when the following message is processed.
Let’s begin with an idealized, almost pseudocode implementation of an agent as I just described. Look at the code in the following listing in detail and see how each part corresponds to what’s depicted in figure 19.1.
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public sealed class Agent<State, Msg>
{
   BlockingCollection<Msg> inbox                          // ❶
      = new BlockingCollection<Msg>(new ConcurrentQueue<Msg>());

   public void Tell(Msg message)                          // ❷
      => inbox.Add(message);

   public Agent                                           // ❸
   (
      State initialState,
      Func<State, Msg, State> process
   )
   {
      void Loop(State state)
      {
         Msg message = inbox.Take();                      // ❹
         State newState = process(state, message);        // ❺
         Loop(newState);                                  // ❻
      }

      Task.Run(() => Loop(initialState));                 // ❼
   }
}
❶ Uses a concurrent queue as a message inbox
❷ Telling a message to the agent simply enqueues the message.
❸ Creates an agent by providing an initial state and a processing function
❹ Dequeues a message as soon as it’s available
❺ Processes the message, determining the new state of the agent
❻ Loops with the new state
❼ The agent runs in its own process.
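There are several interesting things to point out here. First, notice that there are only two public members, so only two interactions with an agent are allowed: you can create an agent, providing its initial state and processing function, and you can tell it a message, which simply enqueues the message in its inbox.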
You can define more complex interactions from these primitive operations.
Let’s now look at the processing loop, encoded in the Loop function. This dequeues the first message from the inbox (or waits until a message becomes available) and processes it using the agent’s processing function and its current state. This yields the agent’s new state, which is used in the next execution of the loop.
Notice that the implementation is side-effect free, apart from any side effects that may occur when calling the given processing function. The way changes in state are captured is by always passing the state as an argument to the Loop function (a technique you already saw in chapter 15).
Notice also that this implementation assumes that State is an immutable type; otherwise, it could be shared by the process function and updated arbitrarily outside of the scope of the agent’s processing loop. Given an immutable State, the state only “appears” to be mutable because a new version of the state is used with each invocation of Loop.
Finally, take a moment to look at the signature for the constructor. Does it remind you of anything? Compare it with Enumerable.Aggregate. Can you see that it’s essentially the same? The current state of an agent is the result of reducing all the messages it’s received so far, using the initial state as an accumulator value and the processing function as a reducer. It’s a fold in time over the stream of messages received by the agent.
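The analogy can be checked directly. In the following sketch, the processing function is a hypothetical counter that adds each message to the state. Folding a sequence of messages with Enumerable.Aggregate, with the initial state as the seed, should produce the same value that an agent started with that state and function would hold after processing those messages in that order.

```csharp
using System;
using System.Linq;

// A hypothetical processing function of the kind an agent is started with:
// (current state, message) => new state.
Func<int, int, int> process = (count, increment) => count + increment;

int initialState = 0;
int[] messagesReceived = { 1, 5, 2, 10 };

// Aggregate takes the same two ingredients as the agent's constructor:
// a seed (the initial state) and a reducer (the processing function).
int currentState = messagesReceived.Aggregate(initialState, process);

Console.WriteLine(currentState); // 18
```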
This implementation is elegant, and it would work well in a language with tail-call elimination. C# doesn’t guarantee tail-call elimination, so the recursive calls to Loop can grow the stack with every message processed; we’ll need to make some changes for a stack-safe implementation. Furthermore, we can dispense with many of the low-level details by using existing functionality in .NET. We’ll look at this next.
The F# core library includes an implementation of agents called MailboxProcessor, but it was designed for use from F# and is awkward to use from C#. And, although the preceding implementation is useful for understanding the idea, it’s not optimal. Instead, in the coming examples, I’ll use a more practical implementation of an agent, which is included in LaYumba.Functional and shown in the following listing.
using System;
using System.Threading.Tasks.Dataflow;

public interface Agent<Msg>
{
   void Tell(Msg message);
}

class StatefulAgent<State, Msg> : Agent<Msg>
{
   private State state;
   private readonly ActionBlock<Msg> actionBlock;

   public StatefulAgent
   (
      State initialState,
      Func<State, Msg, State> process
   )
   {
      state = initialState;

      actionBlock = new ActionBlock<Msg>(msg =>
      {
         var newState = process(state, msg);   // ❶
         state = newState;                     // ❷
      });
   }

   public void Tell(Msg message)
      => actionBlock.Post(message);            // ❸
}
❶ Processes the message with the current state
❷ Assigns the result to the stored state
❸ Queueing and processing messages are managed by the ActionBlock.
Here I’ve replaced the recursive call (which could lead to stack overflow) with a single mutable variable, state, that keeps track of the agent’s state and is reassigned as each message is processed. Although this is a side effect, messages are processed sequentially, therefore preventing concurrent writes.
I’ve also dispensed with the details of managing the queue and process by using ActionBlock, one of the building blocks in .NET’s Dataflow library. An ActionBlock contains a buffer (by default, unbounded in size) that acts as the agent’s inbox and that only allows a fixed number of threads to enter the block (by default, a single thread), ensuring messages are processed sequentially.
State should still be an immutable type (otherwise, as previously pointed out, it could be shared by the process function and mutated outside the scope of the ActionBlock). If this is observed, the code is thread-safe.
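The same change (replacing recursion with a reassigned local variable) can also be applied to the idealized listing without Dataflow. The following sketch works under that assumption: it keeps BlockingCollection as the inbox and swaps the recursive Loop for a foreach over GetConsumingEnumerable. StartAgent, and the Stop function that closes the inbox and returns the final state (handy for testing), are hypothetical additions, not part of LaYumba.Functional.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper: starts an agent whose loop reassigns a local variable
// instead of recursing, so the stack doesn't grow with each message processed.
// Returns Tell, plus Stop, which closes the inbox and yields the final state
// once every queued message has been processed.
(Action<Msg> Tell, Func<Task<State>> Stop) StartAgent<State, Msg>(
   State initialState, Func<State, Msg, State> process)
{
   var inbox = new BlockingCollection<Msg>(new ConcurrentQueue<Msg>());

   Task<State> loop = Task.Run(() =>
   {
      State state = initialState;   // only this loop ever reads or writes it
      // Blocks until a message is available; ends after CompleteAdding is
      // called and the queue has drained.
      foreach (Msg message in inbox.GetConsumingEnumerable())
         state = process(state, message);   // one message at a time
      return state;
   });

   return (message => inbox.Add(message),
           () => { inbox.CompleteAdding(); return loop; });
}

// 1,000 messages told concurrently from many threads.
var (tell, stop) = StartAgent(0, (int count, int increment) => count + increment);
Parallel.For(1, 1001, i => tell(i));

int total = await stop();
Console.WriteLine(total); // 500500
```

Tell is called from many threads at once, but the state variable is only ever touched by the single task that runs the loop.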
From the point of view of the client code, nothing has changed: we still only have two public members with the same signatures as before. The reason for the Agent<Msg> interface is twofold:
From the point of view of the client code consuming an agent, you can only tell it messages, so by using the interface, we avoid exposing the type parameter for the state. After all, the type of the state is an implementation detail of the agent.
You can envisage other implementations such as stateless agents or agents that persist their state.
Finally, here are some convenience methods for easily creating agents:
public static class Agent
{
   public static Agent<Msg> Start<State, Msg>
   (
      State initialState,
      Func<State, Msg, State> process
   )
      => new StatefulAgent<State, Msg>(initialState, process);

   public static Agent<Msg> Start<Msg>(Action<Msg> action)
      => new StatelessAgent<Msg>(action);
}
The first overload simply creates an agent with the given arguments. The second takes an action and is used to create a stateless agent: an agent that processes messages sequentially but doesn’t keep any state. (The implementation is trivial, as it just creates an ActionBlock with the given Action.) We can also define agents with an asynchronous processing function/action; I’ve omitted the overloads for brevity, but the full implementation is in the code samples. Next, we’ll get started using agents.
Let’s look at some simple examples of using agents. We’ll build a couple of simple agents that interact as figure 19.2 shows.
We’ll start with a really simple, stateless agent that takes a message of type string and just prints it out. You can follow along in the REPL:
using static System.Console;

Agent<string> logger = Agent.Start((string msg) => WriteLine(msg));

logger.Tell("Agent X");
// prints: Agent X
Next, let’s define the ping and pong agents that interact with the logger and with each other:
Agent<string> ping, pong = null;

ping = Agent.Start((string msg) =>
{
   if (msg == "STOP") return;

   logger.Tell($"Received '{msg}'; Sending 'PING'");
   Task.Delay(500).Wait();
   pong.Tell("PING");
});

pong = Agent.Start(0, (int count, string msg) =>
{
   int newCount = count + 1;
   string nextMsg = (newCount < 5) ? "PONG" : "STOP";

   logger.Tell($"Received '{msg}' #{newCount}; Sending '{nextMsg}'");
   Task.Delay(500).Wait();
   ping.Tell(nextMsg);

   return newCount;
});

ping.Tell("START");
Here, we define two additional agents. ping is stateless; it sends a message to the logger agent and a PING message to the pong agent, unless the message it’s told is STOP, in which case, it does nothing. It’s quite common for an agent to have different behavior depending on the message, that is, to interpret the message as a command.
Now let’s see a stateful agent: pong. The implementation is quite similar to ping. It sends PONG to ping, but it also keeps a counter as state. The counter is incremented with every message, and after five messages, the agent sends a STOP message instead.
The whole ping-pong is set in motion when we send the initial START message to ping on the last line. Running the program causes the following to be printed:
Received 'START'; Sending 'PING'
Received 'PING' #1; Sending 'PONG'
Received 'PONG'; Sending 'PING'
Received 'PING' #2; Sending 'PONG'
Received 'PONG'; Sending 'PING'
Received 'PING' #3; Sending 'PONG'
Received 'PONG'; Sending 'PING'
Received 'PING' #4; Sending 'PONG'
Received 'PONG'; Sending 'PING'
Received 'PING' #5; Sending 'STOP'
Now that you’ve seen some simple agents interact, it’s time to move on to something closer to real-world requirements.
Let’s revisit the scenario of a service that provides exchange rates. The service should retrieve rates from the Rates API and cache them. We saw a simple implementation in section 15.1, but there the interaction was via the command line so that requests necessarily came in one by one.
Let’s change that. Let’s imagine that the service is part of a larger system and that other components may request rates via a message broker, as figure 19.3 illustrates.
Components communicate with each other by sending messages via the message broker. To communicate with the currency lookup service, the following messages are defined:
record FxRateRequest
(
   string CcyPair,     // ❶
   string Sender       // ❷
);

record FxRateResponse
(
   string CcyPair,
   decimal Rate,
   string Recipient    // ❷
);
❶ The currency pair whose rate is being requested
❷ The sender and recipient fields allow the message broker to correctly route messages.
We’ll assume that the message broker is multithreaded, so that our service may receive multiple requests on different threads at exactly the same time.
In this case, sharing state between threads is a requirement: if we had a different cache for every thread, that would be suboptimal. So we need some synchronization to ensure that we don’t perform unnecessary remote lookups and that cache updates don’t cause race conditions.
Next, we’ll see how we can use agents to achieve this. First, we’ll need a bit of setup code, defining the interaction with the message broker. This is shown in listing 19.3. Note that the code isn’t specific to any particular message broker; we just need to be able to subscribe to it to receive requests and to use it to send responses. (The code samples include an implementation of MessageBroker that uses Redis as its underlying transport.)
public static void SetUp(MessageBroker broker)
{
   Agent<FxRateResponse> sendResponse = Agent.Start(
      (FxRateResponse res) => broker.Send(res.Recipient, res));        // ❶

   Agent<FxRateRequest> processRequest
      = StartReqProcessor(sendResponse);                               // ❷

   broker.Subscribe<FxRateRequest>("FxRates", processRequest.Tell);    // ❸
}
❶ An agent that sends the responses
❷ An agent that processes the requests and uses the previously defined agent to send the response
❸ When a request is received, passes it to the processing agent
Starting at the bottom, we subscribe to receive requests broadcast on the “FxRates” channel, providing a callback to handle the request. This callback (which will be called on multiple threads) simply passes the request to the processing agent, defined on the previous line. Hence, although requests are received on multiple threads, they’ll be immediately queued up in the processing agent’s inbox and processed sequentially.
Does this mean that processing is now single-threaded, and we lose any benefit of multithreading? Not necessarily! If the processing agent did all the processing, that would indeed be the case. Instead, let’s take a more granular approach: we can have an agent for each currency pair in charge of fetching and storing the rate for its particular pair. The request-processing agent will just be in charge of managing these per-currency-pair agents and delegating the work to them, as figure 19.4 shows.
Let’s now look at the definitions of the agents. The following listing shows the higher-level agent, which handles incoming requests and starts the lower-level per-currency-pair agents, delegating work to them.
using CcyAgents = System.Collections.Immutable
   .ImmutableDictionary<string, Agent<string>>;

static Agent<FxRateRequest> StartReqProcessor
   (Agent<FxRateResponse> sendResponse)
   => Agent.Start(CcyAgents.Empty
      , (CcyAgents state, FxRateRequest request) =>
      {
         string ccyPair = request.CcyPair;

         Agent<string> agent = state
            .Lookup(ccyPair)
            .GetOrElse(() => StartAgentFor(ccyPair, sendResponse));    // ❶

         agent.Tell(request.Sender);                                   // ❷
         return state.Add(ccyPair, agent);
      });
❶ If required, starts a new agent for the requested currency pair
❷ Passes the request to the agent in charge of the pair
As you can see, the request-processing agent holds a cache not of values but of agents, one for each currency pair. It starts those agents as needed and forwards the requests to them.
The benefit of this solution is that requests for one currency, say GBPUSD, won’t impact requests for another, say EURUSD. On the other hand, if you get several requests for GBPUSD at the same time, only one remote request is made to fetch that rate while other requests are queued.
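The per-pair design can be sketched with plain BCL types. In the following, each currency pair gets its own queue-and-loop agent that caches the rate in a `decimal?`, which plays the role of `Option<decimal>`. `GetRateAsync` is a hypothetical stand-in for RatesApi.GetRateAsync that counts how often it’s called. To keep the sketch short, a ConcurrentDictionary with Lazy is swapped in for the request-processing agent.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

int remoteCalls = 0;

// Hypothetical stand-in for the remote Rates API: counts calls, returns a fixed rate.
async Task<decimal> GetRateAsync(string ccyPair)
{
   Interlocked.Increment(ref remoteCalls);
   await Task.Delay(100);   // simulated network latency
   return 1.25m;
}

// One agent per currency pair: its requests are queued and handled one at a time,
// so only the first triggers a remote lookup; later ones are served from the cache.
Func<Task<decimal>> StartPairAgent(string ccyPair)
{
   var inbox = new BlockingCollection<TaskCompletionSource<decimal>>();
   Task.Run(async () =>
   {
      decimal? cachedRate = null;   // state owned by this agent only
      foreach (var reply in inbox.GetConsumingEnumerable())
      {
         cachedRate ??= await GetRateAsync(ccyPair);
         reply.SetResult(cachedRate.Value);
      }
   });
   return () =>
   {
      var reply = new TaskCompletionSource<decimal>(
         TaskCreationOptions.RunContinuationsAsynchronously);
      inbox.Add(reply);
      return reply.Task;
   };
}

// Starts at most one agent per pair (stand-in for the request-processing agent).
var agents = new ConcurrentDictionary<string, Lazy<Func<Task<decimal>>>>();
Task<decimal> RequestRate(string ccyPair) =>
   agents.GetOrAdd(ccyPair, pair => new Lazy<Func<Task<decimal>>>(() => StartPairAgent(pair)))
      .Value();

decimal[] rates = await Task.WhenAll(
   RequestRate("GBPUSD"), RequestRate("GBPUSD"), RequestRate("EURUSD"));

Console.WriteLine(remoteCalls); // 2
```

Both GBPUSD requests are queued in the same agent, so only the first one calls the remote API. The EURUSD request is handled by a different agent, so it proceeds independently of them.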
Finally, the following listing provides the definition of the agent that manages the rate for a single currency pair.
static Agent<string> StartAgentFor
(
   string ccyPair,
   Agent<FxRateResponse> sendResponse
)
=> Agent.Start<Option<decimal>, string>
(
   initialState: None,
   process: async (optRate, recipient) =>
   {
      decimal rate = await optRate.Map(Async)
         .GetOrElse(() => RatesApi.GetRateAsync(ccyPair));   // ❶

      sendResponse.Tell(new FxRateResponse                   // ❷
      (
         CcyPair: ccyPair,
         Rate: rate,
         Recipient: recipient
      ));

      return Some(rate);                                     // ❸
   }
);
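❶ If necessary, fetches the rate from the remote API
❷ Passes the response to the agent in charge of sending responses
❸ The rate becomes the agent’s new state.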
This agent’s state is the rate for a single pair; it’s wrapped in an Option because, when the agent is first created, it has no rate available yet. Upon receiving a request, the agent decides whether a remote lookup is required (you could easily improve this to fetch the rate if the cached value is expired).
To keep the example simple, I’ve avoided the question of expiry as well as error handling. I’m also assuming that sending a response to the message broker is a fire-and-forget operation with minimal latency, so that it’s OK to have a single agent performing it.
The main point of the example is that using agents with their sequential processing of messages can be quite efficient. It does, however, require a mental shift, both from the functional approach we’ve been pursuing in this book and from the traditional approach of using locks.
Agents and actors are closely related. In both cases, a single thread processes messages sequentially and communicates with other actors/agents by sending them messages. There are also important differences:
Agents run within a single process, whereas actors are designed for distributed systems. In the examples we’ve looked at so far, a reference to an agent refers to a specific instance in the current process. References to actors, on the other hand, are location-transparent; when you have a reference to an actor, that actor may be running in the same process or in another process, possibly on a remote machine. A reference to an agent is a pointer, whereas a reference to an actor is an ID that the actor model implementation uses to route messages across processes as appropriate.
Actor model implementations are designed to be fault-tolerant. For example, Erlang includes supervisors: actors that monitor supervised actors and take action when they fail. Regular actors handle the happy path, whereas supervisors take care of recovery, ultimately improving the robustness of the system. There’s no counterpart to this with agents.
The state of an agent (or an actor) should be immutable and never shared outside the scope of the agent. In our agent implementation, however, there’s nothing stopping an inexperienced developer from creating an agent whose state is mutable and from passing that mutable state to other components, thus allowing that state to be changed from outside the scope of the agent. With actors, messages are serialized, so this should never occur.
As you can see, although the fundamental idea behind agents and actors is the same, the actor model is richer and more complex. You should only consider using the actor model if you require coordination of concurrent operations across different applications or machines; otherwise, the operational and setup cost would be unjustified, and you should rely on agents instead.
Although I was able to implement an agent with just a few lines of code, implementing the actor model is much more complex. So if you want to use actors, you’ll probably use one of several implementations of the actor model for .NET:
Orleans (https://github.com/dotnet/orleans) is Microsoft’s take on the actor model. It has a distinctly object-oriented feel. The underlying philosophy is that less experienced developers can interact with actors (called grains) as though they were local objects, without being exposed to any additional complexities specific to the actor model. Orleans takes care of managing the grains’ lifecycle, meaning that their state is kept in memory or persisted to storage automatically. Persistence can be to a variety of media including SQL Server and cloud storage on Azure.
Akka.NET (http://getakka.net/) is a community-driven port of the Akka framework popular with Scala developers. It predates Orleans and is much more explicit about its message-driven nature, so the barrier to entry is higher. A variety of options are available for message transport and persistence of the actors’ state.
echo (https://github.com/louthy/echo-process) is the .NET implementation closest to Erlang, and was developed by Paul Louth. It’s the most lightweight option, both in terms of syntax and configuration: you can create an actor (called a process) with a function like we did with agents, or you can use an interface-based approach (which reads more naturally if you need to handle different kinds of messages). Out of the box, echo only supports messaging across application domains and persistence via Redis, but you can implement adapters to target different infrastructure.
All these actor model implementations differ both in terminology and in important technical details, so it’s difficult to offer a description of the actor model without being somewhat specific to one implementation. This is one of the reasons why I’ve opted to illustrate the fundamental ideas of message-passing concurrency with a simple implementation of agents instead. You can carry these principles over to actor-based programming, but you’ll need to learn other principles such as error handling with supervisors and the guarantees for message delivery offered by the specific implementation you’re using.
Is agent-based programming even functional? Although agents and actors were developed in the context of functional languages (remember, in our idealized implementation an agent was side-effect free), agent-based programming differs starkly from the functional techniques you’ve seen throughout this book:
You tell an agent a message, and this is usually interpreted as a command, so the semantics are rather imperative.
An agent often performs a side effect or tells a message to another agent, which will, in turn, perform a side effect.
Most importantly, telling an agent a message returns no data, so you can’t compose tell operations into pipelines the way you can with functions.
FP separates logic from data; agents contain data and at least some logic in the processing function.
As a result, agent-based programming “feels” different from FP as you’ve seen it so far, so it’s debatable whether or not agent-based concurrency is actually a functional technique. If you think it’s not (as I’m inclined to), then you must conclude that FP is not good at certain types of concurrency (where shared mutable state can’t be avoided), and it needs to be complemented with a different paradigm such as agent-based programming or the actor model.
With agents, it’s easy to program unidirectional data flows: the data always flows forward (to the next agent), and no data is ever returned. In the face of this, we have two choices:
Embrace the idea of unidirectional data flow, and write your applications in this style. In this approach, if you had clients connecting to a server, you wouldn’t use a request-response model like HTTP but rather a message-based protocol such as WebSockets or a message broker. This is a viable approach, especially if your domain is event-driven enough that you already have a messaging infrastructure in place.
Hide the agent-specific details behind a more conventional API. This implies that agents should be able to return a response to a message sender. In this approach, agents are used as concurrency primitives that are implementation details (just as locks are) and that should not dictate the program’s design. We’ll explore this approach next.
The first thing we need is a way to get a reply from an agent, a “return value” of sorts. Imagine that a sender creates a message that includes a handle that it can wait on. It then tells the message to an agent, which signals a result on that handle, making it available to the sender. With this, we can effectively have two-way communication on top of the fire-and-forget Tell protocol.
TaskCompletionSource provides a suitable handle for this purpose: the sender can create a TaskCompletionSource, add it to the message payload, and await its Task. The agent will do its work and set the result on the TaskCompletionSource when ready. Doing this manually for every message for which you want a response would be tedious, so instead, I’ve included in my LaYumba.Functional library a beefed-up agent that takes care of all this wiring. I won’t include the implementation details here, but the interface definition is as follows:

public interface Agent<Msg, Reply>
{
   Task<Reply> Tell(Msg message);
}
Notice that this is a completely new interface with not one but two generic arguments: the type of messages that the agent accepts and the type that it replies with. Telling a message of type Msg to this agent returns a Task<Reply>.
To start an agent of this type, we’ll use a processing function of type (State, Msg) → (State, Reply), which is a function that, given the agent’s current state and the received message, computes not only the agent’s new state, but also a reply to be returned to the sender.
Let’s look at a simple example—an agent that keeps a counter and can be told to increment the counter, and also returns the counter’s new value:
var counter = Agent.Start(0
   , (int state, int msg) =>
   {
      var newState = state + msg;
      return (newState, newState);   // ❶
   });
❶ Returns the new state to be stored and the reply to the sender
You can now consume this agent like this:
var newCount = await counter.Tell(1);
newCount // => 1

newCount = await counter.Tell(1);
newCount // => 2
Notice that Tell returns a Task<int>, so the caller can just await the reply, as with any asynchronous function. Essentially, you can use this agent as a thread-safe, stateful, asynchronous version of a function of type Msg → Reply:
Thread safe because it internally uses an ActionBlock that processes one message at a time
Stateful because the state kept by the agent can change as a result of processing a message
Asynchronous because your message may have to wait while the agent processes other messages in its queue
This means that, compared to using locks, you’re gaining not only in safety (there are no locks, so there are no lock-related deadlocks) but also in performance (locks block the current thread, whereas await frees the thread to do other work).
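The chapter doesn’t show how LaYumba.Functional wires up the replies, so the following is only a minimal sketch of how the TaskCompletionSource technique described earlier might work. StartReplyAgent is a hypothetical name. The agent is represented as a function Msg → Task<Reply> rather than as an Agent<Msg, Reply> instance, and a BlockingCollection loop stands in for the ActionBlock.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper (not LaYumba.Functional's code): the processing function
// returns both the new state and a reply for the sender.
Func<Msg, Task<Reply>> StartReplyAgent<State, Msg, Reply>(
   State initialState, Func<State, Msg, (State, Reply)> process)
{
   var inbox = new BlockingCollection<(Msg, TaskCompletionSource<Reply>)>();

   Task.Run(() =>
   {
      State state = initialState;
      foreach (var (msg, reply) in inbox.GetConsumingEnumerable())
      {
         try
         {
            var (newState, result) = process(state, msg);
            state = newState;          // only this loop touches the state
            reply.SetResult(result);   // completes the sender's Task<Reply>
         }
         catch (Exception ex)
         {
            reply.SetException(ex);    // the state is left unchanged
         }
      }
   });

   // "Tell": bundle the message with a fresh handle, enqueue both, return the Task.
   return msg =>
   {
      var tcs = new TaskCompletionSource<Reply>(
         TaskCreationOptions.RunContinuationsAsynchronously);
      inbox.Add((msg, tcs));
      return tcs.Task;
   };
}

// The counter from earlier, expressed with this sketch.
var counter = StartReplyAgent(0, (int state, int msg) =>
{
   var newState = state + msg;
   return (newState, newState);
});

int first = await counter(1);
int second = await counter(1);
Console.WriteLine($"{first}, {second}"); // 1, 2
```

RunContinuationsAsynchronously keeps the sender’s continuation off the agent’s loop thread, so code that runs after an await in the sender can’t delay the processing of the next message.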
Now that we have a mechanism for two-way communication in place, we can improve the API by hiding the specifics of agent-based programming. For example, in the case of a counter, we could define a Counter class as the following listing shows.
public sealed class Counter
{
   readonly Agent<int, int> counter =            // ❶
      Agent.Start(0, (int state, int msg) =>
      {
         var newState = state + msg;
         return (newState, newState);
      });

   public Task<int> IncrementBy(int amount)      // ❷
      => counter.Tell(amount);
}
❶ The agent is an implementation detail.
❷ Public interface of the Counter
Now a consumer of Counter can be blissfully unaware of its agent-based implementation. A typical interaction would look like this:

var counter = new Counter();
var newCount = await counter.IncrementBy(10);
newCount // => 10
In LOB applications, the need to synchronize access to some shared state usually arises from the fact that entities in the application represent real-world entities, and we need to ensure that concurrent access doesn’t leave them in an invalid state or otherwise break business rules. For example, two concurrent requests to purchase a particular item shouldn’t result in that item being sold twice. Similarly, concurrent moves in a multiplayer game shouldn’t lead to an invalid state of the game.
Let’s see how this would play out in our banking scenario. We need to ensure that when different transactions (debits, credits, transfers) happen concurrently, they don’t leave the account in an invalid state. Does this mean we need to synchronize access to the account data? Not necessarily! Let’s see what happens if we don’t take any special measures with respect to concurrency.
Imagine an account with a balance of 1,000. An automated direct debit payment occurs, causing 800 to be debited from the account. Concurrently, a transfer of 200 is requested, so 200 is also debited. If we use the event-sourced approach shown so far in this book, we get the following result:
The direct debit request causes the creation of an event, capturing a debit of 800, and the caller will receive an updated state with a balance of 200.
The transfer request likewise causes the creation of an event, capturing a debit of 200, and the caller will receive an updated state with a balance of 800.
When the account is loaded next, its state is computed from all past events so that its state will correctly have a balance of 0.
As new events are published, any clients that subscribe to updates can reflect those changes in state. (For example, the client device on which the transfer request was made can be notified when the direct debit has taken place so that the account balance shown to the user is always up to date.)
In short, if you use immutable objects and event sourcing, you don’t get inconsistent data as a result of concurrent updates; this is an important benefit of event sourcing.
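The arithmetic behind this scenario can be spelled out in a few lines. This sketch simplifies events to the amounts debited and treats the opening balance of 1,000 as given; in a real event-sourced system, that balance would itself be derived from earlier events. It shows that, although each caller sees a different updated state, replaying all the events yields the correct balance.

```csharp
using System;
using System.Linq;

// Simplified, hypothetical events: each records only the amount debited.
decimal openingBalance = 1000m;
decimal directDebit = 800m;
decimal transfer = 200m;

// Each caller computes its updated state from the state it loaded plus its own event...
decimal seenByDirectDebitCaller = openingBalance - directDebit;   // 200
decimal seenByTransferCaller = openingBalance - transfer;         // 800

// ...but the next time the account is loaded, its state is recomputed from all events.
decimal[] debitEvents = { directDebit, transfer };
decimal balance = debitEvents.Aggregate(openingBalance, (bal, amount) => bal - amount);

Console.WriteLine(balance); // 0
```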
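Let’s now enrich this scenario with a new business requirement. Each account is assigned a maximum allowed overdraft, meaning that an account’s balance can never go below a certain amount. Now imagine an account with a balance of 1,000 and a maximum allowed overdraft of 500, which receives two concurrent debit requests (say, a direct debit and a transfer) totaling 1,600, each of which would be allowed if it were processed on its own.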
If you don’t synchronize access to the account data, both requests will succeed, leading to the account having an overdraft of 600, which violates our business requirement that the overdraft should never exceed 500. To enforce the maximum allowed overdraft, we need to synchronize the execution of actions that modify the account balance, as a result of which one of the concurrent requests in this scenario should fail. Next, you’ll see how to achieve this using agents.
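To see why synchronization matters here, the following sketch assumes a balance of 1,000, a maximum allowed overdraft of 500, and two concurrent debits of 800 each. These figures are chosen to match the overdraft of 600 mentioned above. Debit is a hypothetical, simplified stand-in for the validation logic. Validating both requests against the same loaded state lets both through, whereas processing them one after the other, as an agent would, rejects the second.

```csharp
using System;

const decimal MaxOverdraft = 500m;

// Hypothetical pure validation: a debit succeeds only if the resulting balance
// stays within the overdraft limit; otherwise the balance is left unchanged.
(bool Ok, decimal Balance) Debit(decimal balance, decimal amount) =>
   balance - amount >= -MaxOverdraft
      ? (true, balance - amount)
      : (false, balance);

decimal loaded = 1000m;

// Unsynchronized: both requests validate against the same loaded state...
var directDebit = Debit(loaded, 800m);   // succeeds, balance 200
var transfer = Debit(loaded, 800m);      // also succeeds, balance 200
// ...so together they take the real balance to -600, beyond the limit.
decimal combined = loaded - 800m - 800m;

// Sequential, as an agent processes commands: the second request sees the
// state produced by the first and is rejected (200 - 800 = -600 < -500).
var first = Debit(loaded, 800m);
var second = Debit(first.Balance, 800m);

Console.WriteLine(second.Ok); // False
```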
To ensure that the account data can’t be affected concurrently by different requests, we can associate an agent with each account. Notice that agents are lightweight enough that it’s OK to have thousands or even millions of them. Also notice that I’m assuming there’s a single server process through which accounts can be affected. If this weren’t the case, you’d need to use an implementation of the actor model instead, but the gist of the following implementation would still be valid.
To associate an agent with an account, we’ll define an AccountProcess class with an agent-based implementation. This means we’re now using three classes to represent accounts:
AccountState: A record that represents the state of an account at a given moment in time
Account: A static class that only contains pure functions used to calculate state transitions
AccountProcess: An agent-based implementation that tracks the current state of an account and handles any commands that affect the account’s state
You saw implementations of Account and AccountState in chapter 13, and those don’t need to change. The following listing shows the implementation of AccountProcess.
using Result = Validation<(Event Event, AccountState NewState)>;

public class AccountProcess
{
   Agent<Command, Result> agent;

   public AccountProcess
   (
      AccountState initialState,
      Func<Event, Task<Unit>> saveAndPublish
   )
   => this.agent = Agent.Start(initialState
      , async (AccountState state, Command cmd) =>
      {
         Result result = cmd switch
         {
            MakeTransfer transfer => state.Debit(transfer), // ❶
            FreezeAccount freeze => state.Freeze(freeze),   // ❶
         };

         await result.Traverse(tpl => saveAndPublish(tpl.Event)); // ❷

         var newState = result
            .Map(tpl => tpl.NewState)
            .GetOrElse(state);

         return (newState, result);
      });

   public Task<Result> Handle(Command cmd)
      => agent.Tell(cmd); // ❸
}
❶ Uses pure functions to calculate the result of the command
❷ Persists the event within the block so that the agent doesn’t process new messages in a nonpersisted state
❸ All commands are queued and processed sequentially.
Each instance of AccountProcess internally holds an agent, so that all commands affecting an account can be processed sequentially. Let’s look at the body of the agent: first, we calculate the result of the command, given the command and the current state. This is done using pure, static functions only.
The result, remember, is a Validation with an inner value including the resulting Event and the new account state. If the result is Valid, we proceed to save and publish the created event (the check is done as part of Traverse).
It’s important to note that persistence happens within the processing function. That is, the agent shouldn’t update its state and start processing new messages before it has successfully persisted the event representing its current state transition. (Otherwise, persisting the event could fail, leading to a mismatch between the agent’s state and the state captured by the persisted events.)
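The ordering is what matters here, so it can be sketched without an agent at all. In this hypothetical example, `ProcessDebit` persists the event before handing back the new state, and `RunSequentially` plays the role of the agent's loop: when saving fails, it keeps the previous state, which still matches the stored events. `DebitRecorded` and the `save` delegate are illustrative stand-ins for the book's `Event` and `saveAndPublish`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical event type standing in for the book's Event.
public record DebitRecorded(decimal Amount);

public static class PersistFirst
{
    // One processing step: compute the event and the new state, persist the event,
    // and only then return the new state. If `save` throws, no new state is returned.
    public static async Task<decimal> ProcessDebit(
        decimal balance, decimal amount, Func<DebitRecorded, Task> save)
    {
        var evt = new DebitRecorded(amount);
        var newBalance = balance - amount;
        await save(evt);
        return newBalance;
    }

    // Stand-in for an agent's loop: messages are handled one at a time, and a failed
    // save leaves the in-memory state as it was, still in sync with the stored events.
    public static async Task<decimal> RunSequentially(
        decimal balance, IEnumerable<decimal> debits, Func<DebitRecorded, Task> save)
    {
        foreach (var amount in debits)
        {
            try { balance = await ProcessDebit(balance, amount, save); }
            catch (Exception) { /* keep the previous state */ }
        }
        return balance;
    }
}
```

If the state were updated first and persisted afterward, a failed save would leave the in-memory balance ahead of what the event store can reproduce.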
Finally, we return the account’s updated state (which is used when processing subsequent commands) and the result of the command. This result includes both the new state and the created event, wrapped in a Validation. This makes it easy to tell the client whether the request succeeded and, if so, what it produced.
Notice how agents (and actors) complect state, behavior, and persistence (as such, they have been labeled “more object-oriented than objects”). In this implementation, I’m injecting a function for persisting the events, whereas most implementations of the actor model include some configurable mechanism for persisting an actor’s state.
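The book's Agent type isn't reproduced in this section. To show mechanically what "processed sequentially" means, here is a minimal sketch of an agent built on `System.Threading.Channels`: a single loop owns the state, and `Tell` queues a message and returns a task for the reply. `MiniAgent` and its members are hypothetical names, and this is only one possible way to implement the idea.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical minimal agent, not the book's Agent type: it owns a state and
// processes queued messages strictly one at a time.
public sealed class MiniAgent<TState, TMsg, TReply>
{
    // Each queued item pairs a message with the promise used to answer its sender.
    readonly Channel<(TMsg, TaskCompletionSource<TReply>)> mailbox =
        Channel.CreateUnbounded<(TMsg, TaskCompletionSource<TReply>)>(
            new UnboundedChannelOptions { SingleReader = true });

    public MiniAgent(TState initialState, Func<TState, TMsg, Task<(TState, TReply)>> process)
    {
        _ = Task.Run(async () =>
        {
            var state = initialState; // only this loop ever reads or replaces the state
            await foreach (var item in mailbox.Reader.ReadAllAsync())
            {
                var (msg, reply) = item;
                try
                {
                    var (newState, result) = await process(state, msg);
                    state = newState;       // commit only after processing succeeded
                    reply.SetResult(result);
                }
                catch (Exception ex)
                {
                    reply.SetException(ex); // the state is left unchanged
                }
            }
        });
    }

    // Queues the message and returns a task that completes once it has been processed.
    public Task<TReply> Tell(TMsg msg)
    {
        var reply = new TaskCompletionSource<TReply>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        mailbox.Writer.TryWrite((msg, reply));
        return reply.Task;
    }
}
```

For example, a counter agent created with `new MiniAgent<int, int, int>(0, (s, n) => Task.FromResult((s + n, s + n)))` ends at exactly 1,000 after 1,000 concurrent `Tell(1)` calls, with no locks in the calling code.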
We now have an AccountProcess that can process commands applicable to a specific account in a thread-safe manner. But how does the code in an API endpoint get the instance of AccountProcess for the relevant account? And how do we ensure that we never accidentally create two AccountProcesses for the same account?
What we need is a single, application-wide registry that holds all live AccountProcesses. It needs to manage their creation and serve them by ID so that the code handling client requests can get an AccountProcess simply by providing the account ID included in the request.
Actor model implementations have such a registry built in, allowing you to register any particular actor against an arbitrary ID. In our case, we’ll build our own simple registry. The following listing shows a first attempt at doing this.
using AccountsCache = ImmutableDictionary<Guid, AccountProcess>;

public class AccountRegistry
{
   Agent<Guid, Option<AccountProcess>> agent;

   public AccountRegistry
   (
      Func<Guid, Task<Option<AccountState>>> loadState,
      Func<Event, Task<Unit>> saveAndPublish
   )
   => this.agent = Agent.Start
   (
      initialState: AccountsCache.Empty,
      process: async (AccountsCache cache, Guid id) =>
      {
         if (cache.TryGetValue(id, out AccountProcess account))
            return (cache, Some(account));

         var optAccount = await loadState(id); // ❶
         return optAccount.Map(accState =>
         {
            AccountProcess account = new(accState, saveAndPublish); // ❷
            return (cache.Add(id, account), Some(account));
         })
         .GetOrElse(() => (cache, None));
      }
   );

   public Task<Option<AccountProcess>> Lookup(Guid id)
      => agent.Tell(id);
}
❶ If the requested AccountProcess is not in the cache, loads the current state from the DB
❷ Creates an AccountProcess with the retrieved state
In this implementation, we have a single agent that manages a cache where all live instances of AccountProcess are kept. If no AccountProcess is found for the given ID, the account’s current state is retrieved from the DB and used to create a new AccountProcess, which is added to the cache. Notice that, as usual, the loadState function returns a Task<Option<AccountState>> to acknowledge that the operation is asynchronous and that it’s possible that no data is found for a given ID.
Before you read on, go through the implementation again. Can you see any problems with this approach? Let’s see: loading the account state from the DB is done within the agent body; is that warranted? This means that while the agent is loading the state for account x, a caller interested in account y has to wait, even though the two requests are unrelated. That’s certainly suboptimal!
This is the kind of schoolboy error that’s common when you’re getting used to programming with agents or actors. Although agents and actors are similar to objects, you can’t think of them as such. The error in listing 19.8 is that we’re conceptually giving the agent the responsibility of providing the caller with the requested AccountProcess, and this gives us a suboptimal solution.
Instead, agents should only have the responsibility of managing some state. The agent in question manages a dictionary, so we can call it to look up an item or to add a new item, but going to the DB to retrieve data is a relatively slow operation that’s not directly pertinent to managing the cache of AccountProcesses.
With this in mind, let’s think of an alternative solution. A thread that wants to get hold of an AccountProcess for an account ID should do the following:
Ask the agent to look up the AccountProcess stored for the given ID.
If no AccountProcess is stored, retrieve the state of the account from the DB (this time-consuming operation will be done in the calling thread, therefore without affecting the agent).
Ask the agent to create and register a new AccountProcess with the given state and ID.
This means that we may need to go to the agent twice, so we need two different message types to specify what we want the agent to do. The following listing shows that different types of messages can be defined to convey the caller’s intention.
public class AccountRegistry
{
   abstract record Msg(Guid Id);
   record LookupMsg(Guid Id) : Msg(Id);
   record RegisterMsg(Guid Id, AccountState AccountState) : Msg(Id);
}
I’ve defined these message types as inner classes because they’re only used within the AccountRegistry class to communicate with its agent.
We can now define the Lookup method, which constitutes the AccountRegistry’s public API (and is, therefore, executed on the caller’s thread), as follows:
public class AccountRegistry
{
   Agent<Msg, Option<AccountProcess>> agent;
   Func<Guid, Task<Option<AccountState>>> loadState;

   public Task<Option<AccountProcess>> Lookup(Guid id)
      => agent
         .Tell(new LookupMsg(id))                                   // ❶
         .OrElse(() =>
            from state in loadState(id)                              // ❷
            from account in agent.Tell(new RegisterMsg(id, state))   // ❸
            select account);
}
❶ Tells the agent to look up the given ID
❷ If the lookup fails, the state is loaded in the calling thread.
❸ Tells the agent to register a new process with the given state and ID
It first asks the agent to look up the ID; if the lookup fails, then the state is retrieved from the DB. Note that this is done on the calling thread, leaving the agent free to handle other messages. Finally, a second message is sent to the agent asking it to create and register an AccountProcess with the given account state and ID.
Notice that everything happens within the Task<Option<>> stack because this is the type returned both by loadState and by Tell. Even OrElse here resolves to an overload I’ve defined on Task<Option<T>>, which executes the given fallback function if the Task has faulted or if the inner Option is None.
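The book's overload works on its own `Option<T>` type, which isn't shown here. As a sketch of the described semantics only, the following `OrElse` works on `Task<T?>` instead, with `null` standing in for `None`; the name matches the text, but this signature is an assumption, not the book's API.

```csharp
using System;
using System.Threading.Tasks;

public static class TaskOptionExtensions
{
    // Hypothetical OrElse: runs the fallback if the task faults or completes
    // without a value (null stands in for None).
    public static async Task<T?> OrElse<T>(this Task<T?> task, Func<Task<T?>> fallback)
        where T : class
    {
        try
        {
            var result = await task;
            if (result is not null) return result;
        }
        catch (Exception)
        {
            // A faulted (or canceled) task is treated like a missing value.
        }
        return await fallback();
    }
}
```

Note that the fallback is a function rather than a value, so the expensive path (loading state and registering a process) only runs when the lookup actually comes back empty.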
All that’s left to show is the revised definition of the agent, which is started in the AccountRegistry’s constructor. The following listing shows this.
using AccountsCache = ImmutableDictionary<Guid, AccountProcess>;

public class AccountRegistry
{
   Agent<Msg, Option<AccountProcess>> agent;
   Func<Guid, Task<Option<AccountState>>> loadState;

   public AccountRegistry
   (
      Func<Guid, Task<Option<AccountState>>> loadState,
      Func<Event, Task<Unit>> saveAndPublish
   )
   {
      this.loadState = loadState;
      this.agent = Agent.Start
      (
         initialState: AccountsCache.Empty,
         process: (AccountsCache cache, Msg msg) => msg switch // ❶
         {
            LookupMsg m => (cache, cache.Lookup(m.Id)),

            RegisterMsg m => cache.Lookup(m.Id).Match
            (
               Some: acc => (cache, Some(acc)), // ❷
               None: () =>
               {
                  AccountProcess acc
                     = new(m.AccountState, saveAndPublish); // ❸
                  return (cache.Add(m.Id, acc), Some(acc)); // ❸
               }
            )
         }
      );
   }

   public Task<Option<AccountProcess>> Lookup(Guid id)
      => // as above...
}
❶ The agent uses pattern matching to perform different actions depending on the message it’s sent.
❷ An edge case in which two concurrent requests have both loaded the account state
❸ Creates and registers a new AccountProcess
This implementation is slightly more complex but more efficient, and this example has given us the chance to see a common pitfall when programming with agents: namely, performing an expensive operation in the body of an agent that doesn’t strictly require synchronized access to the agent’s state.
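For comparison, here is a hypothetical, self-contained version of the same two-message design using only .NET's `System.Threading.Channels` and `System.Collections.Immutable`: the loop handles nothing but lookups and registrations, while `Lookup` loads state on the caller's side. `Registry`, the `create` delegate, and the use of `null` in place of `None` are assumptions of this sketch, not the book's API.

```csharp
using System;
using System.Collections.Immutable;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical sketch, not the book's AccountRegistry: the agent loop only manages
// the cache, while the slow state loading runs on the caller's side.
public sealed class Registry<TProcess, TState>
    where TProcess : class
    where TState : class
{
    abstract record Msg(Guid Id, TaskCompletionSource<TProcess?> Reply);
    sealed record LookupMsg(Guid Id, TaskCompletionSource<TProcess?> Reply) : Msg(Id, Reply);
    sealed record RegisterMsg(Guid Id, TState State, TaskCompletionSource<TProcess?> Reply)
        : Msg(Id, Reply);

    readonly Channel<Msg> mailbox = Channel.CreateUnbounded<Msg>();
    readonly Func<Guid, Task<TState?>> loadState;

    public Registry(Func<Guid, Task<TState?>> loadState, Func<TState, TProcess> create)
    {
        this.loadState = loadState;
        _ = Task.Run(async () =>
        {
            // The cache is only ever read or replaced inside this loop.
            var cache = ImmutableDictionary<Guid, TProcess>.Empty;
            await foreach (var msg in mailbox.Reader.ReadAllAsync())
            {
                switch (msg)
                {
                    case LookupMsg m:
                        m.Reply.SetResult(cache.TryGetValue(m.Id, out var found) ? found : null);
                        break;
                    case RegisterMsg m:
                        // Edge case: another caller may have registered this ID meanwhile.
                        if (!cache.TryGetValue(m.Id, out var existing))
                        {
                            existing = create(m.State);
                            cache = cache.Add(m.Id, existing);
                        }
                        m.Reply.SetResult(existing);
                        break;
                }
            }
        });
    }

    // Public API: runs on the caller's side and talks to the loop at most twice.
    public async Task<TProcess?> Lookup(Guid id)
    {
        var cached = await Tell(reply => new LookupMsg(id, reply));
        if (cached is not null) return cached;

        var state = await loadState(id); // slow I/O, outside the agent loop
        if (state is null) return null;  // no such account

        return await Tell(reply => new RegisterMsg(id, state, reply));
    }

    Task<TProcess?> Tell(Func<TaskCompletionSource<TProcess?>, Msg> makeMsg)
    {
        var reply = new TaskCompletionSource<TProcess?>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        mailbox.Writer.TryWrite(makeMsg(reply));
        return reply.Task;
    }
}
```

Several concurrent callers may all miss the cache and load the same state, but only the first registration creates a process; later ones receive the instance already in the cache, which corresponds to the edge case marked ❷ in the listing.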
On the other hand, in both proposed implementations, once an AccountProcess is created, it’s never terminated; it will persist events to the DB to keep the stored version in sync with the in-memory state, but we read from the DB at most once. Is this a good thing or bad? It depends on how much data you’ll eventually have in memory and how much memory you have available. It’s potentially a huge optimization because access to in-memory data is orders of magnitude faster than access to the DB. The ability to keep all your data in memory is one of the big draws of the actor model: because actors can be distributed across machines, there’s no effective limit on the amount of memory you can use, and accessing memory (even over the network) is much faster than accessing even a local DB.
With the previous building blocks in place, let’s see how our implementation for the API endpoint changes:
public static void ConfigureMakeTransferEndpoint
(
   WebApplication app,
   Validator<MakeTransfer> validate,
   AccountRegistry accounts // ❶
)
{
   var getAccountVal = (Guid id) // ❷
      => accounts
         .Lookup(id)
         .Map(opt => opt.ToValidation(Errors.UnknownAccountId(id)));

   app.MapPost("/Transfer/Make", (MakeTransfer transfer) =>
   {
      Task<Validation<AccountState>> outcome =
         from cmd in Async(validate(transfer))
         from acc in getAccountVal(cmd.DebitedAccountId)
         from result in acc.Handle(cmd) // ❸
         select result.NewState;

      return outcome.Map(
         Faulted: ex => StatusCode(500),
         Completed: val => val.Match(
            Invalid: errs => BadRequest(new { Errors = errs }),
            Valid: newState => Ok(new { Balance = newState.Balance })));
   });
}
❶ Required to get an AccountProcess by account ID
❷ Changes from Task<Option<>> to Task<Validation<>>
❸ The AccountProcess handles the command, updating the account state and persisting/publishing the corresponding event.
The endpoint implementation depends on a Validator for validating the command and on the AccountRegistry for retrieving an AccountProcess for the relevant account.
The main change, compared to the version in chapter 13, is that the result tuple is only returned for feedback, whereas persisting and publishing the event happens in the AccountProcess’s Handle method. This, as you’ve seen, is required to prevent concurrent modifications to the account’s state, which could violate business rules such as limiting the account’s maximum overdraft.
I’m not including the implementation for the functions that read and write events to storage because they’re so technology-specific and don’t entail any particularly interesting logic.
You’ve now seen all the main components of an end-to-end solution for handling a money transfer with the added constraint of synchronized access to the account state.
Shared mutable state that’s accessed concurrently can cause difficult problems.
For this reason, you should avoid shared mutable state entirely whenever possible. This is often achievable in parallel processing scenarios.
In other types of concurrency, notably in multithreaded applications that need to model real-world entities, shared mutable state is often required.
Access to shared mutable state must be serialized to avoid inconsistent changes to the data. This can be achieved using locks but also using lock-free techniques.
Message-passing concurrency is a technique that avoids locks by restricting state mutation to processes (actors/agents) that have exclusive ownership of some state, which they access in a single-threaded way in reaction to the messages they’re sent.
Agents and actors are fundamentally similar, but there are important differences:
Message-passing concurrency feels quite different from other FP techniques, mainly because FP works by composing functions, whereas actors/agents tend to work in a fire-and-forget fashion.
It’s possible to write high-level functional APIs with underlying agent-based or actor-based implementations.
1 In fact, there are several different strategies for implementing STM with different characteristics. Some implementations also enforce consistency, meaning that it’s possible to enforce invariants that a transaction can’t violate. Do the properties atomicity, consistency, and isolation sound familiar? That’s because they are three of the ACID properties guaranteed by many databases—the last one being durability, which, of course, does not apply to STM as it specifically pertains to in-memory data.
2 I already mentioned language-ext, a functional library for C#, in the front matter. The code is available at https://github.com/louthy/language-ext, and for some basic code samples showing how to use the STM features, see https://github.com/louthy/language-ext/wiki/Concurrency.