Implementing event sourcing with Rx

Event Sourcing (ES) is a software design that requires the persistence of events instead of results. In the Programming experience section of Chapter 1 , First Steps Toward Reactive Programming, we had the opportunity to understand how to persist data in a reactive way. Even though this is not mandatory using ES when programming reactive, this is the most natural way of dealing with data persistence from an observable sequence.

Tip

Talking exhaustively about ES is outside the scope of this book. We will assume the reader already has some knowledge about ES. Otherwise, here are some details: ES: https://msdn.microsoft.com/en-us/library/jj591559.aspx CQRS with Event Sourcing: https://msdn.microsoft.com/en-us/library/jj591577.aspx Further reading: Microsoft .NET: Architecting Applications for the Enterprise, 2nd Edition, Microsoft.

By implementing the ES design instead of persisting results at specific dates, we persist data exactly as we receive it from an external system or a user. This means that if a user creates an invoice on a web application, we will persist what the user changes, not the result of any change.

Usually, when working with Rx and ES, we are already working with Command/Query Responsibility Segregation (CQRS). This design asks the subdivision of business object retrieval (query) and business object persistence (command) from a unique global business domain. In other words, we will divide a classic domain into two specialized domains with the need of dealing, persistence, or retrieval.

Within this design, Rx fits perfectly into the Command side. We can create commands as Rx messages that can flow throughout multiple sequence operators to apply all the required transformations, validations, and business logic. At the end, these messages will flow to a persistence observer that will be in charge of storing these messages into a dedicated database. To persist this kind of data, the best fitting choice is using a NoSQL database that is able to persist documents without having to deal with external languages as SQL or different data layouts as what happens within a relational database. Obviously, this doesn't mean that we cannot use a relational database. We can. In the real world, we should, because together with an event persistence, usually, we need to persist an updated state to address the future needs of data retrieval of the huge amounts of events to compute the right ending state. Different from what happens in normal state persistence, this is only a data cache.

When dealing with data persistence messages, the ES behaves excellently with CQRS because when we create a specific object to model any single event or action our system may receive, this mapping will be pretty perfect. Differently, think of a classic object model that tries to model in the same classes the needs of data reading for the UI, data validations for user input, data consistency checks for business needs, and so on. At best it is difficult, at worst we're left with a nightmare to maintain.

Creating and validating an invoice

The following example will show how to create an invoice, validate an invoice, update an invoice detail, and add multiple validated items to get all the resulting calculations immediately. The example will not store data in a real persistence store. We will only take a look at the overall application's design. As already mentioned, any NoSQL database is the most fitting choice of doing such persistences, such as Microsoft DocumentDB, RavenDB, MongoDB, and so on.

Let's have some code. A simple validation framework is available to our application to make all the messages validable required within the observable sequence pipeline:

public interface IValidable { } 
 
public interface IValidableObjectResult<T> 
        where T : IValidable 
{ 
    bool IsValid { get; } 
    IEnumerable<ValidationResult> Result { get; } 
    T Instance { get; } 
} 
 
public sealed class ValidableObjectResult<T> : IValidableObjectResult<T> 
        where T : IValidable 
{ 
    public bool IsValid { get; set; } 
    public IEnumerable<ValidationResult> Result { get; set; } 
    public T Instance { get; set; } 
} 
 
public static class ValidableObjectHelper 
{ 
    /// <summary> 
    /// Validates the argument 
    /// </summary> 
    public static IValidableObjectResult<T> Validate<T>(T arg) 
        where T : IValidable 
    { 
        var context = new ValidationContext(arg); 
        var errors = new List<ValidationResult>(); 
 
        if (Validator.TryValidateObject(arg, context, errors)) 
            return new ValidableObjectResult<T>() 
            { 
                Instance = arg, 
                IsValid = true, 
                Result = Enumerable.Empty<ValidationResult>(), 
            }; 
        else 
            return new ValidableObjectResult<T>() 
            { 
                Instance = arg, 
                IsValid = false, 
                Result = errors.AsEnumerable(), 
            }; 
    } 
} 

Now that we're ready to validate messages, we can create all the basic message types to flow invoice events. We will only map the invoice creation, update, and item addition events that act as commands regarding the event sourcing:

/// <summary> 
/// Represents a command message 
/// </summary> 
public interface ICommand { } 
 
public class CreateNewInvoice : ICommand, IValidable 
{ 
    [Required, Range(1, 100000)] 
    public int InvoiceNumber { get; set; } 
 
    [Required] 
    public DateTime Date { get; set; } 
 
    [Required(AllowEmptyStrings = false), StringLength(50)] 
    public string CustomerName { get; set; } 
 
    [Required(AllowEmptyStrings = false), StringLength(50)] 
    public string CustomerAddress { get; set; } 
 
    //apply updates 
    public static CreateNewInvoice operator +(CreateNewInvoice invoice, UpdateInvoiceCustomerAddress updater) 
    { 
        if (!invoice.InvoiceNumber.Equals(updater.InvoiceNumber)) 
            throw new ArgumentException(); 
 
            return new CreateNewInvoice 
            { 
                InvoiceNumber = invoice.InvoiceNumber, 
                Date = invoice.Date, 
                CustomerName = invoice.CustomerName, 
                CustomerAddress = updater.CustomerAddress, 
            }; 
    } 
} 
 
public class UpdateInvoiceCustomerAddress : ICommand, IValidable 
{ 
    [Required] 
    public int InvoiceNumber { get; set; } 
 
    [Required(AllowEmptyStrings = false), StringLength(50)] 
    public string CustomerAddress { get; set; } 
} 
 
public class AddInvoiceItem : ICommand, IValidable 
{ 
    [Required] 
    public int InvoiceNumber { get; set; } 
 
    [Required] 
    public string ItemCode { get; set; } 
 
    [Required(AllowEmptyStrings = false), StringLength(50)] 
    public string Description { get; set; } 
 
    [Required, Range(1, 10000)] 
    public int Amount { get; set; } 
 
    [Required, Range(-10000, 10000)] 
    public decimal Price { get; set; } 
 
    public decimal TotalPrice { get { return Amount * Price; } } 
} 

In the preceding code, there is an interesting method, the operator + that specifies the result of the addition of a CreateNewInvoice object with an UpdateInvoiceCustomerAddresss object. This lets multiple updating objects create newly running (updated) totals regarding the invoice's details. Later, we will see the usage.

Now, all the requirements are already set. The first step is having a subject to deal with that acts as a sequence for messages and targets for user inputs:

//the root sequence of all user input messages 
var commandSource = new Subject<ICommand>(); 
 
//register the diagnostic output of all messages 
commandSource.Materialize().Subscribe(Console.WriteLine);  

Now that we have the sourcing sequence and a useful live logging system, we will create a reusable validation result sequence to route validation results from the commands flowing within our sequence:

//register validation error output 
var validables = commandSource 
    //routes only validable messages 
    .OfType<IValidable>() 
    //convert messages into validation results 
    .Select(x => ValidableObjectHelper.Validate(x)); 
 
//filter in search of invalid messages 
validables.Where(x => !x.IsValid) 
    //notify the error on the output 
    .Subscribe(x => Console.WriteLine("Validation errors: {0}", string.Join(",", x.Result))); 

With this sequence, we can easily evaluate invalid messages and produce outputs, as we saw  in the preceding example. The same sequence is useful for producing results against valid messages. Mainly, there are two different types of valid messages we can see flowing: invoice detail messages and invoice item messages. To get the running total amount of the invoice, we need to filter the desired messages from other messages (with the OfType operator). Then, we need to group per invoice number and get the running total by applying an accumulator function:

//filter in search of valid messages 
validables.Where(x => x.IsValid) 
    //get back the command message 
    .Select(x => x.Instance as ICommand) 
    //routes only invoice item messages 
    .OfType<AddInvoiceItem>() 
    //group items per invoice 
    .GroupBy(x => x.InvoiceNumber) 
    .Subscribe(group => group 
        //project the message to a new shape for getting the result 
        .Select(x => new { NewItem = x, TotalPrice = x.TotalPrice }) 
        //apply the accumulator function to get the result 
        .Scan((old, x) => new { NewItem = x.NewItem, TotalPrice = old.TotalPrice +
        x.TotalPrice }) 
        //output the result 
        .Subscribe(x => Console.WriteLine("Current total amount: {0:N2}", 
        x.TotalPrice)) 
    ); 

Similarly, we will filter, group, shape, and accumulate values regarding the invoice detail messages. But here, we will use the operator we've just created to add invoice detail and invoice detail update messages to get a new message with the last state:

//filter in search of valid messages 
validables.Where(x => x.IsValid) 
    //get back the command message 
    .Select(x => x.Instance as ICommand) 
    //routes only new invoices or invoice updates messages 
    .Where(x => x is CreateNewInvoice || x is UpdateInvoiceCustomerAddress) 
    //group items per invoice 
    .GroupBy(x => x is CreateNewInvoice ? (x as CreateNewInvoice).InvoiceNumber :
    (x as UpdateInvoiceCustomerAddress).InvoiceNumber) 
    .Subscribe(group => group 
        //apply the updates to get the last state 
        //a custom "+" operator to apply updates to the original invoice 
        //is available into the CreateNewInvoice class 
        .Scan((old, x) => x is CreateNewInvoice ? x as CreateNewInvoice : (old as 
        CreateNewInvoice) + (x as UpdateInvoiceCustomerAddress)) 
        //change type 
        .OfType<CreateNewInvoice>() 
        //output the new invoice details 
        .Subscribe(x => Console.WriteLine("Available an invoice nr: {0} to {1} 
         living in {2}", x.InvoiceNumber, x.CustomerName, x.CustomerAddress)) 
    ); 

Event sourcing an invoice creation

Now, we're ready to execute a simple test to check the usage of all the commands:

Console.WriteLine("Return to start saving an invoice"); 
Console.ReadLine(); 
 
var invoicenr = new Random(DateTime.Now.GetHashCode()).Next(0, 1000); 
//create a new invoice 
 
commandSource.OnNext(new CreateNewInvoice { InvoiceNumber = invoicenr, Date = DateTime.Now }); 
//now a validation error will flow out the sequence 
Console.WriteLine("Return to continue"); 
Console.ReadLine(); 
 
//create a valid invoice 
commandSource.OnNext(new CreateNewInvoice { InvoiceNumber = invoicenr, Date = DateTime.Now.Date, CustomerName = "Mr. Red", CustomerAddress = "1234, London Road, Milan, Italy" }); 
Console.WriteLine("Return to continue"); 
Console.ReadLine(); 
 
//updates the invoice customer address 
commandSource.OnNext(new UpdateInvoiceCustomerAddress { InvoiceNumber = invoicenr, CustomerAddress = "1234, Milan Road, London, UK" }); 
Console.WriteLine("Return to continue"); 
Console.ReadLine(); 
 
//adds some item 
commandSource.OnNext(new AddInvoiceItem { InvoiceNumber = invoicenr, ItemCode = "WMOUSE", Price = 44.40m, Amount = 10, Description = "Wireless Mouse" }); 
Console.WriteLine("Return to continue"); 
Console.ReadLine(); 
 
commandSource.OnNext(new AddInvoiceItem { InvoiceNumber = invoicenr, ItemCode = "DMOUSE", Price = 17.32m, Amount = 5, Description = "Wired Mouse" }); 
Console.WriteLine("Return to continue"); 
Console.ReadLine(); 
 
commandSource.OnNext(new AddInvoiceItem { InvoiceNumber = invoicenr, ItemCode = "USBC1MT", Price = 2.00m, Amount = 100, Description = "Usb cable 1mt" }); 
 
Console.WriteLine("END"); 
Console.ReadLine();  

The overall result is an application to flow commands to subscribers that will inform users of the eventually available validation errors or will flow messages to the right accumulator functions to inform the user of the last invoice status.

Although the example is very simple and lacks in any kind of UI, it should show the difference with state-driven programming when dealing with invoice creation. An activity that usually any developer will be familiar with. To store messages into a database, simply save valid messages. Nothing more. We simply store commands without any transformation. Later, when we need to read back these messages from the database, we can get them back into another (or the same) sequence and they will be available to the application with the same behavior of the new messages. This is the difference with the state-driven system. Here, we can review whole command history read from a database instead of getting the only the final result.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.133.124.21