Chapter 6. Event Sourcing and CQRS

Technology can solve a lot of problems, but code, libraries, and languages alone are not enough to solve all of our problems. In this chapter, we’re going to take a look at some design patterns that will prepare us for the kind of massive scale that cloud platforms facilitate.

We’ll explore the motivations behind and philosophies of Event Sourcing (ES) and Command Query Responsibility Segregation (CQRS), and then we’ll walk through some sample code that illustrates these design principles in action.

Introducing Event Sourcing

When we build software for a small scale, we tend to make a lot of assumptions. And when building microservices in a vacuum, especially if we’re following some classic “hello world”–style samples, we often do things in a way that might not be conducive to scale.

For example, our location service is synchronous. We submit a new location to it and it immediately writes that location to a database. When we want to know location history or the most recent location, we query the same service and it in turn queries the database. On the surface nothing seems all that bad about this design, until we ask ourselves how this might hold up against a million new location records per day for thousands or tens of thousands of team members. At this scale, these queries and new location submissions are going to be agonizingly slow, and we’ll quickly get bogged down waiting for the database.

This type of situation is what we call monolithic thinking. Even though we’re using microservices in the technical sense, we’re definitely not taking full advantage of the cloud, or of truly robust distributed computing design patterns. In short, we’re just making smaller monoliths, or, as some might call them, microliths.1

To help explain how Event Sourcing works, we’ll use an analogy: reality itself.

Reality Is Event Sourced

Our brains are essentially event-sourced systems. We receive stimuli in the form of the five senses, and our brains are then responsible for properly sequencing each stimulus (an event). Every few hundred milliseconds or so, they perform some calculations against this never-ending stream of stimuli. The result of these calculations is what we call reality.

Our minds process the incoming event stream and then compute state. This state is what we perceive as our reality: the world around us. When we watch someone dancing to music, we’re receiving audio and visual events and sequencing them properly (our minds compensate for the fact that we process audio and visual stimuli at different speeds, giving us the illusion of synchronized stimuli).

Event-sourced applications operate in a similar manner. They consume streams of incoming events, perform functions against the inbound streams, and compute results or state in response. This is a very different model than microliths that just expose simple, synchronous query and store–type operations.

Event Sourcing Defined

There are a number of extremely good sources of information available on Event Sourcing. ES is not a brand new pattern. It is, however, gaining new traction as a viable way to deal with the types of elastic scaling and reliability that are required by cloud services.

The goal of this chapter isn’t to provide you with an in-depth doctoral thesis on the topic, but to give you enough of an overview so that the code we’re going to write will make sense, both from a technical and an architectural viewpoint.

In what we think of as traditional applications, state is managed as a discrete set of data. If a client makes a PUT or POST request to our service, the state is mutated. This gives us a good sense of how things are right now, but doesn’t give us any indication of how we got there. Also, remember that the concept of right now is an illusion, so attempting to bend reality to support this notion may be counterproductive.

Event Sourcing takes care of that problem, and much more, by separating the concern of state management from the concern of receiving the stimuli that result in state changes. To make this happen, an event-sourced system must satisfy a number of requirements, outlined in the following list:

Ordered

Event streams are ordered. Performing calculations against the same set of events but in a different sequence will produce different output. For this reason, ordering and proper time management are essential.

Idempotent

Any function that operates on an event stream must always return the exact same result for identical ordered event streams. This rule is absolutely mandatory, and failing to abide by it will cause untold levels of disaster.

Isolated

Any function that produces a result based on an event stream cannot make use of external information. All data required for calculations must be present in the events.

Past tense

Events take place in the past. This should be reflected in your variable names, structure names, and architecture. Event processors run calculations against a chronologically ordered sequence of events that have already happened.

Put mathematically, a function that operates on a stream of events will always produce the same state and output set of new events. For example:

f(event¹, event², ...) = state¹ + { output event set }

In keeping with the rules of Event Sourcing, this function, given the same inputs, will always produce the same outputs. This makes the business logic core of any Event Sourcing system eminently testable and reliable, whereas in most legacy codebases the business logic layer of the application is the least tested, darkest and scariest place.

As a corollary to this, you can add that given an existing state and an inbound event stream, an event processing function will always produce the same predictable state and set of output events:

f(state¹, event¹, event², ...) = state² + { output event set }
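
To make this concrete, here is a minimal C# sketch of the state-computation half of such a function, written as a left fold over an ordered event stream. The types and names are illustrative only, not from the sample application:

using System.Collections.Generic;
using System.Linq;

public class AccountState { public decimal Balance { get; set; } }
public class TransactionEvent { public decimal Amount { get; set; } }

public static class EventFold
{
    // Deterministic: the same starting state and the same ordered
    // events always yield the same resulting state.
    public static AccountState Apply(AccountState state, TransactionEvent evt)
    {
        return new AccountState { Balance = state.Balance + evt.Amount };
    }

    public static AccountState Replay(AccountState initial,
        IEnumerable<TransactionEvent> orderedEvents)
    {
        // Aggregate is a left fold: f(f(f(state, e1), e2), e3)...
        return orderedEvents.Aggregate(initial, Apply);
    }
}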

A few concrete examples might help further illustrate how the world looks when you see problems as event sourced. Let’s take a financial transaction processing system as a sample. We could have an inbound stream of transactions, the processing of which results in state changes such as modifications of account balances, credit limits, and so on. Such a transaction processing system might also emit new events to different streams as a result of processing, allowing partner systems to be notified and possibly triggering push notifications to customers with banking applications on their mobile devices.

Popular blockchain technologies (e.g., Bitcoin) are based on this same idea: a secure, trusted sequence of events occurring against some owned resource.

Let’s take another of our favorite problem domains: the Internet of Things (IoT). For the sake of illustration, assume that we have an incoming stream of events from our smart devices containing data like GPS coordinates, weather statistics, and other sensor measurements. The event processor here has two functions: it takes the latest recorded measurements and makes them available to a cache (this applies to CQRS, which we’ll get to shortly), and it monitors the data in the stream for alert conditions. When these conditions occur, it emits events so that other parts of the system can react accordingly.

Learning to Love Eventual Consistency

Shifting the paradigm to viewing the world as a series of streams, upon which you place event processors and event emitters, can be a shock to even the most seasoned developers.

In an event-sourced system, you don’t get to perform simple CRUD (Create, Read, Update, Delete) operations in a synchronous fashion against one or more services. There is no immediate feedback from the system of record that gives you the concrete state of how things exist in a consistent manner.

Instead, things in this new world are eventually consistent. You probably experience eventually consistent systems on a daily basis and never give them much thought because they are so commonplace.

Your banking system is eventually consistent: eventually the transaction where you just purchased that shiny new computer will show up in your bank account, eventually causing you pain... but until then you can enjoy that new laptop smell with no remorse.

Other eventually consistent applications with which we all interact daily are social networking apps. You’ve probably seen the scenario where a comment or post you make from one device takes a few minutes to show up in a friend’s browser or device. This is because the application architects have decided on a trade-off: by giving up the immediate consistency of synchronous operation in favor of a tolerable delay in feedback, the application is able to support enormous scale and traffic volume.

Learning to embrace and trust eventual consistency involves a thorough analysis of what information your users need and, more importantly, when they need it. It is up to you and your deep knowledge of your problem domain to decide on what information needs to be available immediately and which information can lag.

This leads us to our next pattern: CQRS.

The CQRS Pattern

If we follow some of the patterns we’ve been talking about to a logical conclusion, we will arrive at the need for the separation of command inputs from queries in our system, otherwise known as the Command Query Responsibility Segregation pattern.

The idea is a simple one, but like Event Sourcing, it often results in a fundamental shift in how we think about distributed systems. Commands are responsible for submitting inputs into our system, which will likely result in the creation of events distributed to one or more streams.

We’ve already decided that we’re going to sacrifice immediate consistency for scale, so we know that the act of submitting a command should be a fire-and-forget operation. The response from submitting a command is not the newly altered (consistent) state, it is merely an acknowledgment of whether or not the command was successfully ingested by the system.

Eventually, the state of the system will be altered to reflect the processing of this one command. The length of this lag depends entirely on the business process being performed and the criticality of propagating the data change.

The other half of this new segmentation of responsibilities is the query. As a result of embracing eventual consistency, we’ve already done an in-depth analysis of the information our clients need.

Since we know what queries are going to be made of our system, we can predict those queries and, in many cases, make the data available before the client queries for it.

This is another fundamental shift in thinking. Traditional backend monolithic applications involve hitting a query endpoint with some parameters. Those parameters are then used to perform some amount of lengthy processing and querying, returning calculated results.

In the world of massive scale, volume, and throughput we simply can’t afford to tie up the resources of our microservices by making computationally expensive queries. We’re no longer going to tolerate sitting around twiddling our thumbs while we wait for our filter and grouping clauses to sift through millions of rows of data that might be incurring row- or table-level locks in a database.

The idea is to front-run the expected usage of the system so that the data is made available as close to the consumer as possible, in a form that can be queried as fast as possible, involving as little computational processing as possible. In short, we want the queries to be as dumb as possible.

Let’s use another example to illustrate this pattern in action. Imagine that we’re writing some facilities management software for apartment buildings. Tenants will be accessing a portal that allows for a display of electrical usage. Depending on who logs in, we’ll be able to see monthly usage values by apartment, by building, by region, etc.

We’ve got a stream of incoming events from electrical usage monitoring devices. Each unit might contribute one usage event every hour (since kWh is an accepted standard for metered electricity usage). We could build this system such that every time someone refreshes their portal page, we go out to some data service and request a roll-up of all the meter events within some time frame, but that is just unacceptable for modern software development at cloud scale.

If we’re pushing these calculations off to a database, then our database immediately becomes a critical point of failure and will gum up our otherwise smooth machinery.

Knowing the usage pattern of the majority of our customers gives us the ability to take advantage of Event Sourcing and build a proper CQRS implementation. Our event processor can recompute cached meter aggregates every time it receives a new event. With this in place, we’ll have the results portal users are expecting already sitting in a database or cache when the query happens. No complex calculations, no ad hoc aggregates and complicated roll-ups... just a simple query.

The event store (persistent storage of all meter events received since the system started) is still available if we need more complex calculations or auditing, but the eventually consistent state (aka reality) is made available for immediate, super-fast query to all consumers.
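
To make the meter example concrete, here is a hypothetical sketch of the aggregate-maintaining half of such an event processor; the type and member names are illustrative, not from a real codebase:

using System;
using System.Collections.Generic;

// Hypothetical sketch: maintain a running kWh total per apartment as
// each meter event arrives, so portal queries become simple lookups.
public class MeterUsageAggregator
{
    private readonly Dictionary<Guid, double> monthlyKwhByApartment =
        new Dictionary<Guid, double>();

    public void OnMeterEvent(Guid apartmentId, double kilowattHours)
    {
        double total;
        monthlyKwhByApartment.TryGetValue(apartmentId, out total);
        monthlyKwhByApartment[apartmentId] = total + kilowattHours;
        // A real implementation would write this to a shared cache
        // (e.g., Redis) rather than hold it in process memory.
    }

    public double GetMonthlyUsage(Guid apartmentId)
    {
        double total;
        monthlyKwhByApartment.TryGetValue(apartmentId, out total);
        return total;
    }
}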

Event Sourcing and CQRS in Action—Team Proximity Sample

Up to this point in the book, all of our samples have been fairly simple. We’ve dealt with small services that perform simple tasks; we can deploy them to the cloud, and we can even scale them up and down to handle larger volumes.

But this type of architecture only gets us so far. For the rest of this chapter we’re going to expand the scope of our team management application so that we can illustrate the power (and potential downfalls) of Event Sourcing and CQRS applied to a real-world problem.

The problem with buzzword-ridden patterns is that they are often applied at too high a level. Most of us have fallen into the trap of doing some reading, finding a fancy new pattern, and then slathering it on top of an existing solution without doing much analysis. This is the classic “I have a hammer and everything looks like nails” fallacy.

In these situations, we tend to apply shiny new patterns like condiments. We sprinkle them on top of our legacy applications and hope they will run better, respond faster, and scale more. The problem is patterns like the ones we’re discussing in this book aren’t toppings you shake onto existing code; they require a fundamental change in the recipe. For many organizations that have built up years of process around the creation of hard-to-scale monoliths, implementing them might also require building completely new kitchens.

Event Sourcing Is Not a Panacea

While we’re devoting a lot of time to discussing ES, CQRS, and eventual consistency, these are patterns that need to be applied when the problem domain requires it. These patterns, like all patterns, are just some possible solutions to a problem. Assuming that you can fix all problems with Event Sourcing is as dangerous as assuming you can solve all real-world problems with a single hammer.

The samples for our existing team and location services are rudimentary. They let us update and query team membership as well as member locations. But let’s say our application now needs to manage a vast number of teams, each containing hundreds of individuals. Each member of a team is wielding a mobile device with an application that routinely reports the location of that member.

While having near-real-time location data on all of the people using our application is a great feature on its own, the real power comes from what we can do by processing incoming events. In our case, we want to detect when two team members are close to each other.

In the new sample we’re going to build, we will be detecting when member locations occur within some small distance of each other. The system will then support reacting to these proximity detections. For example, we might want to send push notifications to the mobile devices of the nearby team members to alert them to the possibility of catching up in person.

To do this properly, we’re going to embrace Event Sourcing and CQRS, and we’ll be splitting up the responsibilities of the system among four components, as follows:

  • The location reporter service (Command)
  • The event processor (Event Sourcing)
  • The reality service (Query)
  • The proximity monitor (Event Sourcing)

We will discuss the purpose and implementation of each of these services in detail throughout the rest of the chapter.

The Location Reporter Service

In a CQRS system, the inputs and outputs of the system are decoupled entirely. For our sample, the inputs take the form of commands sent to the location reporter service.

The client applications (mobile, web, IoT, etc.) in our system need to submit new location data on members on a regular basis. They will do so by sending updates to the location reporter.

You can find the full source code for the location reporter on GitHub.

Since we build everything API First, let’s take a look at the extremely simple API for the location reporter, shown in Table 6-1.

Table 6-1. Location reporter service API
Resource Method Description
/api/members/{memberId}/locationreports POST Submits a new location report

When we get a new location report, we’ll perform the following tasks:

  1. Validate the report object.
  2. Convert the command into an event. 
  3. Emit the event on a message queue.

Recall from the discussion about the requirements of an event-sourced system that event processing cannot make use of information that exists outside the event stream.

This sample is designed to detect nearby teammates. This should bring up an important question: how do we know the team membership of the member referenced by a location report?

We could include it in the location report, but that would burden the client with maintaining information that isn’t really part of the client’s responsibility or domain. Let’s say we have a simple IoT device designed to emit GPS coordinates every 30 seconds. Should this physical device then also be required to periodically query a service to discover team membership?

You’ll often hear this problem referred to as a complexity leak: the internal workings (or limitations) of our system leak out of our service and force our clients to bear the burden of additional complexity. There is also an important Conway’s law failure mode here. If the team responsible for the service and the team consuming it are isolated from each other, it becomes all too easy for the service team to foist the complexity on the client rather than give the problem the thought and diligence it requires.

So, if the event processor (we’ll discuss that next) can’t query for team membership while processing the event stream because of the core rules of Event Sourcing, and the client/consumers shouldn’t bear the burden of maintaining team membership, what do we do?

As you work with reactive, distributed systems more and more, you will see this pattern emerge consistently. Solving this particular problem—gathering all the necessary information to produce an event—should be the responsibility of the command processor, the thing that turns commands into events.

In our case, the command processor needs to create an event with an appropriate timestamp, and it’s also going to need to fetch the team membership (which is a volatile quantity, subject to change at any time) to place that information on the event.

This has the desirable effect of allowing our system to detect nearby team members only if they were on the same team at the time the events occurred. Other solutions to this problem that might not utilize Event Sourcing could produce “false positive” proximity alerts for team members based on stale caches, out-of-order message processing, client synchronization issues, etc.

While the consequences of a false positive for a harmless alert about a nearby teammate might be low, think about if this application served a different business domain. What if this was a financial application processing an event stream of transactions, or a security system granting or denying physical access? In these cases, the consequences of a false positive based on out-of-stream data could be disastrous.

Creating the location reports controller

Now that we know what we’re building and why, let’s take a look at the simple controller that handles our single-method API (Example 6-1).

Example 6-1. LocationReportsController.cs
using System;
using Microsoft.AspNetCore.Mvc;
using StatlerWaldorfCorp.LocationReporter.Events;
using StatlerWaldorfCorp.LocationReporter.Models;
using StatlerWaldorfCorp.LocationReporter.Services;

namespace StatlerWaldorfCorp.LocationReporter.Controllers
{
[Route("/api/members/{memberId}/locationreports")]
public class LocationReportsController : Controller
{
    private ICommandEventConverter converter;
    private IEventEmitter eventEmitter;
    private ITeamServiceClient teamServiceClient;
    

    public LocationReportsController(
        ICommandEventConverter converter, 
        IEventEmitter eventEmitter, 
        ITeamServiceClient teamServiceClient) {

        this.converter = converter;
        this.eventEmitter = eventEmitter;
        this.teamServiceClient = teamServiceClient;
    }

    [HttpPost]
    public ActionResult PostLocationReport(Guid memberId, 
      [FromBody]LocationReport locationReport)
    {
      MemberLocationRecordedEvent locationRecordedEvent = 
        converter.CommandToEvent(locationReport);
      locationRecordedEvent.TeamID = 
        teamServiceClient.GetTeamForMember(
           locationReport.MemberID);
      eventEmitter.EmitLocationRecordedEvent(
         locationRecordedEvent);

      return this.Created(
        $"/api/members/{memberId}/locationreports/" +
        $"{locationReport.ReportID}",
        locationReport);
    }
}
}

The controller is really just responsible for handling the incoming JSON payload, delegating the work, and replying with an appropriate JSON response. As you can see in the code, we’ve made a few utilities available for injection at runtime and during testing: the ICommandEventConverter, the IEventEmitter, and the ITeamServiceClient.

While this pattern may not have been commonplace in the past with legacy ASP.NET applications, you will see this all over modern ASP.NET (especially ASP.NET Core) code. We inject the objects to which we will delegate and we leave our controller methods as simple and small as possible. This makes our controllers and our utilities far easier to test and maintain.

The command converter creates a basic event from an input command while the team service client allows us to fetch the ID of the team to which the member belongs (our system only allows people to belong to one team at a time). Finally, the event emitter is responsible for sending the event to the right place.

Because all of these things are injectable via DI and available for constructor injection during unit tests, we can very easily make our code simple, readable, and easy to maintain.
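
The command converter itself isn’t listed in this chapter, but a sketch of its essence might look like the following; any property name not shown elsewhere in the chapter (such as RecordedTime) is an assumption:

using System;

// A sketch only; the repository’s converter may differ.
public class CommandEventConverter : ICommandEventConverter
{
    public MemberLocationRecordedEvent CommandToEvent(
        LocationReport locationReport)
    {
        return new MemberLocationRecordedEvent {
            MemberID = locationReport.MemberID,
            ReportID = locationReport.ReportID,
            Latitude = locationReport.Latitude,
            Longitude = locationReport.Longitude,
            Origin = locationReport.Origin,
            // Events happen in the past, so we stamp the moment
            // the command was received (see “Past tense” earlier).
            RecordedTime = DateTime.UtcNow.Ticks
        };
    }
}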

Building an AMQP event emitter

The location reporter service is actually pretty small, and other than the controller, the most interesting stuff is in the event emitter. Our sample emits events to an Advanced Message Queuing Protocol (AMQP) queue supported by RabbitMQ. Take a look at the code for our AMQP event emitter (Example 6-2).

Example 6-2. AMQPEventEmitter.cs
using System;
using System.Linq;
using System.Text;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using StatlerWaldorfCorp.LocationReporter.Models;

namespace StatlerWaldorfCorp.LocationReporter.Events
{      
    public class AMQPEventEmitter : IEventEmitter
    {
        private readonly ILogger logger;
        private AMQPOptions rabbitOptions;
        private ConnectionFactory connectionFactory;

        public AMQPEventEmitter(ILogger<AMQPEventEmitter> logger,
            IOptions<AMQPOptions> amqpOptions)
        {
            this.logger = logger;
            this.rabbitOptions = amqpOptions.Value;

            connectionFactory = new ConnectionFactory();
            
            connectionFactory.UserName = rabbitOptions.Username;
            connectionFactory.Password = rabbitOptions.Password;
            connectionFactory.VirtualHost = 
              rabbitOptions.VirtualHost;
            connectionFactory.HostName = rabbitOptions.HostName;
            connectionFactory.Uri = rabbitOptions.Uri;
            
            logger.LogInformation(
              "AMQP Event Emitter configured with URI {0}", 
               rabbitOptions.Uri);
        }
        public const string QUEUE_LOCATIONRECORDED = 
          "memberlocationrecorded";

        public void EmitLocationRecordedEvent(
          MemberLocationRecordedEvent locationRecordedEvent)
        {                    
            using (IConnection conn = connectionFactory.
               CreateConnection()) {
                using (IModel channel = conn.CreateModel()) {
                    channel.QueueDeclare(
                        queue: QUEUE_LOCATIONRECORDED,
                        durable: false,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null
                    );
                    string jsonPayload = 
                      locationRecordedEvent.toJson();
                    var body = 
                       Encoding.UTF8.GetBytes(jsonPayload);
                    channel.BasicPublish(
                        exchange: "",
                        routingKey: QUEUE_LOCATIONRECORDED,
                        basicProperties: null,
                        body: body
                    );
                }
            }
        }
    }
}

As with our controller, we’re injecting the supporting classes we need by interface as parameters to our constructor. None of this would work properly if we didn’t configure dependency injection in our startup class.

Configuring and starting the service

The AMQP event emitter class gets the information needed to configure a RabbitMQ connection factory from an options instance. You can see how these options are configured by looking at the location reporter’s Startup class (Example 6-3).

Example 6-3. src/StatlerWaldorfCorp.LocationReporter/Startup.cs
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using System;
using Microsoft.Extensions.Logging;
using System.Linq;
using StatlerWaldorfCorp.LocationReporter.Models;
using StatlerWaldorfCorp.LocationReporter.Events;
using StatlerWaldorfCorp.LocationReporter.Services;

namespace StatlerWaldorfCorp.LocationReporter
{
    public class Startup
    {        
        public Startup(IHostingEnvironment env, 
          ILoggerFactory loggerFactory) 
        {
            loggerFactory.AddConsole();
            loggerFactory.AddDebug();
            
            var builder = new ConfigurationBuilder()                
                .SetBasePath(env.ContentRootPath)
                .AddJsonFile("appsettings.json", 
                   optional: false, reloadOnChange: false)
                .AddEnvironmentVariables();                         

            Configuration = builder.Build();                    
        }

        public IConfigurationRoot Configuration { get; }

        public void ConfigureServices(IServiceCollection services) 
        {
            services.AddMvc();
            services.AddOptions();

            services
              .Configure<AMQPOptions>(
                 Configuration.GetSection("amqp"));            
            services
              .Configure<TeamServiceOptions>(
                 Configuration.GetSection("teamservice"));

            services.AddSingleton(typeof(IEventEmitter), 
              typeof(AMQPEventEmitter));
            services.AddSingleton(typeof(ICommandEventConverter),
              typeof(CommandEventConverter));
            services.AddSingleton(typeof(ITeamServiceClient), 
              typeof(HttpTeamServiceClient));
        }

        public void Configure(IApplicationBuilder app, 
                IHostingEnvironment env, 
                ILoggerFactory loggerFactory,
                ITeamServiceClient teamServiceClient,
                IEventEmitter eventEmitter) 
        {           
            // Asked for instances of singletons during startup
            // to force initialization early.
            
            app.UseMvc();
        }
    }
}

The most important lines of code here are the two calls to Configure in the ConfigureServices method. They tell the configuration subsystem that it should make options instances available for dependency injection based on the amqp and teamservice sections, respectively.

Remember that these sections can be supplied by an appsettings.json file but can also be overridden by environment variables. This environment variable overriding is what we would do in a production environment to point the app at the right Rabbit server and team service URL.

You may also notice that we’re reading in an appsettings.json file. This file contains a default set of values to configure our RabbitMQ service as well as the URL to the team service for our queries. It’s important to remember that the order of precedence is defined by the order in which you add configuration sources, so make sure that you always add your local/default JSON settings first so they can be overridden.

Here’s what our appsettings.json file looks like:

{
  "amqp": {
    "username": "guest",
    "password": "guest",
    "hostname": "localhost",
    "uri": "amqp://localhost:5672/",
    "virtualhost": "/"
  },
  "teamservice": {
    "url": "http://localhost:5001"
  }
}
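
In a production environment we’d leave this file alone and override individual values with environment variables. With ASP.NET Core’s environment variable configuration provider, a colon-separated key like amqp:uri can be supplied with double underscores in its place; the hostnames below are, of course, hypothetical:

$ export amqp__uri=amqp://user:pass@rabbit.internal.example:5672/
$ export teamservice__url=http://teamservice.internal.example
$ dotnet run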

Consuming the team service

Before we get to running the location reporter, let’s take a look at the HTTP implementation of the ITeamServiceClient (Example 6-4). Note that we’re getting the URL of the team service from injected configuration options, the same way we configured our Rabbit client.

Example 6-4. HttpTeamServiceClient.cs
using System;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using Newtonsoft.Json;
using StatlerWaldorfCorp.LocationReporter.Models;

namespace StatlerWaldorfCorp.LocationReporter.Services
{
    public class HttpTeamServiceClient : ITeamServiceClient
    {        
        private readonly ILogger logger;

        private HttpClient httpClient;

        public HttpTeamServiceClient(
            IOptions<TeamServiceOptions> serviceOptions,
            ILogger<HttpTeamServiceClient> logger)
        {
            this.logger = logger;
               
            var url = serviceOptions.Value.Url;

            logger.LogInformation(
              "Team Service HTTP client using URL {0}", 
              url);

            httpClient = new HttpClient();
            httpClient.BaseAddress = new Uri(url);
        }
        public Guid GetTeamForMember(Guid memberId)
        {                            
            httpClient.DefaultRequestHeaders.Accept.Clear();
            httpClient.DefaultRequestHeaders.Accept.Add(
              new MediaTypeWithQualityHeaderValue(
              "application/json"));

            HttpResponseMessage response = 
              httpClient.GetAsync(
              String.Format("/members/{0}/team",
                memberId)).Result;

            TeamIDResponse teamIdResponse;
            if (response.IsSuccessStatusCode) {
                string json = response.Content
                  .ReadAsStringAsync().Result;
                teamIdResponse = 
                  JsonConvert.DeserializeObject<TeamIDResponse>(
                     json);
                return teamIdResponse.TeamID;
            }
            else {
                return Guid.Empty;
            }            
        }
    }

    public class TeamIDResponse
    {
        public Guid TeamID { get; set; }
    }
}

In this example we’re using the .Result property to force a thread to block while we wait for a reply from the asynchronous method. For production-quality code, we would probably refactor this and ensure that we’re carrying asynchronous results all the way to the service boundary.
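
A sketch of what that refactoring might look like for this method follows; the async interface signature and a using System.Threading.Tasks; directive are assumed:

// Hypothetical async variant of GetTeamForMember; the controller
// action calling it would become async as well.
public async Task<Guid> GetTeamForMemberAsync(Guid memberId)
{
    httpClient.DefaultRequestHeaders.Accept.Clear();
    httpClient.DefaultRequestHeaders.Accept.Add(
        new MediaTypeWithQualityHeaderValue("application/json"));

    HttpResponseMessage response = await httpClient.GetAsync(
        String.Format("/members/{0}/team", memberId));

    if (!response.IsSuccessStatusCode) {
        return Guid.Empty;
    }

    string json = await response.Content.ReadAsStringAsync();
    TeamIDResponse teamIdResponse =
        JsonConvert.DeserializeObject<TeamIDResponse>(json);
    return teamIdResponse.TeamID;
}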

The heart of this client is the GetTeamForMember method: we’re asking the team service to tell us the team membership of a member. This REST resource wasn’t part of the service when we designed it earlier; it was added later to support the functionality for this chapter.

To see the location reporter in action, we first need to set up a local copy of RabbitMQ. We could also just go straight to writing integration tests and rely on the cloud-based Wercker builds to fire up the RabbitMQ testing instance, but I like being able to play with things locally first to get a feel for how everything works.

If you’re on a Mac, it should be easy enough to either install RabbitMQ or just start up a Docker image running Rabbit with the management console plug-in enabled (make sure to map both the management console port and the regular port). On Windows, it’s probably easiest to just install RabbitMQ locally. For details on how to install or run Rabbit, check out the documentation.

Running the location reporter service

With that running, and our defaults set up to point to a local Rabbit instance, we can fire up the location reporter service as follows (make sure you’re in the src/StatlerWaldorfCorp.LocationReporter subdirectory):

$ dotnet restore
...
$ dotnet build
...
$ dotnet run --server.urls=http://0.0.0.0:9090
...

Depending on your setup, you might not need to change the default port. With the service running, we just need to submit a request to the service. One of the easiest ways to do that is to install the Postman plug-in for Chrome, or we can use curl to submit a JSON payload like this one:

$ curl -X POST -H "Content-Type: application/json" -d \
 '{"reportID": "...",
   "origin": "...", "latitude": 10, "longitude": 20,
   "memberID": "..."}' \
 http://localhost:9090/api/members/4da420c6-fa0f-4754-9643-8302401821e2/locationreports

When we submit this, we should get an HTTP 201 reply from the service, with the Location header set to something that looks like /api/members/4da420c6-fa0f-4754-9643-8302401821e2/locationreports/f74be394-0d03-4a2f-bb55-7ce680f31d7e. If everything else is working properly, we should be able to use our RabbitMQ management console to see that there’s a new message sitting in the memberlocationrecorded queue, as shown in Figure 6-1.

Figure 6-1. A new message in the queue

And if we use this same management console to examine the contents of the message, we should see that it is a faithful JSON conversion of the event we created, including the augmentations of the timestamp and the team membership of the member, as shown in Figure 6-2.

Figure 6-2. Getting a message from the queue

The Event Processor

The main purpose of the sample we’re building in this chapter is to detect team members within some range of each other. The bulk of this work is done by the event processor. The event processor is the part of the system that is as close to a pure function as we can get.

It is responsible for consuming events from the stream and taking the appropriate actions. These actions could include emitting new events on new event streams or pushing state changes to the reality service (discussed next).

While there are many important pieces to the event processor, the core of it is the ability to detect nearby teammates. To perform that detection, we need to know how to compute the distance between their GPS coordinates.

Geolocation Calculations

We actually don’t need to know how to compute the distance between GPS coordinates ourselves. Redis has built-in geospatial support: it can store GPS coordinates in a geospatial index, and you can use commands like GEORADIUS and GEODIST to detect members of the index within a given radius and to determine the distance between members.

In order to illustrate the role of an event processor, we’re doing this calculation in the C# code, but in a production scenario we might defer this to Redis. Its geohashing calculations can detect nearby teammates far faster than we can do this in C#.

If you want to implement this for yourself as a fun experiment, you might want to try storing member locations in sets that correspond to their team membership; that way, querying GEORADIUS on a team’s location set has the desired effect.
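
For a taste of what that might look like, here’s a hypothetical redis-cli session; the key name, member names, and output values are all illustrative:

127.0.0.1:6379> GEOADD team:alpha:locations -118.2427778 34.0522222 "member-1"
(integer) 1
127.0.0.1:6379> GEOADD team:alpha:locations -118.2500000 34.0500000 "member-2"
(integer) 1
127.0.0.1:6379> GEODIST team:alpha:locations member-1 member-2 km
"0.7100"
127.0.0.1:6379> GEORADIUS team:alpha:locations -118.2427778 34.0522222 5 km
1) "member-1"
2) "member-2"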

Rather than showing you the details of the math involved, Example 6-5 shows the unit test that proves the math we borrowed from the smart people on the internet works (if the Earth were flat, this math would be much easier!).

Example 6-5. GPS utility unit test
[Fact]
public void ProducesAccurateDistanceMeasurements()
{
  GpsUtility gpsUtility = new GpsUtility();

  GpsCoordinate losAngeles = new GpsCoordinate() {
      Latitude = 34.0522222,
      Longitude = -118.2427778
  };

  GpsCoordinate newYorkCity = new GpsCoordinate() {
      Latitude = 40.7141667,
      Longitude = -74.0063889
  };

  double distance = 
      gpsUtility.DistanceBetweenPoints(losAngeles, newYorkCity);
  Assert.Equal(3933, Math.Round(distance)); // 3,933 km
  Assert.Equal(0, 
      gpsUtility.DistanceBetweenPoints(losAngeles, losAngeles));            
}
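
If you’re curious anyway, a typical haversine implementation looks something like the following sketch. It assumes the GpsCoordinate class used in the test; the repository’s version may differ slightly (for example, in its Earth radius constant, which affects the rounded result above):

using System;

public class GpsUtility
{
    private const double EarthRadiusKm = 6371.0;

    // Haversine formula: great-circle distance in kilometers
    // between two points on a sphere.
    public double DistanceBetweenPoints(GpsCoordinate point1,
        GpsCoordinate point2)
    {
        double dLat = ToRadians(point2.Latitude - point1.Latitude);
        double dLon = ToRadians(point2.Longitude - point1.Longitude);

        double a = Math.Sin(dLat / 2) * Math.Sin(dLat / 2) +
                   Math.Cos(ToRadians(point1.Latitude)) *
                   Math.Cos(ToRadians(point2.Latitude)) *
                   Math.Sin(dLon / 2) * Math.Sin(dLon / 2);
        double c = 2 * Math.Atan2(Math.Sqrt(a), Math.Sqrt(1 - a));

        return EarthRadiusKm * c;
    }

    private static double ToRadians(double degrees)
    {
        return degrees * Math.PI / 180.0;
    }
}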

In order to keep the code clean and testable, we want to separate the responsibilities of event processing into the following:

  • Subscribing to a queue and obtaining new messages from the event stream
  • Writing messages to the event store
  • Processing the event stream (detecting proximity)
  • Emitting messages to a queue as a result of stream processing
  • Submitting state changes to the reality server/cache as a result of stream processing

As with all of this book’s samples, you can find the full code for this on GitHub. To save your eyes the agony of scanning through a dozen pages of code listings, I’ll try to limit the listings to the most important pieces.

To detect proximity events, I’ve written a proximity detector that makes use of the GPS utility class (Example 6-6). It takes as input the event pulled from the stream, a list of teammates and their locations, and a radius threshold.

Example 6-6. ProximityDetector.cs
using System.Collections.Generic;
using StatlerWaldorfCorp.EventProcessor.Location;
using System.Linq;
using System;

namespace StatlerWaldorfCorp.EventProcessor.Events
{
    public class ProximityDetector
    {
      public ICollection<ProximityDetectedEvent> 
          DetectProximityEvents(
             MemberLocationRecordedEvent memberLocationEvent,
             ICollection<MemberLocation> memberLocations,
             double distanceThreshold)
      {
          GpsUtility gpsUtility = new GpsUtility();
          GpsCoordinate sourceCoordinate = new GpsCoordinate() {
              Latitude = memberLocationEvent.Latitude,
              Longitude = memberLocationEvent.Longitude
          };
       
          return memberLocations.Where( 
            ml => ml.MemberID != memberLocationEvent.MemberID &&                     
                  gpsUtility.DistanceBetweenPoints(
                     sourceCoordinate, ml.Location) < 
                     distanceThreshold)            
            .Select( ml => {
              return new ProximityDetectedEvent() {
                SourceMemberID = memberLocationEvent.MemberID,
                TargetMemberID = ml.MemberID,
                DetectionTime = DateTime.UtcNow.Ticks,
                SourceMemberLocation = sourceCoordinate,
                TargetMemberLocation = ml.Location,
                MemberDistance = 
                  gpsUtility.DistanceBetweenPoints(
                     sourceCoordinate, ml.Location)
                };
            }).ToList();                            
        }
    }
}

We can then take the results of this method and use them to create the appropriate side effects, including the optional dispatch of a ProximityDetectedEvent and the writing of an event to the event store.

In all of our code, we are embracing the principles behind clean object-oriented design and injecting dependencies into our classes by interface wherever possible. This makes the code readable, easier to maintain, and easier to test.

Case in point: the high-level code responsible for responding to an incoming message, detecting proximity events, emitting proximity events, and updating the reality cache is written so that all of the real work is delegated to smaller classes, each embodying the Single Responsibility Principle.

Example 6-7 shows the code for our main event processor.

Example 6-7. Events/MemberLocationEventProcessor.cs
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Logging;
using StatlerWaldorfCorp.EventProcessor.Location;
using StatlerWaldorfCorp.EventProcessor.Queues;

namespace StatlerWaldorfCorp.EventProcessor.Events
{
public class MemberLocationEventProcessor : IEventProcessor
{
    private ILogger logger;
    private IEventSubscriber subscriber;
    private IEventEmitter eventEmitter;
    private ProximityDetector proximityDetector;
    private ILocationCache locationCache;

    public MemberLocationEventProcessor(
        ILogger<MemberLocationEventProcessor> logger,
        IEventSubscriber eventSubscriber,
        IEventEmitter eventEmitter,
        ILocationCache locationCache)
    {
        this.logger = logger;
        this.subscriber = eventSubscriber;
        this.eventEmitter = eventEmitter;
        this.proximityDetector = new ProximityDetector();
        this.locationCache = locationCache;

        this.subscriber.
          MemberLocationRecordedEventReceived += (mlre) => {
            var memberLocations = 
               locationCache.GetMemberLocations(mlre.TeamID);
            ICollection<ProximityDetectedEvent> proximityEvents = 
                proximityDetector.DetectProximityEvents(mlre, 
                memberLocations, 30.0f);
            foreach (var proximityEvent in proximityEvents) {
                eventEmitter.
                  EmitProximityDetectedEvent(proximityEvent);
            }

            locationCache.Put(mlre.TeamID, 
              new MemberLocation { 
                 MemberID = mlre.MemberID, 
                 Location = new GpsCoordinate {
                   Latitude = mlre.Latitude, 
                   Longitude = mlre.Longitude
                 }
             });
        };
    }       

    public void Start()
    {
        this.subscriber.Subscribe();
    }

    public void Stop()
    {
        this.subscriber.Unsubscribe();
    }
}
}

The dependencies of this class are not only evident, but made mandatory through the use of the constructor parameters. They are:

  • An instance of a logger appropriate for this class.
  • An event subscriber (responsible for telling the processor when new MemberLocationRecordedEvents arrive).
  • An event emitter, allowing the processor to emit ProximityDetectedEvents.
  • A location cache, allowing us to quickly store and retrieve the current locations of team members as discovered by the event processor. Depending on how you design your “reality” service, this cache can be shared with the reality service or duplicated by it.

The only other responsibility of the event processing service is that it should store every event it receives in the event store. There are a number of reasons for this, including providing a history for other services to search. The event store can also be used to reseed the reality cache if the cache crashes and loses data.

If you’re feeling adventurous, you can look at the code created so far and follow the patterns used to add an event store interface to the MemberLocationEventProcessor class, making sure it’s unit tested and the integration test verifies that events are being recorded.
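
As a starting point for that exercise, the interface might be as small as the following sketch; the name and shape are assumptions, not the repository’s code:

public interface IEventStore
{
    // Append-only: events are written once and never mutated.
    void AddEvent(MemberLocationRecordedEvent locationEvent);
}

Inside the subscriber’s event handler in Example 6-7, persisting the raw event would then be the first action taken, before proximity detection runs.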

Caches Are Only Conveniences

Remember that caches serve the architectural role of a convenience, and you should never have any data in a cache that you can’t reconstitute from somewhere else. If your code encounters a cache miss, it should know how to calculate what should have been in the cache and then update the cache accordingly.

If your code won’t work unless it gets a cache hit, then you might need to reevaluate your architecture or choose a different tool, like a full database for long-term persistence.

Since we’ve already covered how to build Entity Framework repositories in the book, I’ll leave that code listing for you to check out on GitHub if you’re curious.

The Redis location cache

The location cache interface has the following methods defined on it:

  • GetMemberLocations(Guid teamId)
  • Put(Guid teamId, MemberLocation location)

For our implementation of this cache, I decided on Redis, for a number of reasons. First and foremost, it’s a very easy-to-use distributed cache. It’s also incredibly powerful, has very wide adoption, and has a thriving open source community around it. Finally, it is usually available in some form as a cloud-hosted solution, making it ideal as a backing service for our reality cache.

Redis is also quite a bit more than just a cache; it includes a number of features that could dramatically improve this chapter’s samples but that are out of scope for this book and better left to a book on Redis.

We’re creating a Redis hash for each of the teams in our service. The JSON payload for a serialized member location is then added as a field (keyed on member ID) to this hash. This makes it easy to update multiple member locations simultaneously without creating a data-overwrite situation and makes it easy to query the list of locations for any given team, since the team is a hash.
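
A sketch of that approach using the StackExchange.Redis client might look like this; the repository’s implementation differs in its connection management and error handling:

using System;
using System.Collections.Generic;
using System.Linq;
using Newtonsoft.Json;
using StackExchange.Redis;

public class RedisLocationCache : ILocationCache
{
    private readonly IDatabase database;

    public RedisLocationCache(IConnectionMultiplexer connection)
    {
        this.database = connection.GetDatabase();
    }

    // One hash per team; each field is a member ID whose value is
    // the member's serialized location.
    public void Put(Guid teamId, MemberLocation memberLocation)
    {
        database.HashSet(teamId.ToString(),
            memberLocation.MemberID.ToString(),
            JsonConvert.SerializeObject(memberLocation));
    }

    public IList<MemberLocation> GetMemberLocations(Guid teamId)
    {
        return database.HashGetAll(teamId.ToString())
            .Select(entry => JsonConvert
                .DeserializeObject<MemberLocation>(entry.Value))
            .ToList();
    }
}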

Take a look at the following redis-cli session that was taken just moments after running some integration tests against a local instance on one of my development workstations:

127.0.0.1:6379> KEYS *
 1) "0145c13c-0dde-446c-ae8b-405a5fc33c76"
 2) "d23d529f-0c1e-470f-a316-403b934b98e9"
 3) "58265141-1859-41ef-8dfc-70b1e65e7d83"
 4) "26908092-cf9a-4c4f-b667-5086874c6b61"
 5) "679c3fdb-e673-4e9d-96dd-9a8388c76cc5"
 6) "f5cb73c5-f87c-4b97-b4e6-5319dc4db491"
 7) "56195441-168d-4b19-a110-1984f729596e"
 8) "49284102-36fd-49e6-a5fa-f622ee3708f1"
 9) "a4f4253b-df79-4f79-9eff-5d34a759f914"
10) "d13a6760-8043-408d-9a05-dd220988a655"
127.0.0.1:6379> HGETALL 0145c13c-0dde-446c-ae8b-405a5fc33c76
1) "7284050e-f320-40a5-b739-6a1ab4045768"
2) "{"MemberID":"7284050e-f320-40a5-b739-6a1ab4045768",
  "Location":{"Latitude":40.7141667,"Longitude":-74.0063889}}"
3) "2cde3be8-113f-4088-b2ba-5c5fc3ebada8"
4) "{"MemberID":"2cde3be8-113f-4088-b2ba-5c5fc3ebada8",
  "Location":{"Latitude":40.7282,"Longitude":-73.7949}}"

There are 10 hash keys displayed. Each of these hash keys is a team that has received at least one member location recorded event. Using the HGETALL command, we can get a list of all of the member location objects for that team.

For the full source code of the integration test that produced this data, take a look at the GitHub repository.

The Reality Service

Reality is subjective, and, as we discussed earlier, even reality as you perceive it in your mind is an approximation and actually occurs slightly in the past. In an effort to name our components in a way that respects this truth and the concept of eventual consistency, we’ve decided to call this service the reality service.

If we called it a state service or something else that implied that you could query it at any time and get a live, real-time, exact set of information that describes the state of the entire system at that moment, we would be misleading our consumers and the developers.

The reality service is responsible for maintaining the location of each team member, but that location will only be the most recently received location from some application. We will never know exactly where someone is; we can only tell where they were when they last submitted a command that produced a successfully processed event.

Again, this reinforces the notion that reality is really a function of stimuli received in the past.

Let’s take a look at the API we want to expose from the reality service (Table 6-2).

Table 6-2. Reality service API
Resource Method Description
/api/reality/members GET Retrieves the last known location of all members that are known to the reality service
/api/reality/members/{memberId} GET Retrieves the last known location of a single member
/api/reality/members/{memberId} PUT Sets the last known location of a member

There are two important things to remember about a reality service like this:

Reality is not the event store.

Reality is merely a representation of the state you expect your consumers to need, a prebuilt set of data designed to support the query operations in a CQRS pattern.

Reality is disposable.

The reality cache that supports the query operations of the system is disposable. We should be able to destroy all instances of reality and reconstitute them simply by running our event processing algorithm against either an entire event stream, or the events occurring since the last snapshot.
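
A hypothetical sketch of that reconstitution, assuming the event store exposes an ordered GetAllEvents() read, could be as simple as replaying every stored event through the same cache-update logic the processor uses:

// Rebuild the reality cache from scratch; both interfaces here
// follow the shapes sketched earlier in this chapter.
public void RebuildReality(IEventStore eventStore,
    ILocationCache locationCache)
{
    foreach (MemberLocationRecordedEvent evt in
             eventStore.GetAllEvents())
    {
        locationCache.Put(evt.TeamID, new MemberLocation {
            MemberID = evt.MemberID,
            Location = new GpsCoordinate {
                Latitude = evt.Latitude,
                Longitude = evt.Longitude
            }
        });
    }
}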

The code for the reality service is made up of things we’ve covered already in this book:

  • Basic microservice scaffolding (middleware, routing, configuration, bootstrapped web server)
  • Reliance upon dependency injection to provide configuration options and implementation instances
  • A class that talks to the Redis cache to query the current locations
  • A consumer of the team service to query the list of teams

I’m not going to cover the specific code for the reality service because, as I said, everything in it is something that has been done elsewhere in the book. If you would like to build your own reality service as a reader exercise, I strongly encourage you to do so as this will help build your muscle memory for building out services from scratch in ASP.NET Core.

The Proximity Monitor

The output of the event processor is a stream of proximity detected events. In a real-world, production system, we would have some kind of application or service sitting on the end of this stream.

It would await the arrival of these events and then notify appropriate downstream components that the events have occurred. This could be a notification to a website to let a single-page app update its UI, or it could be a notification that gets sent to the mobile devices of both the source and target team members that are part of the event.

The code for a proximity monitor would include:

  • Basic microservice scaffolding (this should be old hat to you by now)
  • A queue consumer subscribed to the arrival of ProximityDetectedEvent messages
  • Consumption of some third-party or cloud provider to deal with push notifications

Chapter 11 covers real-time applications, where we’ll talk about some options for publishing and reacting to push notifications and integrating client-side applications with server applications in real time. We don’t need to go into the code for a real proximity monitor for this chapter.

Running the Samples

There are a number of ways you can run the sample services created in this chapter to exercise everything you’ve learned so far. The easiest is to set it all up on your development workstation using locally installed services.

The following are the prerequisites for running the samples in this chapter:

A RabbitMQ server

You can install this locally on your machine, run a copy of the Docker image available on Docker Hub (ensuring you bind the right ports), or point to a cloud-hosted RabbitMQ server.

A Redis server

As with Rabbit, you can install this locally, run the Docker image, or point to a cloud-hosted Redis server.

The appsettings.json files for the services are checked into GitHub such that the default operating mode is to assume the prerequisites are running locally either through direct install or through ports exposed and mapped from running Docker images.

Refer to the instructions on the appropriate websites for either installing the servers or running the Docker Hub images. You do not need to do any configuration or setup beyond the defaults—the services all create their own hashes and queues.

Starting the Services

Once you’ve got your prerequisites up and running, check out the code for the services es-locationreporter and es-eventprocessor from GitHub. You’ll also need to grab a copy of teamservice. Make sure you grab the master branch since you just want an in-memory repository for testing (the location branch requires a Postgres database).

As per usual procedure, make sure you do a dotnet restore and a dotnet build on the main service application for each of them from inside their respective src/<project> directories.

To start the team service, issue the following command in a terminal from the src/StatlerWaldorfCorp.TeamService directory:

$ dotnet run --server.urls=http://0.0.0.0:5001
Hosting environment: Production
Content root path: (...)
Now listening on: http://0.0.0.0:5001
Application started. Press Ctrl+C to shut down.

To start the location reporter, issue the following command at your terminal from the src/StatlerWaldorfCorp.LocationReporter directory:

$ dotnet run --server.urls=http://0.0.0.0:5002
info: StatlerWaldorfCorp.LocationReporter.Services
.HttpTeamServiceClient[0]
      Team Service HTTP client using URL http://localhost:5001
info: StatlerWaldorfCorp.LocationReporter.Events.AMQPEventEmitter[0]
      AMQP Event Emitter configured with URI amqp://localhost:5672/
Hosting environment: Production
Content root path: (...)
Now listening on: http://0.0.0.0:5002
Application started. Press Ctrl+C to shut down.

Note that it defaults to looking for the team service on port 5001. Because we’re going to be running both microservices and both are ASP.NET services (even though the event processor just listens on queues), we need to make sure they don’t try and grab the same server port.

Now start the event processor (from the src/StatlerWaldorfCorp.EventProcessor directory):

$ dotnet run --server.urls=http://0.0.0.0:5003
info: StatlerWaldorfCorp.EventProcessor.Queues.AMQP
.AMQPConnectionFactory[0]
      AMQP Connection configured for URI : amqp://localhost:5672/
info: StatlerWaldorfCorp.EventProcessor.Queues.AMQP
.AMQPEventSubscriber[0]
      Initialized event subscriber for queue memberlocationrecorded
info: StatlerWaldorfCorp.EventProcessor.Queues.AMQP
.AMQPConnectionFactory[0]
      AMQP Connection configured for URI : amqp://localhost:5672/
info: StatlerWaldorfCorp.EventProcessor.Queues.AMQP
.AMQPEventEmitter[0]
      Emitting events on queue proximitydetected
info: StatlerWaldorfCorp.EventProcessor.Location.Redis
.RedisLocationCache[0]
      Using redis location cache - 127.0.0.1:6379,
allowAdmin=False,ssl=False,abortConnect=True,resolveDns=False
info: StatlerWaldorfCorp.EventProcessor.Queues.AMQP
.AMQPEventSubscriber[0]
      Subscribed to queue.
Hosting environment: Production
Content root path: (...)
Now listening on: http://0.0.0.0:5003
Application started. Press Ctrl+C to shut down.

The event processor has a number of dependencies, and you’ll see a bunch of diagnostic information during startup that lets you know where it is attempting to find those dependencies.

At this point you should have the microservices and servers listed in Table 6-3 running (the italicized servers are third-party apps not written in this book).

Table 6-3. Event Sourcing sample processes
Service Docker image Port
RabbitMQ rabbitmq:3.6.6 5672
Redis Cache redis:3.2.6 6379
Team service dotnetcoreservices/teamservice 5001
Location reporter dotnetcoreservices/locationreporter 5002
Event processor dotnetcoreservices/es-eventprocessor 5003
Reality service (optional) dotnetcoreservices/es-reality 5004

If your workstation is anything like mine, this kind of workload can grind a laptop with a small memory footprint to a halt. If you find yourself in a crunch for resources, try using dotnet run for all of the .NET Core services built for this book and leaving just Redis and RabbitMQ in Docker.

Submitting Sample Data

First of all, if you have made it through to this point in the chapter, then congratulations and thank you for sticking with it! There is a lot of material in this chapter—a lot of code, and a ton of concepts that might have been new to you.

The reward now comes at the hand of your favorite REST client. All the samples in the book were tested with the Postman plug-in for Chrome, but you can use the curl command-line application or any other tool for sending custom HTTP payloads to services.

Use the following steps to exercise the entire Event Sourcing/CQRS system from end to end:

  1. Issue a POST to http://localhost:5001/teams to create a new team. Refer to the source code in earlier chapters for the format, but the fields in the JSON that you’ll need are id and name. Make sure you keep the GUID for the newly created team handy.
  2. Issue a POST to http://localhost:5001/teams/<new guid>/members to add a member to the team. Make sure you keep the GUID for the new member handy.
  3. Issue a POST to http://localhost:5002/api/members/<member guid>/locationreports. A location report requires the following fields: ReportID, Latitude, Longitude, Origin, and MemberID (see the curl sketch after this list).
  4. Watch the location report being converted to a MemberLocationRecordedEvent and placed on the appropriate queue (the default is memberlocationrecorded). If you need some reference coordinates for latitude and longitude, you can find several of them in the GpsUtilityTest class in the event processor unit test project.
  5. Repeat step 3 a few times for locations that are far away from each other that will not trigger a proximity detected event.
  6. Repeat step 2 to create a new member that belongs to the same team as your first test member.
  7. Now repeat step 3 for this second team member at a location within a few kilometers of the most recently supplied location for the first team member.
  8. You should now be able to see a new message in the proximitydetected queue (you can use the RabbitMQ management plug-in to view the queues without having to write code).
  9. Either query the Redis cache directly or talk to the reality service to see the most up-to-date locations for members.

After having done this manually a few times, most teams building applications like this will immediately devote some time to automating the process. Ideally you would automate the deployment of all of these services into an integration testing environment with a tool like Docker Compose, or by creating deployments to Kubernetes or some other container scheduling environment.

The test script would then make all of the REST calls mentioned previously (probably many, many more of them) and then, when the test run is finished, assert that the right number of proximity detections showed up with the right values.

I recommend doing this as frequently as possible, either nightly or some time after the most recent check-in. Test suites like this will not only help prepare you for running in production, but will give you a baseline and alert you when new code causes a regression failure.

Summary

Code-wise, this chapter didn’t introduce anything all that powerful or complex. It did, however, introduce several architectural concepts designed to allow multiple microservices to collaborate in support of an application that can scale elastically and react to internet-scale throughput.

Treating a system as event sourced has consequences—good and bad. In this chapter we built out a sample system that accepts commands requesting that a member’s location be recorded. The command system then translates, augments, and ultimately injects events to be handled by an event processor. The event processor is responsible for detecting proximity events and emitting those to allow the rest of the system to notify team members when they are near each other.

ES/CQRS certainly will not solve all of your problems. In some situations it is clearly overkill, and in others it might not be enough. There are also many third-party products that allow data to flow through a system in a very ES-like fashion. Having built your own custom Event Sourcing suite of services, you should now know how these products work and, more importantly, why people choose to use them.

Throughout the chapter I recommended a few things that might make for useful reader exercises. I strongly recommend that you do these exercises, if for no other reason than to further build your muscle memory for building cloud-native, scalable services in ASP.NET Core.

1 These are services that are micro in nature but don’t embrace the Single Responsibility Principle.
