19 An introduction to message-passing concurrency

This chapter covers

  • Why shared mutable state is sometimes required
  • Understanding message-passing concurrency
  • Programming with agents in C#
  • Hiding agent-based implementations behind conventional APIs

Every seasoned developer has some first-hand experience of how difficult it can be to deal with problems such as deadlocks and race conditions. These are the hard problems that can arise in concurrent programs that involve shared mutable state (“shared,” that is, between processes that execute concurrently).

This is why, throughout this book, you’ve seen many examples of how to solve problems without resorting to shared mutable state. Indeed, my recommendation is to avoid shared mutable state whenever possible, and FP provides an excellent paradigm for doing so.

In this chapter, you’ll see why it’s not always possible to avoid shared mutable state, and what strategies there are to synchronize access to shared mutable state. We’ll then concentrate on one of these strategies: agent-based concurrency, a style of concurrent programming that relies on message-passing between agents that “own” some state that they access in a single-threaded way. Programming with agents is popular with F# programmers, but you’ll see that it’s perfectly doable in C#.

19.1 The need for shared mutable state

It’s generally possible to avoid shared mutable state when designing parallel algorithms. For instance, if you have a computationally intensive problem that you’d like to parallelize, you can usually break the data set or the tasks down in such a way that several threads compute an intermediate result independently. Hence, these threads can do their work without the need to share any state. Another thread may compute the final result by combining all the intermediate results.

The problem, however, is that avoiding shared mutable state isn’t always possible. Although it can generally be achieved in the case of parallel computations, it’s much more difficult if the source of concurrency is multithreading. For example, imagine a multithreaded application, such as a server handling requests on multiple threads, that needs to do the following:

  • Keep an application-wide counter so that unique, sequential account numbers can be generated.

  • Cache some resources in memory to improve efficiency.

  • Represent real-world entities like items for sale, trades, contracts, and so on, ensuring that you don’t sell the same (unique, real-world) item twice if two concurrent requests to buy it are received.

In such scenarios, it’s essentially a requirement to share mutable state between the many threads that the server application uses to more efficiently handle requests. To prevent concurrent access from leading to data inconsistencies, you need to ensure the state can’t be accessed (or, at least, updated) concurrently by different threads. That is, you need to synchronize access to shared mutable state.

In mainstream programming, this synchronization is usually achieved using locks. Locks define critical sections of the code that can only be entered by one thread at a time. When one thread enters a critical section, it blocks other threads from entering it. Functional programmers tend to avoid using locks and resort, instead, to alternative techniques:

  • Compare-and-swap (CAS)—CAS allows you to atomically read and update a single value, which can be done in C# using the Interlocked.CompareExchange methods.

  • Software transactional memory (STM)—STM allows you to update mutable state within transactions, which offers some interesting guarantees about how these updates take place:

    • Each thread performs a transaction in isolation. It sees a view of the program state that is unaffected by transactions that occur concurrently on other threads.
    • Transactions are then committed atomically. Either all changes in the transaction are saved or none.1
    • A conflict doesn’t necessarily mean failure. If a transaction can’t commit because of conflicting changes made in another, concurrent transaction, it can be retried with a fresh view of the data.
  • Message-passing concurrency—The idea of this approach is that you set up lightweight processes that have exclusive ownership of some mutable state. Communication between processes is via message-passing, and processes handle messages sequentially, hence preventing concurrent access to their state. There are two main embodiments of this approach:

    • The actor model—This was most famously implemented at Ericsson in conjunction with the Erlang language, but implementations for other languages, including C#, abound. In this model, processes are called actors, and they can be distributed across different processes and machines.
    • Agent-based concurrency—This is inspired by the actor model, but it’s much simpler because processes, called agents, only exist within one application.

CAS only allows you to deal with a single value, so it provides an effective solution for a very limited number of scenarios.
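To make that single-value scope concrete, here’s a minimal sketch of the usual compare-and-swap retry loop built on Interlocked.CompareExchange, written as a top-level-statement program. The helper Update is hypothetical (it isn’t a .NET API); note that the entire shared state is one int, and as soon as an update has to keep two values consistent, this technique no longer applies.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

int counter = 0; // the shared mutable state: a single int

// Hypothetical helper: applies `update` to `location` atomically.
// It reads the current value, computes the new one, and publishes it only
// if no other thread has changed the value in the meantime; otherwise it retries.
static int Update(ref int location, Func<int, int> update)
{
   while (true)
   {
      int current = Volatile.Read(ref location);
      int next = update(current);
      if (Interlocked.CompareExchange(ref location, next, current) == current)
         return next;
   }
}

// 1,000 concurrent increments: none is lost, and no lock is taken.
Parallel.For(0, 1000, _ => Update(ref counter, c => c + 1));

Console.WriteLine(counter); // 1000
```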

STM is an important paradigm for in-process concurrency, and it’s particularly popular among Clojure and Haskell developers because these languages come with a compelling and battle-tested implementation of STM. If you want to explore this paradigm in C#, language-ext contains an implementation of Atom and Ref, the primitives allowing you to atomically update data shared between threads.2

In the rest of this chapter, I’ll concentrate on message-passing concurrency, especially agent-based concurrency. You’ll later see how agents and actors differ in more detail. Let’s begin by looking at message-passing concurrency as a programming model.

19.2 Understanding message-passing concurrency

You can think of an agent (or actor; the fundamental idea is the same) as a process that has exclusive ownership of some mutable state. Communication between actors is via message passing so that the state can never be accessed from outside of the actor. Furthermore, incoming messages are processed sequentially so that concurrent state updates can never take place.

Figure 19.1 illustrates an agent: a process that runs in a loop. It has an inbox in which messages are queued, and it has some state. When a message is dequeued and processed, the agent typically does some of the following:

  • Performs side effects

  • Sends messages to other agents

  • Creates other agents

  • Computes its new state

Figure 19.1 An agent consists of a message inbox and a processing loop.

The new state will be used as the current state at the next iteration, when the following message is processed.

Let’s begin with an idealized, almost pseudocode implementation of an agent as I just described. Look at the code in the following listing in detail and see how each part corresponds to what’s depicted in figure 19.1.

Listing 19.1 Idealized implementation of an agent

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public sealed class Agent<State, Msg>
{
   BlockingCollection<Msg> inbox                     
      = new BlockingCollection<Msg>(new ConcurrentQueue<Msg>());
 
   public void Tell(Msg message)                     
      => inbox.Add(message);                         
 
   public Agent                                      
   (
      State initialState,
      Func<State, Msg, State> process
   )
   {
      void Loop(State state)
      {
         Msg message = inbox.Take();                 
         State newState = process(state, message);   
         Loop(newState);                             
      }
 
      Task.Run(() => Loop(initialState));            
   }
}

Uses a concurrent queue as a message inbox

Telling a message to the agent simply enqueues the message.

Creates an agent by providing an initial state and a processing function

Dequeues a message as soon as it’s available

Processes the message, determining the new state of the agent

Loops with the new state

The agent runs in its own process (a task started with Task.Run).

There are several interesting things to point out here. First, notice that there are only two public members, so only two interactions with an agent are allowed:

  • You can create (or start) an agent.

  • You can tell it a message, which simply enqueues the message in the agent’s inbox.

You can define more complex interactions from these primitive operations.

Let’s now look at the processing loop, encoded in the Loop function. This dequeues the first message from the inbox (or waits until a message becomes available) and processes it using the agent’s processing function and its current state. This yields the agent’s new state, which is used in the next execution of the loop.

Notice that the implementation is side-effect free, apart from any side effects that may occur when calling the given processing function. Changes in state are captured by always passing the state as an argument to the Loop function (a technique you already saw in chapter 15).

Notice also that this implementation assumes that State is an immutable type; otherwise, it could be shared by the process function and updated arbitrarily outside the scope of the agent’s processing loop. As a result, the state only “appears” to be mutable, because a new version of the state is used with each invocation of Loop.

Finally, take a moment to look at the signature for the constructor. Does it remind you of anything? Compare it with Enumerable.Aggregate. Can you see that it’s essentially the same? The current state of an agent is the result of reducing all the messages it’s received so far, using the initial state as an accumulator value and the processing function as a reducer. It’s a fold in time over the stream of messages received by the agent.
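A quick way to see the correspondence is to fold a sequence of messages with Enumerable.Aggregate. The processing function below is a hypothetical one for a counter agent; the result is the state such an agent would hold after processing the same messages in the same order.

```csharp
using System;
using System.Linq;

// Hypothetical processing function: the state is a running total,
// and each message is an amount to add.
Func<int, int, int> process = (state, msg) => state + msg;

int initialState = 0;
int[] messages = { 1, 5, 10 };

// Same shape as the agent's constructor: an initial state (the seed)
// and a function from (state, message) to the new state (the reducer).
int finalState = messages.Aggregate(initialState, process);

Console.WriteLine(finalState); // 16
```

The difference is that Aggregate consumes a sequence that’s already available, whereas an agent folds over messages as they arrive.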

The implementation in listing 19.1 is elegant, and it would work well in a language that guarantees tail-call elimination. C# doesn’t, so we’ll need to make some changes for a stack-safe implementation. Furthermore, we can dispense with many of the low-level details by using existing functionality in .NET. We’ll look at this next.

19.2.1 Implementing agents in C#

F# ships an implementation of agents called MailboxProcessor (part of FSharp.Core), but it was designed for use from F# and is awkward to use from C#. And although the preceding implementation is useful for understanding the idea, it’s not optimal. Instead, in the coming examples, I’ll use a more practical implementation of an agent, which is included in LaYumba.Functional and shown in the following listing.

Listing 19.2 Implementation of an agent that builds on Dataflow.ActionBlock

using System;
using System.Threading.Tasks.Dataflow;
 
public interface Agent<Msg>
{
   void Tell(Msg message);
}
 
class StatefulAgent<State, Msg> : Agent<Msg>
{
   private State state;
   private readonly ActionBlock<Msg> actionBlock;
 
   public StatefulAgent
   (
      State initialState,
      Func<State, Msg, State> process
   )
   {
      state = initialState;
 
      actionBlock = new ActionBlock<Msg>(msg =>
      {
         var newState = process(state, msg);   
         state = newState;                     
      });
   }
 
   public void Tell(Msg message)
      => actionBlock.Post(message);            
}

Processes the message with the current state

Assigns the result to the stored state

Queueing and processing messages is managed by the ActionBlock.

Here I’ve replaced the recursive call (which could lead to stack overflow) with a single mutable variable state that keeps track of the agent’s state and is reassigned as each message is processed. Although this is a side effect, messages are processed sequentially, therefore preventing concurrent writes.

I’ve also dispensed with the details of managing the queue and the processing loop by using ActionBlock, one of the building blocks in .NET’s TPL Dataflow library. An ActionBlock contains a buffer (by default, unbounded in size) that acts as the agent’s inbox, and it processes at most a fixed number of messages at a time (by default, one; this is controlled by MaxDegreeOfParallelism), ensuring that messages are processed sequentially.
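The following small experiment, assuming the same System.Threading.Tasks.Dataflow package that listing 19.2 uses, shows the default sequential processing at work: many threads post to one ActionBlock whose action mutates a captured variable without any lock, and no update is lost.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

int count = 0; // mutated by the block's action, without any lock

// Default options: unbounded input buffer, MaxDegreeOfParallelism = 1,
// so the action never runs on two messages at the same time.
var block = new ActionBlock<int>(n => count += n);

// Post 10,000 messages from many threads concurrently.
Parallel.For(0, 10_000, _ => block.Post(1));

block.Complete();       // signal that no more messages will be posted
await block.Completion; // wait until the buffer has been drained

Console.WriteLine(count); // 10000
```

Setting ExecutionDataflowBlockOptions.MaxDegreeOfParallelism to a value greater than 1 would let the action run concurrently, and the unsynchronized += could then lose updates.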

State should still be an immutable type (otherwise, as previously pointed out, it could be shared by the process function and mutated outside the scope of the ActionBlock). If this is observed, the code is thread-safe.

From the point of view of the client code, nothing has changed: we still only have two public members with the same signatures as before. The reason for the Agent<Msg> interface is twofold:

  • From the point of view of the client code consuming an agent, you can only tell it messages, so by using the interface, we avoid exposing the type parameter for the state. After all, the type of the state is an implementation detail of the agent.

  • You can envisage other implementations such as stateless agents or agents that persist their state.

Finally, here are some convenience methods for easily creating agents:

public static class Agent
{
   public static Agent<Msg> Start<State, Msg>
      ( State initialState
      , Func<State, Msg, State> process)
      => new StatefulAgent<State, Msg>(initialState, process);
 
   public static Agent<Msg> Start<Msg>(Action<Msg> action)
      => new StatelessAgent<Msg>(action);
}

The first overload simply creates an agent with the given arguments. The second takes an action and is used to create a stateless agent: an agent that processes messages sequentially but doesn’t keep any state. (The implementation is trivial, as it just creates an ActionBlock with the given Action.) We can also define agents with an asynchronous processing function or action; I’ve omitted those overloads for brevity, but the full implementation is in the code samples.
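The stateless agent and an asynchronous overload might look roughly like the following sketch. To keep it self-contained, it represents an agent only by its Tell function (an Action<Msg>) instead of the Agent<Msg> interface; StartStateless and StartAsync are hypothetical names, not the LaYumba.Functional API.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helpers: an agent is represented here only by its Tell function.

// Stateless agent: the ActionBlock runs the action on one message at a time.
static Action<Msg> StartStateless<Msg>(Action<Msg> action)
{
   var block = new ActionBlock<Msg>(action);
   return msg => block.Post(msg);
}

// Stateful agent with an asynchronous processing function. The ActionBlock
// waits for the returned Task to complete before taking the next message,
// so `state` is never accessed by two messages at once.
static Action<Msg> StartAsync<State, Msg>
   (State initialState, Func<State, Msg, Task<State>> process)
{
   State state = initialState;
   var block = new ActionBlock<Msg>(async msg =>
   {
      state = await process(state, msg);
   });
   return msg => block.Post(msg);
}

// Usage: a counter agent that tells a stateless reporter agent once it
// has counted 100 messages; the reporter completes `done`.
var done = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
var reporter = StartStateless((int total) => done.SetResult(total));

var counter = StartAsync(0, async (int count, int msg) =>
{
   await Task.Yield();               // stands in for some asynchronous work
   int newCount = count + msg;
   if (newCount == 100) reporter(newCount);
   return newCount;
});

Parallel.For(0, 100, _ => counter(1));
Console.WriteLine(await done.Task); // 100
```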

19.2.2 Getting started with agents

Let’s look at some simple examples of using agents. We’ll build a couple of simple agents that interact as figure 19.2 shows.

Figure 19.2 Simple interaction between agents by exchanging messages

We’ll start with a really simple, stateless agent that takes a message of type string and just prints it out. You can follow along in the REPL:

Agent<string> logger = Agent.Start((string msg) => WriteLine(msg));
 
logger.Tell("Agent X");
// prints: Agent X

Next, let’s define the ping and pong agents that interact with the logger and with each other:

Agent<string> ping, pong = null;
 
ping = Agent.Start((string msg) =>
{
   if (msg == "STOP") return;
 
   logger.Tell($"Received '{msg}'; Sending 'PING'");
   Task.Delay(500).Wait();
   pong.Tell("PING");
});
 
pong = Agent.Start(0, (int count, string msg) =>
{
   int newCount = count + 1;
   string nextMsg = (newCount < 5) ? "PONG" : "STOP";
 
   logger.Tell($"Received '{msg}' #{newCount}; Sending '{nextMsg}'");
   Task.Delay(500).Wait();
   ping.Tell(nextMsg);
 
   return newCount;
});
 
ping.Tell("START");

Here, we define two additional agents. ping is stateless; it sends a message to the logger agent and a PING message to the pong agent, unless the message it’s told is STOP, in which case, it does nothing. It’s quite common for an agent to have different behavior depending on the message, that is, to interpret the message as a command.

Now let’s see a stateful agent: pong. The implementation is quite similar to ping. It sends PONG to ping, but it also keeps a counter as state. The counter is incremented with every message, and after five messages, the agent sends a STOP message instead.

The whole ping-pong is set in motion when we send the initial START message to ping on the last line. Running the program causes the following to be printed:

Received 'START'; Sending 'PING'
Received 'PING' #1; Sending 'PONG'
Received 'PONG'; Sending 'PING'
Received 'PING' #2; Sending 'PONG'
Received 'PONG'; Sending 'PING'
Received 'PING' #3; Sending 'PONG'
Received 'PONG'; Sending 'PING'
Received 'PING' #4; Sending 'PONG'
Received 'PONG'; Sending 'PING'
Received 'PING' #5; Sending 'STOP'

Now that you’ve seen some simple agents interact, it’s time to move on to something closer to real-world requirements.

19.2.3 Using agents to handle concurrent requests

Let’s revisit the scenario of a service that provides exchange rates. The service should retrieve rates from the Rates API and cache them. We saw a simple implementation in section 15.1, but there the interaction was via the command line so that requests necessarily came in one by one.

Let’s change that. Let’s imagine that the service is part of a larger system and that other components may request rates via a message broker, as figure 19.3 illustrates.

Figure 19.3 A system in which several requests may be received concurrently

Components communicate with each other by sending messages via the message broker. To communicate with the currency lookup service, the following messages are defined:

record FxRateRequest
(
   string CcyPair,     
   string Sender       
 );
 
record FxRateResponse
(
   string CcyPair,
   decimal Rate,
   string Recipient    
);

The currency pair whose rate is being requested

The sender and recipient fields allow the message broker to correctly route messages.

We’ll assume that the message broker is multithreaded, so that our service may receive multiple requests on different threads at exactly the same time.

In this case, sharing state between threads is a requirement: if we had a different cache for every thread, that would be suboptimal. So we need some synchronization to ensure that we don’t perform unnecessary remote lookups and that cache updates don’t cause race conditions.

Next, we’ll see how we can use agents to achieve this. First, we’ll need a bit of setup code, defining the interaction with the message broker. This is shown in listing 19.3. Note that the code isn’t specific to any particular message broker; we just need to be able to subscribe to it to receive requests and to use it to send responses. (The code samples include an implementation of MessageBroker that uses Redis as its underlying transport.)

Listing 19.3 Setting up the interaction with the message broker

public static void SetUp(MessageBroker broker)
{
   Agent<FxRateResponse> sendResponse = Agent.Start(
      (FxRateResponse res) => broker.Send(res.Recipient, res));      
 
   Agent<FxRateRequest> processRequest
      = StartReqProcessor(sendResponse);                             
 
   broker.Subscribe<FxRateRequest>("FxRates", processRequest.Tell);  
}

An agent that sends the responses

An agent that processes the requests and uses the previously defined agent to send the response

When a request is received, passes it to the processing agent

Starting at the bottom, we subscribe to receive requests broadcast on the “FxRates” channel, providing a callback to handle the request. This callback (which will be called on multiple threads) simply passes the request to the processing agent, defined on the previous line. Hence, although requests are received on multiple threads, they’ll be immediately queued up in the processing agent’s inbox and processed sequentially.

Does this mean that processing is now single-threaded, and we lose any benefit of multithreading? Not necessarily! If the processing agent did all the processing, that would indeed be the case. Instead, let’s take a more granular approach: we can have an agent for each currency pair in charge of fetching and storing the rate for its particular pair. The request-processing agent will just be in charge of managing these per-currency-pair agents and delegating the work to them, as figure 19.4 shows.

Figure 19.4 Breaking up the work between agents that can run concurrently

Let’s now look at the definitions of the agents. The following listing shows the higher-level agent, which handles incoming requests and starts the lower-level per-currency-pair agents, delegating work to them.

Listing 19.4 A coordinating agent routes requests to a per-currency-pair agent

using CcyAgents = System.Collections.Immutable
   .ImmutableDictionary<string, Agent<string>>;
 
static Agent<FxRateRequest> StartReqProcessor
   (Agent<FxRateResponse> sendResponse)
 
   => Agent.Start(CcyAgents.Empty
      , (CcyAgents state, FxRateRequest request) =>
   {
      string ccyPair = request.CcyPair;
 
      Agent<string> agent = state
         .Lookup(ccyPair)
         .GetOrElse(() => StartAgentFor(ccyPair, sendResponse));   
 
      agent.Tell(request.Sender);                                  
      return state.Add(ccyPair, agent);
   });

If required, starts a new agent for the requested currency pair

Passes the request to the agent in charge of the pair

As you can see, the request-processing agent holds a cache not of values but of agents, one for each currency pair. It starts those agents as needed and forwards the requests to them.

The benefit of this solution is that requests for one currency pair, say GBPUSD, won’t impact requests for another, say EURUSD. At the same time, if you get several requests for GBPUSD at once, only one remote request is made to fetch that rate, while the other requests wait in the agent’s queue.

Finally, the following listing provides the definition of the agent that manages the rate for a single currency pair.

Listing 19.5 An agent managing the FX rate for a single currency pair

static Agent<string> StartAgentFor
(
   string ccyPair,
   Agent<FxRateResponse> sendResponse
)
=> Agent.Start<Option<decimal>, string>
(
   initialState: None,
   process: async (optRate, recipient) =>
   {
      decimal rate = await optRate.Map(Async)
         .GetOrElse(() => RatesApi.GetRateAsync(ccyPair));   
 
      sendResponse.Tell(new FxRateResponse                   
      (
         CcyPair: ccyPair,
         Rate: rate,
         Recipient: recipient
      ));
 
      return Some(rate);                                     
   }
);

If necessary, fetches the rate from the remote API

Sends the response

The agent’s new state

This agent’s state is the rate for a single pair; it’s wrapped in an Option because, when the agent is first created, it has no rate available yet. Upon receiving a request, the agent decides whether a remote lookup is required (you could easily improve this to fetch the rate if the cached value is expired).

To keep the example simple, I’ve avoided the question of expiry as well as error handling. I’m also assuming that sending the response to the message broker is a fire-and-forget operation with minimal latency, so that it’s OK to have a single agent performing it.
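Adding expiry, as suggested above, could be sketched like this, assuming the per-pair agent’s state also records when the rate was fetched. The sketch uses a nullable tuple instead of Option so that it doesn’t depend on LaYumba.Functional; GetRateAsync, maxAge, and the injected clock value are hypothetical. Passing the current time and the remote lookup in as arguments keeps the decision easy to test.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical processing step for a per-pair agent whose state records
// when the rate was fetched. Returns the rate to send back and the new state.
static async Task<(decimal Rate, (decimal Rate, DateTime FetchedAt)? NewState)>
   GetRateAsync(
      (decimal Rate, DateTime FetchedAt)? cached,
      DateTime now,
      TimeSpan maxAge,
      Func<Task<decimal>> fetchRemote)
{
   if (cached is { } c && now - c.FetchedAt < maxAge)
      return (c.Rate, c);                // still fresh: no remote lookup

   decimal rate = await fetchRemote();   // missing or expired: look it up
   return (rate, (rate, now));
}

// Usage with a fake remote lookup that counts its calls.
int remoteCalls = 0;
Func<Task<decimal>> fetch = () => { remoteCalls++; return Task.FromResult(1.25m); };

var t0 = new DateTime(2024, 1, 1, 12, 0, 0);
var maxAge = TimeSpan.FromMinutes(5);

var (_, s1) = await GetRateAsync(null, t0, maxAge, fetch);              // fetches
var (_, s2) = await GetRateAsync(s1, t0.AddMinutes(1), maxAge, fetch);  // cached
await GetRateAsync(s2, t0.AddMinutes(10), maxAge, fetch);               // expired: fetches

Console.WriteLine(remoteCalls); // 2
```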

The main point of the example is that using agents with their sequential processing of messages can be quite efficient. It does, however, require a mental shift, both from the functional approach we’ve been pursuing in this book and from the traditional approach of using locks.
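To see that efficiency in isolation, here’s a small simulation that isn’t part of the chapter’s code: a single per-pair agent, built directly on an ActionBlock, receives ten concurrent requests for GBPUSD. FakeGetRateAsync is a hypothetical stand-in for RatesApi.GetRateAsync, and the agent’s state is a decimal? rather than an Option<decimal> to avoid the LaYumba.Functional dependency.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

int remoteLookups = 0;

// Hypothetical stand-in for RatesApi.GetRateAsync: slow, and counts its calls.
async Task<decimal> FakeGetRateAsync(string ccyPair)
{
   Interlocked.Increment(ref remoteLookups);
   await Task.Delay(100);
   return 1.25m;
}

var responses = new ConcurrentBag<(string Recipient, decimal Rate)>();
using var allAnswered = new CountdownEvent(10);

// A per-pair agent for GBPUSD; its state (the cached rate) is only touched
// by the block's action, which handles one request at a time.
decimal? cachedRate = null;
var gbpUsd = new ActionBlock<string>(async recipient =>
{
   decimal rate = cachedRate ?? await FakeGetRateAsync("GBPUSD");
   cachedRate = rate;
   responses.Add((recipient, rate));
   allAnswered.Signal();
});

// Ten requests for the same pair arrive concurrently.
Parallel.For(0, 10, i => gbpUsd.Post($"client-{i}"));

allAnswered.Wait();
Console.WriteLine($"{responses.Count} responses, {remoteLookups} remote lookup(s)");
// prints: 10 responses, 1 remote lookup(s)
```

The first request triggers the lookup; the other nine wait in the queue and, once the first one has updated the state, are answered from it.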

19.2.4 Agents vs. actors

Agents and actors are closely related. In both cases, a single thread processes messages sequentially and communicates with other actors/agents by sending them messages. There are also important differences:

  • Agents run within a single process, whereas actors are designed for distributed systems. In the examples we’ve looked at so far, a reference to an agent refers to a specific instance in the current process. References to actors, on the other hand, are location-transparent; when you have a reference to an actor, that actor may be running in the same process or in another process, possibly on a remote machine. A reference to an agent is a pointer, whereas a reference to an actor is an ID that the actor model implementation uses to route messages across processes as appropriate.

  • Actor model implementations are designed to be fault-tolerant. For example, Erlang includes supervisors: actors that monitor supervised actors and take action when they fail. Regular actors handle the happy path, whereas supervisors take care of recovery, ultimately improving the robustness of the system. There’s no counterpart to this with agents.

  • The state of an agent (or an actor) should be immutable and never shared outside the scope of the agent. In our agent implementation, however, there’s nothing stopping an inexperienced developer from creating an agent whose state is mutable and from passing that mutable state to other components, thus allowing that state to be changed from outside the scope of the agent. With actors, messages are serialized, so this should never occur.

As you can see, although the fundamental idea behind agents and actors is the same, the actor model is richer and more complex. You should only consider using the actor model if you require coordination of concurrent operations across different applications or machines; otherwise, the operational and setup cost would be unjustified, and you should rely on agents instead.

Although I was able to implement an agent with just a few lines of code, implementing the actor model is much more complex. So if you want to use actors, you’ll probably use one of several implementations of the actor model for .NET:

  • Orleans (https://github.com/dotnet/orleans) is Microsoft’s take on the actor model. It has a distinctly object-oriented feel. The underlying philosophy is that less experienced developers can interact with actors (called grains) as though they were local objects, without being exposed to any additional complexities specific to the actor model. Orleans takes care of managing the grains’ lifecycle, meaning that their state is kept in memory or persisted to storage automatically. Persistence can be to a variety of media including SQL Server and cloud storage on Azure.

  • Akka.NET (http://getakka.net/) is a community-driven port of the Akka framework popular with Scala developers. It predates Orleans and is much more explicit about its message-driven nature, so the barrier to entry is higher. A variety of options are available for message transport and persistence of the actors’ state.

  • echo (https://github.com/louthy/echo-process) is the .NET implementation closest to Erlang, and was developed by Paul Louth. It’s the most lightweight option, both in terms of syntax and configuration: you can create an actor (called a process) with a function like we did with agents, or you can use an interface-based approach (which reads more naturally if you need to handle different kinds of messages). Out of the box, echo only supports messaging across application domains and persistence via Redis, but you can implement adapters to target different infrastructure.

All these actor model implementations differ both in terminology and in important technical details, so it’s difficult to offer a description of the actor model without being somewhat specific to one implementation. This is one of the reasons why I’ve opted to illustrate the fundamental ideas of message-passing concurrency with a simple implementation of agents instead. You can carry these principles over to actor-based programming, but you’ll need to learn other principles such as error handling with supervisors and the guarantees for message delivery offered by the specific implementation you’re using.

19.3 Functional APIs, agent-based implementations

Is agent-based programming even functional? Although agents and actors were developed in the context of functional languages (remember, in our idealized implementation an agent was side-effect free), agent-based programming differs starkly from the functional techniques you’ve seen throughout this book:

  • You tell an agent a message, and this is usually interpreted as a command, so the semantics are rather imperative.

  • An agent often performs a side effect or tells a message to another agent, which will, in turn, perform a side effect.

  • Most importantly, telling an agent a message returns no data, so you can’t compose tell operations into pipelines the way you can with functions.

  • FP separates logic from data; agents contain data and at least some logic in the processing function.

As a result, agent-based programming “feels” different from FP as you’ve seen it so far, so it’s debatable whether or not agent-based concurrency is actually a functional technique. If you think it’s not (as I’m inclined to), then you must conclude that FP is not good at certain types of concurrency (where shared mutable state can’t be avoided), and it needs to be complemented with a different paradigm such as agent-based programming or the actor model.

With agents, it’s easy to program unidirectional data flows: the data always flows forward (to the next agent), and no data is ever returned. In the face of this, we have two choices:

  • Embrace the idea of unidirectional data flow, and write your applications in this style. In this approach, if you had clients connecting to a server, you wouldn’t use a request-response model like HTTP but rather a message-based protocol such as WebSockets or a message broker. This is a viable approach, especially if your domain is event-driven enough that you already have a messaging infrastructure in place.

  • Hide the agent-specific details behind a more conventional API. This implies that agents should be able to return a response to a message sender. In this approach, agents are used as concurrency primitives that are implementation details (just as locks are) and that should not dictate the program’s design. We’ll explore this approach next.

19.3.1 Agents as implementation details

The first thing we need is a way to get a reply from an agent, a “return value” of sorts. Imagine that a sender creates a message that includes a handle that it can wait on. It then tells the message to an agent, which signals a result on that handle, making it available to the sender. With this, we can effectively have two-way communication on top of the fire-and-forget Tell protocol.

TaskCompletionSource provides a suitable handle for this purpose: the sender can create a TaskCompletionSource, add it to the message payload, and await its Task. The agent will do its work and set the result on the TaskCompletionSource when ready. Doing this manually for every message for which you want a response would be tedious, so instead, I’ve included in my LaYumba.Functional library a beefed-up agent that takes care of all this wiring. I won’t include the implementation details here, but the interface definition is as follows:

public interface Agent<Msg, Reply>
{
   Task<Reply> Tell(Msg message);
}

Notice that this is a completely new interface with not one but two generic arguments: the type of messages that the agent accepts and the type that it replies with. Telling a message of type Msg to this agent returns a Task<Reply>. To start an agent of this type, we’ll use a processing function of type

State → Msg → (State, Reply)

or its asynchronous version

State → Msg → Task<(State, Reply)>

which is a function that, given the agent’s current state and the received message, computes not only the agent’s new state, but also a reply to be returned to the sender.

Let’s look at a simple example—an agent that keeps a counter and can be told to increment the counter, and also returns the counter’s new value:

var counter = Agent.Start(0
   , (int state, int msg) =>
   {
      var newState = state + msg;
      return (newState, newState);   
    });

Returns the new state to be stored and the reply to the sender

You can now consume this agent like this:

var newCount = await counter.Tell(1);
newCount // => 1
newCount = await counter.Tell(1);
newCount // => 2

Notice that Tell returns a Task<int>, so the caller can just await the reply, as with any asynchronous function. Essentially, you can use this agent as a thread-safe, stateful, asynchronous version of a function of type Msg → Reply:

  • Thread safe because it internally uses an ActionBlock that processes one message at a time

  • Stateful because the state kept by the agent can change as a result of processing a message

  • Asynchronous because your message may have to wait while the agent processes other messages in its queue

This means that, compared to using locks, you’re gaining not only in safety (there are no locks that could be acquired in the wrong order and deadlock) but also in performance (a lock blocks the waiting thread, whereas await frees the thread to do other work).
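The library’s implementation isn’t shown here, but the TaskCompletionSource wiring described earlier can be sketched in a few lines. This is a hypothetical, self-contained version (StartWithReply is not the LaYumba.Functional API) that represents a replying agent as a Func<Msg, Task<Reply>>; the usage at the end has many callers tell the counter concurrently, and each caller gets back a distinct count.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper, not the LaYumba.Functional implementation.
// An agent that replies is represented by its Tell function: Msg -> Task<Reply>.
static Func<Msg, Task<Reply>> StartWithReply<State, Msg, Reply>
   (State initialState, Func<State, Msg, (State, Reply)> process)
{
   State state = initialState;

   // Each queued item pairs a message with the handle its sender awaits.
   var block = new ActionBlock<(Msg Message, TaskCompletionSource<Reply> ReplyTo)>(item =>
   {
      try
      {
         var (newState, reply) = process(state, item.Message);
         state = newState;
         item.ReplyTo.SetResult(reply);   // completes the sender's Task
      }
      catch (Exception ex)
      {
         item.ReplyTo.SetException(ex);   // the sender's await rethrows; state is unchanged
      }
   });

   return msg =>
   {
      var tcs = new TaskCompletionSource<Reply>(
         TaskCreationOptions.RunContinuationsAsynchronously);
      block.Post((Message: msg, ReplyTo: tcs));
      return tcs.Task;
   };
}

var counter = StartWithReply(0, (int state, int msg) =>
{
   var newState = state + msg;
   return (newState, newState);
});

// 100 concurrent callers; each awaits its own reply.
int[] replies = await Task.WhenAll(Enumerable.Range(0, 100).Select(_ => counter(1)));

Console.WriteLine(replies.Max()); // 100
```

RunContinuationsAsynchronously keeps each sender’s continuation from running inline on the thread that is processing the agent’s messages.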

19.3.2 Hiding agents behind a conventional API

Now that we have a mechanism for two-way communication in place, we can improve the API by hiding the specifics of agent-based programming. For example, in the case of a counter, we could define a Counter class as the following listing shows.

Listing 19.6 Hiding an agent-based implementation behind a public API

public sealed class Counter
{
   readonly Agent<int, int> counter =           
      Agent.Start(0, (int state, int msg) =>
         {
            var newState = state + msg;
            return (newState, newState);
         });
 
   public Task<int> IncrementBy(int amount)     
      => counter.Tell(amount);
}

The agent is an implementation detail.

Public interface of the Counter

Now a consumer of Counter can be blissfully unaware of its agent-based implementation. A typical interaction would look like this:

var counter = new Counter();
var newCount = counter.IncrementBy(10);
await newCount // => 10

19.4 Message-passing concurrency in LOB applications

In LOB applications, the need to synchronize access to some shared state usually arises from the fact that entities in the application represent real-world entities, and we need to ensure that concurrent access doesn’t leave them in an invalid state or otherwise break business rules. For example, two concurrent requests to purchase a particular item shouldn’t result in that item being sold twice. Similarly, concurrent moves in a multiplayer game shouldn’t lead to an invalid state of the game.

Let’s see how this would play out in our banking scenario. We need to ensure that when different transactions (debits, credits, transfers) happen concurrently, they don’t leave the account in an invalid state. Does this mean we need to synchronize access to the account data? Not necessarily! Let’s see what happens if we don’t take any special measures with respect to concurrency.

Imagine an account with a balance of 1,000. An automated direct debit payment occurs, causing 800 to be debited from the account. Concurrently, a transfer of 200 is requested, so a further 200 is debited. If we use the event-sourced approach shown so far in this book, we get the following result:

  • The direct debit request causes the creation of an event, capturing a debit of 800, and the caller will receive an updated state with a balance of 200.

  • The transfer request likewise causes the creation of an event, capturing a debit of 200, and the caller will receive an updated state with a balance of 800.

  • When the account is loaded next, its state is computed from all past events so that its state will correctly have a balance of 0.

  • As new events are published, any clients that subscribe to updates can reflect those changes in state. (For example, the client device on which the transfer request was made can be notified when the direct debit has taken place so that the account balance shown to the user is always up to date.)

In short, if you use immutable objects and event sourcing, you don’t get inconsistent data as a result of concurrent updates; this is an important benefit of event sourcing.
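The balance comes out right because it is never overwritten. Instead, it is recomputed by replaying the stored events. Here is a minimal sketch. It assumes a hypothetical Debited event type and treats the opening balance of 1,000 as a given rather than as an event of its own:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified event type (the book's events carry more data).
public sealed record Debited(decimal Amount);

public static class Replay
{
    // Recomputes the balance by applying every stored event, in order,
    // to the opening balance (a left fold over the event log).
    public static decimal Balance(decimal openingBalance, IEnumerable<Debited> events)
        => events.Aggregate(openingBalance, (balance, e) => balance - e.Amount);
}
```

Replaying both debits from 1,000 yields 0. Each caller's immediate reply, by contrast, reflected only its own event: a balance of 200 for the direct debit and 800 for the transfer.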

Let’s now enrich this scenario with a new business requirement. Each account is assigned a maximum allowed overdraft, meaning that an account’s balance can never go below a certain amount. Now imagine that we have the following:

  • An account with a balance of 1,000 and a maximum overdraft of 500

  • A direct debit payment of 800

  • Concurrently, a transfer request also for 800

If you don’t synchronize access to the account data, both requests will succeed, leading to the account having an overdraft of 600, which violates our business requirement that the overdraft should never exceed 500. To enforce the maximum allowed overdraft, we need to synchronize the execution of actions that modify the account balance, as a result of which one of the concurrent requests in this scenario should fail. Next, you’ll see how to achieve this using agents.
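The failure is a check-then-act race: each request validates against the same snapshot of the account. In this hypothetical sketch, the overdraft rule is a pure function. AccountSnapshot and Rules.Debit are illustrative names, not the book's AccountState and Account. Applying the rule twice to the same snapshot accepts both debits. Applying it sequentially, so that the first result feeds the second call, rejects the second debit.

```csharp
using System;

#nullable enable

// Hypothetical state: current balance and maximum allowed overdraft.
public sealed record AccountSnapshot(decimal Balance, decimal MaxOverdraft);

public static class Rules
{
    // Pure overdraft check: returns the new state, or null if the debit
    // would take the balance below -MaxOverdraft.
    public static AccountSnapshot? Debit(AccountSnapshot state, decimal amount)
        => state.Balance - amount >= -state.MaxOverdraft
            ? state with { Balance = state.Balance - amount }
            : null;
}
```

Two unsynchronized requests both see a balance of 1,000, and both succeed because 1,000 - 800 = 200 is at least -500. The combined effect is then -600. Processed one after the other, the second request sees a balance of 200 and is rejected, because 200 - 800 = -600 is below -500.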

19.4.1 Using an agent to synchronize access to account data

To ensure that the account data can’t be affected concurrently by different requests, we can associate an agent with each account. Notice that agents are lightweight enough that it’s OK to have thousands or even millions of them. Also notice that I’m assuming there’s a single server process through which accounts can be affected. If this weren’t the case, you’d need to use an implementation of the actor model instead, but the gist of the following implementation would still be valid.

To associate an agent with an account, we’ll define an AccountProcess class with an agent-based implementation. This means we’re now using three classes to represent accounts:

  • AccountState—A record that represents the state of an account at a given moment in time

  • Account—A static class that only contains pure functions used to calculate state transitions

  • AccountProcess—An agent-based implementation that tracks the current state of an account and handles any commands that affect the account’s state

You saw implementations of Account and AccountState in chapter 13, and those don’t need to change. The following listing shows the implementation of AccountProcess.

Listing 19.7 Sequential processing of commands that affect an account

using Result = Validation<(Event Event, AccountState NewState)>;
 
public class AccountProcess
{
   Agent<Command, Result> agent;
 
   public AccountProcess
   (
      AccountState initialState,
      Func<Event, Task<Unit>> saveAndPublish
   )
   => this.agent = Agent.Start(initialState
      , async (AccountState state, Command cmd) =>
      {
         Result result = cmd switch
         {
            MakeTransfer transfer => state.Debit(transfer),        
            FreezeAccount freeze => state.Freeze(freeze),          
         };
 
         await result.Traverse(tpl => saveAndPublish(tpl.Event));  
         var newState = result
            .Map(tpl => tpl.NewState)
            .GetOrElse(state);
 
         return (newState, result);
      });
 
   public Task<Result> Handle(Command cmd)
      => agent.Tell(cmd);                                          
}

Uses pure functions to calculate the result of the command

Persists the event within the block so that the agent doesn’t process new messages in a nonpersisted state

All commands are queued and processed sequentially.

Each instance of AccountProcess internally holds an agent, so that all commands affecting an account can be processed sequentially. Let’s look at the body of the agent: first, we calculate the result of the command, given the command and the current state. This is done using pure, static functions only.

The result, remember, is a Validation with an inner value including the resulting Event and the new account state. If the result is Valid, we proceed to save and publish the created event (the check is done as part of Traverse).

It’s important to note that persistence happens within the processing function. That is, the agent shouldn’t update its state and start processing new messages before it has successfully persisted the event representing its current state transition. (Otherwise, persisting the event could fail, leading to a mismatch between the agent’s state and the state captured by the persisted events.)

Finally, we return the account’s updated state (which is used when processing subsequent commands) and the result of the command. This result includes both the new state and the created event, wrapped in a Validation, which makes it easy to send the client the outcome of the request along with its details.

Notice how agents (and actors) complect state, behavior, and persistence (as such, they have been labeled “more object-oriented than objects”). In this implementation, I’m injecting a function for persisting the events, whereas most implementations of the actor model include some configurable mechanism for persisting an actor’s state.

19.4.2 Keeping a registry of accounts

We now have an AccountProcess that can process commands applicable to a specific account in a thread-safe manner. But how does the code in an API endpoint get the instance of AccountProcess for the relevant account? And how do we ensure that we never accidentally create two AccountProcesses for the same account?

What we need is a single, application-wide registry that holds all live AccountProcesses. It needs to manage their creation and serve them by ID so that the code handling client requests can get an AccountProcess simply by providing the account ID included in the request.

Actor model implementations have such a registry built in, allowing you to register any particular actor against an arbitrary ID. In our case, we’ll build our own simple registry. The following listing shows a first attempt at doing this.

Listing 19.8 Storing and managing the creation of AccountProcesses

using AccountsCache = ImmutableDictionary<Guid, AccountProcess>;
 
public class AccountRegistry
{
   Agent<Guid, Option<AccountProcess>> agent;
 
   public AccountRegistry
   (
      Func<Guid, Task<Option<AccountState>>> loadState,
      Func<Event, Task<Unit>> saveAndPublish
   )
   => this.agent = Agent.Start
   (
      initialState: AccountsCache.Empty,
      process: async (AccountsCache cache, Guid id) =>
      {
         if (cache.TryGetValue(id, out AccountProcess account))
            return (cache, Some(account));
 
         var optAccount = await loadState(id);                         
 
         return optAccount.Map(accState =>
         {
            AccountProcess account = new(accState, saveAndPublish);    
            return (cache.Add(id, account), Some(account));
         })
         .GetOrElse(() => (cache, None));
      }
   );
 
   public Task<Option<AccountProcess>> Lookup(Guid id)
      => agent.Tell(id);
}

If the requested AccountProcess is not in the cache, loads the current state from the DB

Creates an AccountProcess with the retrieved state

In this implementation, we have a single agent that manages a cache where all live instances of AccountProcess are kept. If no AccountProcess is found for the given ID, the account’s current state is retrieved from the DB and used to create a new AccountProcess, which is added to the cache. Notice that, as usual, the loadState function returns a Task<Option<AccountState>> to acknowledge that the operation is asynchronous and that it’s possible that no data is found for a given ID.

Before you read on, go through the implementation again. Can you see any problems with this approach? Let’s see: loading the account state from the DB is done within the agent body; is that warranted? Because the agent processes one message at a time, a request for account y has to wait in the queue while the agent loads the state for account x from the DB. That’s certainly suboptimal!

19.4.3 An agent is not an object

This is the kind of schoolboy error that’s common when you’re getting used to programming with agents or actors. Although agents and actors are similar to objects, you can’t think of them as such. The error in listing 19.8 is that we’re conceptually giving the agent the responsibility of providing the caller with the requested AccountProcess, and this gives us a suboptimal solution.

Instead, agents should only have the responsibility of managing some state. The agent in question manages a dictionary, so we can call it to look up an item, or to add a new item, but going to the DB to retrieve data is a relatively slow operation that’s not directly pertinent to managing the cache of AccountProcesses.

With this in mind, let’s think of an alternative solution. A thread that wants to get hold of an AccountProcess for an account ID should do the following:

  1. Ask the agent to look up the ID.

  2. If no AccountProcess is stored, retrieve the state of the account from the DB (this time-consuming operation will be done in the calling thread, therefore without affecting the agent).

  3. Ask the agent to create and register a new AccountProcess with the given state and ID.
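These three steps can be sketched as a caller-side function. This is a hypothetical illustration, not the book's code. The three delegates stand in for telling the agent a lookup message, reading the account state from the DB, and telling the agent a registration message, and null stands in for Option's None. Note that the slow load in step 2 runs in the caller's async flow, so the agent never waits on I/O.

```csharp
using System;
using System.Threading.Tasks;

#nullable enable

public static class RegistryClient
{
    // Hypothetical caller-side protocol; all names here are illustrative.
    public static async Task<TProcess?> LookupOrRegister<TProcess, TState>(
        Guid id,
        Func<Guid, Task<TProcess?>> lookup,            // ask the agent to look up the ID
        Func<Guid, Task<TState?>> loadState,           // read the state from the DB
        Func<Guid, TState, Task<TProcess?>> register)  // ask the agent to register
        where TProcess : class
        where TState : class
    {
        // 1. Cheap: the agent only consults its dictionary.
        var existing = await lookup(id);
        if (existing is not null) return existing;

        // 2. Slow I/O on the caller's side; the agent keeps serving other messages.
        var state = await loadState(id);
        if (state is null) return null; // unknown account

        // 3. The agent creates and registers the process (or returns one
        //    registered concurrently by another caller).
        return await register(id, state);
    }
}
```

Only the first request for a given ID pays for the load. Subsequent requests are served in step 1.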

This means that we may need to go to the agent twice, so we need two different message types to specify what we want the agent to do. The following listing shows that different types of messages can be defined to convey the caller’s intention.

Listing 19.9 Different message types convey the caller’s intention

public class AccountRegistry
{
   abstract record Msg(Guid Id);
   record LookupMsg(Guid Id) : Msg(Id);
   record RegisterMsg(Guid Id, AccountState AccountState) : Msg(Id);
}

I’ve defined these message types as inner classes because they’re only used within the AccountRegistry class to communicate with its agent.

We can now define the Lookup method, which constitutes the AccountRegistry’s public API (and is, therefore, executed on the caller’s thread), as follows:

public class AccountRegistry
{
   Agent<Msg, Option<AccountProcess>> agent;
   Func<Guid, Task<Option<AccountState>>> loadState;
 
   public Task<Option<AccountProcess>> Lookup(Guid id)
      => agent
         .Tell(new LookupMsg(id))                                   
         .OrElse(() =>
            from state in loadState(id)                             
            from account in agent.Tell(new RegisterMsg(id, state))  
            select account);
}

Tells the agent to look up the given ID

If the lookup fails, the state is loaded in the calling thread.

Tells the agent to register a new process with the given state and ID

It first asks the agent to look up the ID; if the lookup fails, then the state is retrieved from the DB. Note that this is done on the calling thread, leaving the agent free to handle other messages. Finally, a second message is sent to the agent asking it to create and register an AccountProcess with the given account state and ID.

Notice that everything happens within the Task<Option<>> stack because this is the type returned both by loadState and by Tell. Even OrElse here resolves to an overload I’ve defined on Task<Option<T>>, which executes the given fallback function if the Task has faulted or if the inner Option is None.
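The semantics of that OrElse overload can be illustrated without LaYumba. The following is a hypothetical analogue that uses a nullable result in place of Option<T>: the fallback runs if the task faults or completes with null. The text doesn't say how LaYumba's version treats cancellation, so this sketch simply lets cancellation propagate.

```csharp
using System;
using System.Threading.Tasks;

#nullable enable

public static class TaskFallback
{
    // Hypothetical analogue of OrElse on Task<Option<T>>, with null as "None".
    public static async Task<T?> OrElse<T>(this Task<T?> task, Func<Task<T?>> fallback)
        where T : class
    {
        T? result;
        try
        {
            result = await task;
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            result = null; // a faulted task is treated like "not found"
        }
        // The fallback runs outside the try block, so its own failures propagate.
        return result ?? await fallback();
    }
}
```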

All that’s left to show is the revised definition of the agent, which is started in the AccountRegistry’s constructor. The following listing shows this.

Listing 19.10 An agent storing a registry of AccountProcesses

using AccountsCache
   = ImmutableDictionary<Guid, AccountProcess>;
 
public class AccountRegistry
{
   Agent<Msg, Option<AccountProcess>> agent;
   Func<Guid, Task<Option<AccountState>>> loadState;
 
   public AccountRegistry
   (
      Func<Guid, Task<Option<AccountState>>> loadState,
      Func<Event, Task<Unit>> saveAndPublish
   )
   {
      this.loadState = loadState;

      this.agent = Agent.Start
      (
         initialState: AccountsCache.Empty,
         process: (AccountsCache cache, Msg msg)
            => msg switch                                      
         {
            LookupMsg m => (cache, cache.Lookup(m.Id)),
 
            RegisterMsg m => cache.Lookup(m.Id).Match
            (
               Some: acc => (cache, Some(acc)),                
               None: () =>
               {
                  AccountProcess acc                           
                     = new(m.AccountState, saveAndPublish);    
                  return (cache.Add(m.Id, acc), Some(acc));    
               }
            )
         }
      );
   }
 
   public Task<Option<AccountProcess>> Lookup(Guid id) => // as above...
}

The agent uses pattern matching to perform different actions depending on the message it’s sent.

An edge case in which two concurrent requests have both loaded the account state

Creates and registers a new AccountProcess
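Stripped of LaYumba types, the processing function in listing 19.10 is a pure function from the current cache and a message to a new cache and a reply. The sketch below is hypothetical: AccountData and AccountProcessStub stand in for AccountState and AccountProcess, ImmutableDictionary.TryGetValue replaces the Option-returning Lookup, and null stands in for None. Written this way, the edge case is easy to test in isolation: a second RegisterMsg for the same ID returns the process that's already registered instead of creating a duplicate.

```csharp
using System;
using System.Collections.Immutable;

#nullable enable

// Hypothetical stand-ins for AccountState and AccountProcess.
public sealed record AccountData(decimal Balance);
public sealed class AccountProcessStub
{
    public AccountProcessStub(AccountData state) => State = state;
    public AccountData State { get; }
}

public abstract record Msg(Guid Id);
public sealed record LookupMsg(Guid Id) : Msg(Id);
public sealed record RegisterMsg(Guid Id, AccountData State) : Msg(Id);

public static class Registry
{
    // Pure processing function: (cache, msg) → (new cache, reply).
    public static (ImmutableDictionary<Guid, AccountProcessStub> Cache, AccountProcessStub? Reply) Handle(
        ImmutableDictionary<Guid, AccountProcessStub> cache, Msg msg)
        => msg switch
        {
            LookupMsg m
                => (cache, cache.TryGetValue(m.Id, out var found) ? found : null),
            RegisterMsg m when cache.TryGetValue(m.Id, out var existing)
                => (cache, existing), // edge case: another caller registered first
            RegisterMsg m
                => Register(cache, m),
            _ => throw new ArgumentException($"Unexpected message: {msg}", nameof(msg))
        };

    static (ImmutableDictionary<Guid, AccountProcessStub>, AccountProcessStub?) Register(
        ImmutableDictionary<Guid, AccountProcessStub> cache, RegisterMsg m)
    {
        var process = new AccountProcessStub(m.State);
        return (cache.Add(m.Id, process), process);
    }
}
```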

This implementation is slightly more complex but more efficient, and this example has given us the chance to see a common pitfall when programming with agents: namely, performing an expensive operation in the body of an agent that doesn’t strictly require synchronized access to the agent’s state.

On the other hand, in both proposed implementations, once an AccountProcess is created, it’s never terminated; it will persist events to the DB to keep the stored version in sync with the in-memory state, but we read from the DB at most once. Is this a good thing or a bad one? It depends on how much data you’ll eventually have in memory and how much memory you have available. It’s potentially a huge optimization because access to in-memory data is orders of magnitude faster than access to the DB. The ability to keep all your data in memory is one of the big draws of the actor model: because actors can be distributed across machines, there’s effectively no limit on the amount of memory you can use, and accessing memory (even over the network) is typically much faster than accessing even a local DB.

19.4.4 Putting it all together

With the previous building blocks in place, let’s see how our implementation for the API endpoint changes:

public static void ConfigureMakeTransferEndpoint
(
   WebApplication app,
   Validator<MakeTransfer> validate,
   AccountRegistry accounts                       
)
{
   var getAccountVal = (Guid id)                  
      => accounts
         .Lookup(id)
         .Map(opt => opt.ToValidation(Errors.UnknownAccountId(id)));
 
   app.MapPost("/Transfer/Make", (MakeTransfer transfer) =>
   {
      Task<Validation<AccountState>> outcome =
         from cmd in Async(validate(transfer))
         from acc in getAccountVal(cmd.DebitedAccountId)
         from result in acc.Handle(cmd)           
         select result.NewState;
 
      return outcome.Map(
        Faulted: ex => StatusCode(500),
        Completed: val => val.Match(
           Invalid: errs => BadRequest(new { Errors = errs }),
           Valid: newState => Ok(new { Balance = newState.Balance })));
   });
}

Required to get an AccountProcess by account ID

Changes from Task<Option<>> to Task<Validation<>>

The AccountProcess handles the command, updating the account state and persisting/publishing the corresponding event.

The endpoint implementation depends on a Validator for validating the command and on the AccountRegistry for retrieving an AccountProcess for the relevant account.

The main change, compared to the version in chapter 13, is that the result tuple is only returned for feedback, whereas persisting and publishing the event happens in the AccountProcess’s Handle method. This, as you’ve seen, is required to prevent concurrent modifications to the account’s state, which could violate business rules such as limiting the account’s maximum overdraft.

I’m not including the implementation for the functions that read and write events to storage because they’re so technology-specific and don’t entail any particularly interesting logic.

You’ve now seen all the main components of an end-to-end solution for handling a money transfer with the added constraint of synchronized access to the account state.

Summary

  • Shared mutable state that’s accessed concurrently can cause difficult problems.

  • For this reason, you should avoid shared mutable state entirely whenever possible. This is often possible in parallel processing scenarios.

  • In other types of concurrency, notably in multithreaded applications that need to model real-world entities, shared mutable state is often required.

  • Access to shared mutable state must be serialized to avoid inconsistent changes to the data. This can be achieved using locks but also using lock-free techniques.

  • Message-passing concurrency is a technique that avoids locks by restricting state mutation to processes (actors/agents) that have exclusive ownership of some state, which they can access single-threadedly in reaction to messages they’re sent.

  • An actor/agent is a lightweight process featuring

    • An inbox in which messages sent to it are queued up
    • Some state of which it has exclusive ownership
    • A processing loop in which it processes messages sequentially, taking actions such as creating and communicating with other agents, changing its state, and performing side effects
  • Agents and actors are fundamentally similar, but there are important differences:

    • Actors are distributed, whereas agents are local to a single process.
    • Unlike with agents, the actor model includes error handling provisions such as supervisors, which take action if the supervised actor fails.
  • Message-passing concurrency feels quite different from other FP techniques, mainly because FP works by composing functions, whereas actors/agents tend to work in a fire-and-forget fashion.

  • It’s possible to write high-level functional APIs with underlying agent-based or actor-based implementations.


1 In fact, there are several different strategies for implementing STM with different characteristics. Some implementations also enforce consistency, meaning that it’s possible to enforce invariants that a transaction can’t violate. Do the properties atomicity, consistency, and isolation sound familiar? That’s because they are three of the ACID properties guaranteed by many databases—the last one being durability, which, of course, does not apply to STM as it specifically pertains to in-memory data.

2 I already mentioned language-ext, a functional library for C#, in the front matter. The code is available at https://github.com/louthy/language-ext, and for some basic code samples showing how to use the STM features, see https://github.com/louthy/language-ext/wiki/Concurrency.
