14

Implementing Microservices with .NET

In Chapter 5, Applying a Microservice Architecture to Your Enterprise Application, you learned the theory and basic concepts of microservices. In this chapter, you will learn how to put those general concepts and tools into practice to implement microservices in .NET. The focus is on worker microservices, that is, microservices that are not part of the public interface of your application. You will learn how to implement a frontend service in Chapter 16, Implementing Frontend Microservices with ASP.NET Core, after having learned about techniques for implementing a presentation layer in Chapter 15, Presenting ASP.NET Core MVC. Other techniques for implementing presentation layers are described in Chapter 17, Blazor WebAssembly, and Chapter 18, Native versus Web Clients, while techniques for implementing public APIs have already been discussed in Chapter 13, Applying Service-Oriented Architectures with .NET.

This chapter explains the structure of a .NET microservice using practical examples and discusses the options for exchanging messages and serializing .NET data structures.

More specifically, in this chapter, you will learn about the following subjects:

  • Communication and data serialization
  • Implementing worker microservices with ASP.NET Core
  • Implementing microservices with .NET worker services and message brokers

The first section of the chapter discusses the main coordination and queuing problems that arise in microservices communication and how to solve them with either message brokers or custom permanent queues. Then, it discusses binary serialization and the RPC and data-driven communication techniques.

The remaining sections include examples of how to implement the same microservice with two different techniques. The technique in the second section is based on the ASP.NET Core implementation of gRPC and on an SQL Server-based permanent queue. The example in the third section shows how to use message brokers and two serialization techniques. The example uses RabbitMQ, but RabbitMQ can be replaced by Azure Service Bus.

Technical requirements

This chapter requires the free Visual Studio 2022 Community Edition or better with all database tools installed. All concepts are exposed with the help of two simple example applications, based on the WWTravelClub book use case. The code for this chapter is available at https://github.com/PacktPublishing/Software-Architecture-with-C-10-and-.NET-6-3E. The second example also requires the installation of the RabbitMQ message broker, which in turn requires the previous installation of a 64-bit version of Erlang. An adequate Erlang version can be downloaded from https://github.com/erlang/otp/releases/download/OTP-24.0.6/otp_win64_24.0.6.exe. The RabbitMQ Windows installer can be downloaded from https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.9.5/rabbitmq-server-3.9.5.exe. We recommend you launch both installations from an administrative account.

You can also use the RabbitMQ Dockerfile (https://github.com/docker-library/rabbitmq/blob/eb4800c1ee1f0e380462b547cd2bd58e5edb1bd3/3.9/ubuntu/Dockerfile), which will take care of all the RabbitMQ dependencies. However, in this case, RabbitMQ will not be installed as a Windows service but will run in your local Docker Desktop (see the Containers and Docker and Visual Studio support for Docker subsections of Chapter 5, Applying a Microservice Architecture to Your Enterprise Application).

Communication and data serialization

As explained in the Microservices design principles subsection of Chapter 5, Applying a Microservice Architecture to Your Enterprise Application, requests to a microservices-based application can’t cause long chains of recursive microservices calls. In fact, each call adds both a wait time and a communication time to the actual processing time, thus leading to unacceptable levels of overall response time, as shown in the figure below.

Figure 14.1: Tree of blocking RPC calls

Messages 1-6 are triggered by a request to the A microservice and are sent in sequence, so their processing times sum up to the response time. Moreover, once it has sent message 1, microservice A remains blocked till it receives the last message (6); that is, it remains blocked for the whole lifetime of the overall recursive communication process.

Microservice B remains blocked twice, the first time during the 2-3 communication and then during the 4-5 communication. To sum up, Remote Procedure Calls (RPC) involve high response times and a waste of microservices computation time.

That’s why microservices avoid blocking recursive RPC and prefer a data-driven approach that starts from the leaves of the procedure calls tree. Put simply, tree nodes, instead of waiting for requests from their parent nodes, send pre-processed data to all their possible callers each time their private data changes, as shown in the figure below.

Figure 14.2: Data-driven asynchronous communication

Both communications labeled 1 are triggered when the data of the C/D microservices changes, and they may occur in parallel. Moreover, once a communication is sent, each microservice can return to its job without waiting for a response. Finally, when a request arrives at microservice A, A already has all the data it needs to build the response with no need to interact with other microservices.

In general, microservices based on the data-driven approach pre-process data and send it to whichever other service might be interested. This way, each microservice already contains pre-computed data it can use to respond immediately to user requests with no need for further request-specific communications.

More specifically, as new data becomes available, each microservice does its job and then sends the results to all interested microservices with asynchronous communications; that is, a microservice doesn’t wait for any acknowledgment from its recipients. Asynchronous communication is a must, since each communication act usually triggers other communication acts recursively for the recipient, so we can’t block the whole tree of microservices till the final acknowledgment arrives from all the tree leaves, as in the case of RPC.

The absence of acknowledgments creates complex coordination problems that increase the application development and testing time. Therefore, actual applications mix efficient short-path RPC calls with the data-driven approach.

Another typical optimization of inter-application communications is the usage of binary serialization, which produces shorter messages and requires less bandwidth and less processing time.

Binary serialization is discussed in detail in the next subsection. Then, RPC and data-driven asynchronous communication are analyzed in dedicated subsections.

Efficient and flexible binary serialization

The .NET ecosystem contains several fast, platform-specific binary serializers that are able to produce compact messages with very low computational costs. In the Implementing microservices with .NET worker services and message brokers section, we will test one of the fastest, the Binaron package.

Unfortunately, efficient binary serializers suffer from a couple of well-known problems:

  • Since the most performant binary serializers are tied to a specific platform, they are not interoperable. Thus, for instance, the Java binary format is not compatible with the .NET binary format. This constraint creates problems when your application microservices are heterogeneous and use different technologies.
  • If one uses the same platform-specific, in-memory binary format, adding/removing properties to/from an object breaks compatibility. Thus, a microservice that uses an old version of a class is not able to de-serialize data created with a newer version of the same class. This constraint creates dependencies between the microservices’ CI/CD cycles, because when a microservice changes to meet new requirements, it causes recursive changes in all other microservices that communicate with it.

In Chapter 13, Applying Service-Oriented Architectures with .NET, we said that the JSON format was widely adopted because it avoids these two problems since it is not tied to any specific language/runtime and added properties can be simply ignored, while removed properties are handled by assigning default values.

The ProtoBuf binary format was conceived to ensure the same JSON serialization/deserialization advantages for binary formats.

ProtoBuf achieves interoperability by defining abstract elementary types and their binary representations. Then, each framework takes care of converting its native types to/from them. Elementary types are combined into complex structures called messages, which represent classes.

Compatibility between different versions of the same message is ensured by assigning a unique integer number to each property. This way, when a message is de-serialized into an object, just the integers that mark the properties of the given message version are searched for in the serialized data and de-serialized. When a property number is not found in the serialized data, the default value is taken for the associated property. This way, ProtoBuf messages have the same serialization/deserialization advantages as JSON objects.

There are also other serialization proposals similar to ProtoBuf. Some of them also ensure better performance, but at the moment, ProtoBuf, which was created by Google, is the de-facto standard for interoperable binary communication.

ProtoBuf messages are defined in .proto files and are then compiled into code in the target language by language-specific tools. The section that follows describes the ProtoBuf data description language.

The ProtoBuf language

Each .proto file starts with a declaration of the version of ProtoBuf. At the moment, the highest available version is proto3:

syntax = "proto3";

Then, if the target language is .NET, you can specify the namespace where you generate all the .NET classes corresponding to the ProtoBuf types defined in the file:

option csharp_namespace = "FakeSource";

Then, you can import definitions contained in other .proto files, with one or more import declarations:

import "google/protobuf/timestamp.proto";

The above declaration imports the definition of the Timestamp type, which encodes both the DateTime and DateTimeOffset .NET types. Timestamp is not a ProtoBuf simple type but is defined in a standard ProtoBuf types library.

Finally, we can scope all message definitions to a package to avoid name collisions. ProtoBuf packages have the same role as a .NET namespace but are not automatically converted into .NET namespaces during .NET code generation, since .NET namespaces are specified with the csharp_namespace option declaration:

package purchase;

Each .proto file can contain several message definitions. Below is an example message:

message PurchaseMessage {
  string id = 1;
  google.protobuf.Timestamp time = 2;
  string location = 3;
  int32 cost =4; 
  google.protobuf.Timestamp purchaseTime = 5;
}

Each property is specified by the property type followed by the property name, and then by the unique integer associated with that property. Property names must be in camel case, but they are converted to Pascal case during .NET code generation.
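
For instance, assuming the PurchaseMessage definition above has been compiled for .NET, the generated class might be used as sketched below (note how purchaseTime becomes PurchaseTime):

using Google.Protobuf.WellKnownTypes;
...
var purchase = new PurchaseMessage
{
    Id = Guid.NewGuid().ToString(),
    Location = "Florence",
    Cost = 200,
    Time = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
    PurchaseTime = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow)
};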

If a new version of PurchaseMessage is created, compatibility with the previous version can be maintained by not reusing the integers already assigned to properties of the old version and by removing just the properties that are no longer needed, as in the example shown below:

message PurchaseMessage {
  string id = 1;
  int32 cost =4; 
  google.protobuf.Timestamp purchaseTime = 5;
  Reseller reseller = 7;
}

The new version of PurchaseMessage doesn’t contain properties 2 and 3, but it contains the new reseller property, marked with the new 7 integer. The Reseller type is defined by another message that can be contained either in the same .proto file or an imported file.

Clearly, compatibility is maintained just with clients that don’t use the removed properties, while clients directly affected by the changes must be updated.

Collections are represented by prefixing the name of the collection element type with the repeated keyword:

message PurchaseMessage {
  ...
  repeated Product products = 3;
  ...
}

Repeated data are translated into the Google.Protobuf.Collections.RepeatedField<T> .NET type, which implements IList<T>.

Dictionaries are represented with the map<T1, T2> type:

message PurchaseMessage {
  ...
  map<string, int32> attributes = 3;
  ...
}
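
Map fields are translated into the Google.Protobuf.Collections.MapField<TKey, TValue> .NET type, which implements IDictionary<TKey, TValue>, so, assuming the attributes field above, the generated property can be used like a normal dictionary:

purchase.Attributes["priority"] = 1;
int priority = purchase.Attributes["priority"];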

Messages can be nested into other messages, in which case they generate classes defined in other .NET classes during code generation:

message PurchaseMessage {
  message Product {
    int32 id = 1;
    string name = 2;
    uint32 cost = 3;
    uint32 quantity = 4;
  }
  ...
  repeated Product products = 3;
  ...
}
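
Below is a sketch of how the nested Product message and the repeated products field above might be used from .NET; nested messages are generated inside a Types container class:

var purchase = new PurchaseMessage();
purchase.Products.Add(new PurchaseMessage.Types.Product
{
    Id = 1,
    Name = "Florence walking tour",
    Cost = 200,
    Quantity = 2
});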

We can also define enum types that translate directly to .NET enum types:

enum ColorType {
    RED = 0;
    GREEN = 1;
    ...
  }

It is also possible to define messages with conditional content. This is useful for sending either a response or error information:

message Payload {
    ...
}
message Error {
    ...
}
message ResponseMessage {
  oneof result {
    Error error = 1;
    Payload payload = 2;
  }
}

Once a microservice receives a .NET object of the ResponseMessage type, it can process it as follows:

ResponseMessage  response = ...;
switch (response.ResultCase)
{
    case ResponseMessage.ResultOneofCase.Payload:
        HandlePayload(response.Payload);
        break;
    case ResponseMessage.ResultOneofCase.Error:
        HandleError(response.Error);
        break;
    default:
        throw new ArgumentException();
}

The table below summarizes all the ProtoBuf simple types and their equivalent .NET types:

.NET types      ProtoBuf types
double          double
float           float
string          string
bool            bool
ByteString      bytes
int             int32, sint32, sfixed32
uint            uint32, fixed32
long            int64, sint64, sfixed64
ulong           uint64, fixed64

The ByteString .NET type is defined in the Google.Protobuf namespace contained in the Google.Protobuf NuGet package. It can be converted to byte[] with its .ToByteArray() method. A byte[] object can be converted into a ByteString with the ByteString.CopyFrom(byte[] data) static method.
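
Below is a minimal sketch of these conversions:

using Google.Protobuf;
...
byte[] data = new byte[] { 1, 2, 3 };
ByteString byteString = ByteString.CopyFrom(data);
byte[] backToArray = byteString.ToByteArray();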

int32, sint32, and sfixed32 all encode the .NET int type: sint32 is convenient when the integer is likely to be negative, while sfixed32 is convenient when the integer is likely to contain more than 28 bits. Similar considerations apply to uint32 and fixed32.

The same criteria apply to 64-bit integers, but in this case, the threshold for the convenience of sfixed64 and fixed64 is 56 bits.

ProtoBuf simple types are not nullable. This means that they can’t have a null value, and if no value is assigned to them, they take a default value. The default value of a string is an empty string, while the default value of a ByteString is an empty ByteString.

If you need nullable types, you must include a predefined .proto file:

import "google/protobuf/wrappers.proto"

Below is a table that details the correspondence between .NET nullable simple types and ProtoBuf nullable wrappers:

.NET types      ProtoBuf types
double?         google.protobuf.DoubleValue
float?          google.protobuf.FloatValue
string?         google.protobuf.StringValue
bool?           google.protobuf.BoolValue
ByteString?     google.protobuf.BytesValue
int?            google.protobuf.Int32Value
uint?           google.protobuf.UInt32Value
long?           google.protobuf.Int64Value
ulong?          google.protobuf.UInt64Value

DateTime, DateTimeOffset, and TimeSpan have no direct equivalent in ProtoBuf, but the Google.Protobuf.WellKnownTypes namespace contained in the Google.Protobuf NuGet package contains the Timestamp type, which maps from/to DateTime and DateTimeOffset, and the Duration type, which maps from/to TimeSpan. The mapping is completely analogous to that of ByteString. Thus, for instance, a Duration is obtained from a TimeSpan with the Duration.FromTimeSpan static method, while a Duration is transformed into a TimeSpan by calling its .ToTimeSpan instance method.
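
The snippet below sketches both mappings:

using Google.Protobuf.WellKnownTypes;
...
Duration duration = Duration.FromTimeSpan(TimeSpan.FromHours(2));
TimeSpan timeSpan = duration.ToTimeSpan();
Timestamp timestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow);
DateTimeOffset dateTimeOffset = timestamp.ToDateTimeOffset();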

The usage of Duration and Timestamp in .proto files is shown below:

syntax = "proto3"
import "google/protobuf/duration.proto";  
import "google/protobuf/timestamp.proto";
message PurchaseMessage {
  string id = 1;
  google.protobuf.Timestamp time = 2;
  string location = 3;
  int32 cost =4; 
  google.protobuf.Timestamp purchaseTime = 5;
  google.protobuf.Duration travelDuration = 6;
}

Please notice that their usage requires importing the corresponding predefined .proto files.

At the moment, there is no equivalent to the .NET decimal type, but it will probably be introduced in the next version. However, you can encode decimals with two integers, one for the integer part and the other for the decimal part, with a message like the one below:

message ProtoDecimal {
    // Whole units part of the decimal
    int64 units = 1;
    // Nano units of the decimal (10^-9)
    // Must be same sign as units
    sfixed32 nanos = 2;
}

We can add implicit conversions to and from the .NET decimal type with a partial class that combines with the ProtoDecimal partial class automatically generated from the .proto file:

public partial class ProtoDecimal
{
    private const decimal nanoFactor = 1_000_000_000m;

    public static implicit operator decimal(ProtoDecimal pDecimal)
    {
        return pDecimal.Units + pDecimal.Nanos / nanoFactor;
    }
    public static implicit operator ProtoDecimal(decimal value)
    {
        var units = decimal.ToInt64(value);
        return new ProtoDecimal
        {
            Units = units,
            Nanos = decimal.ToInt32((value - units) * nanoFactor)
        };
    }
}
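
With these operators in place, conversions become transparent, as in the usage sketch below:

decimal price = 19.99m;
ProtoDecimal asProto = price;   // decimal -> ProtoDecimal
decimal back = asProto;         // ProtoDecimal -> decimal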

We have described almost completely the ProtoBuf data description language. The only missing subject is the representation of variable/unknown types, which are rarely used. However, the Further reading section contains a link to the official documentation. The next section explains how to serialize and deserialize messages.

ProtoBuf serialization

An object tree can be serialized as shown below:

using Google.Protobuf;
...
PurchaseMessage purchase = ....
byte[]? body = null;
using (var stream = new MemoryStream())
{
    purchase.WriteTo(stream);
    stream.Flush();
    body = stream.ToArray();
}

Deserialization is analogous:

byte[] body = ...
PurchaseMessage? message = null;
using (var stream = new MemoryStream(body))
{
    message = PurchaseMessage.Parser.ParseFrom(stream);
}
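
The Google.Protobuf package also offers shortcut methods that avoid the explicit MemoryStream when the whole message fits comfortably in memory, as in the sketch below:

using Google.Protobuf;
...
byte[] body = purchase.ToByteArray();
PurchaseMessage message = PurchaseMessage.Parser.ParseFrom(body);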

The next subsection describes the usage of RPC in microservices.

Efficient and flexible RPC

The RPC approach can be adopted in some application microservices with good results if the following conditions are met:

  • The chain of recursive calls is very short, usually just one call without recursive calls.
  • Low communication latency and high channel bandwidth. This condition is met by intranet communications that take place on high-speed Ethernet within the same data center.
  • Data is serialized quickly and in a very size-efficient format. This condition is met by any efficient binary serializer.

The gRPC protocol brings all advantages of ProtoBuf to RPC, since, by default, it is based on ProtoBuf. gRPC/Protobuf is a binary protocol that works over an HTTP/2 connection. It is worth pointing out that gRPC can’t work with HTTP versions less than 2. In the remainder of the chapter, we will always assume that gRPC uses ProtoBuf.

gRPC uses .proto files, but together with data, gRPC .proto files also define services with their RPC methods. Below is a service definition:

service Counter {
  // Accepts a counting request
  rpc Count (CountingRequest) returns (CountingReply);
 //Get current count for a given time slot
  rpc GetCount (TimeSlotRequest) returns (TimeSlotDataReply);
}

Each service is introduced by the service keyword, while each method is introduced by the rpc keyword. Each service specifies an input message and an output message. If either of these two messages is empty, we can use the predefined google.protobuf.Empty message, as shown below:

...
import "google/protobuf/empty.proto";
...
service Counter {
  ...
  rpc AllSlots(google.protobuf.Empty) returns (AllDataReply);
}

.proto files can be used to generate both server code and client code. In client code, each service is translated into a proxy class with the same methods declared in the service. Each of the proxy methods automatically invokes the remote service method and returns its result.

The server code, instead, translates each service into an abstract class, whose virtual methods correspond to methods declared in the service. The developer is responsible for inheriting from this abstract class and providing implementations for all service methods.

Below is an example of how to inherit from a similar class:

public class CounterService: Counter.CounterBase
{
    ...
    public override async Task<CountingReply> Count(
          CountingRequest request, ServerCallContext context)
    {
        CountingReply reply = ...
        return reply;
    }
}

Each method receives both the input message and a context object. Since gRPC services use the ASP.NET Core infrastructure, the context object provides access to the request HttpContext through context.GetHttpContext().

gRPC is enabled in an ASP.NET Core application by calling builder.Services.AddGrpc() and by mapping each service with a declaration like app.MapGrpcService<CounterService>();.
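
Below is a minimal sketch of how these two calls might fit into a Program.cs file; it assumes a service class called CounterService like the one shown above:

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddGrpc();            // register the gRPC infrastructure
var app = builder.Build();
app.MapGrpcService<CounterService>();  // expose the Counter service
app.Run();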

More details on both gRPC servers and clients will be given while discussing the example in the Implementing worker microservices with ASP.NET Core section.

Services can receive as input and return continuous streams of data, where a long-term connection between client and server is established. However, the usage of streams in microservices is not common since microservices are ephemeral processes that are frequently shut down and moved from one processing node to another by orchestrators, so long-term connections are not easy to maintain.

Below is a service that accepts and returns a stream:

service StreamExample {
  rpc Echo (stream MyMessage) returns (stream MyMessage);
}

Each input stream is passed as an argument to the .NET implementation of the method. If the method returns a stream, the .NET implementation of the method must return a Task, and also the output stream is passed as an argument to the method:

public override async Task Echo(
     IAsyncStreamReader<MyMessage> requestStream,
      IServerStreamWriter<MyMessage> responseStream,
      ServerCallContext context)
{
     ...
          while(...)
          {
             bool inputFinished = !await requestStream.MoveNext();
             if (inputFinished) break;
             var current = requestStream.Current;
             ...
             await responseStream.WriteAsync(result);
          }
}

On the client side, both input and output streams are available in the call object that is returned when the proxy method is called without awaiting it, as shown below:

var call = client.Echo();
...
await call.RequestStream.WriteAsync(...);
...
bool inputFinished = !await call.ResponseStream.MoveNext();
var current = call.ResponseStream.Current;
...
await call.RequestStream.CompleteAsync();

The CompleteAsync() method closes the request stream, declaring that the input is finished.

More practical details on the client usage are given in the example of the Implementing worker microservices with ASP.NET Core section, while the Further reading section contains a link to the .NET gRPC official documentation.

The next subsection describes how to implement data-driven asynchronous communication.

Reliable data-driven asynchronous communication

Non-blocking communication must necessarily rely on non-volatile queues to decouple the sender thread from the receiving thread. Decoupling can be achieved with just one queue on each communication path, but sometimes additional queues improve performance and CPU utilization. Queues may be placed inside the sending microservice, inside the receiving microservice, or outside both of them by using dedicated queuing services called message brokers.

Azure Service Bus, which we described in the .NET communication facilities subsection of Chapter 5, Applying a Microservice Architecture to Your Enterprise Application, is a message broker that, like most message brokers, offers queuing services and publisher/subscriber communication. In this chapter, we will also describe RabbitMQ, which offers queuing services and publisher/subscriber communication that is broadly similar to Azure Service Bus topics. Since it is easier to debug code that uses a local instance of RabbitMQ, often it is convenient to use RabbitMQ during development and then move to Azure Service Bus.

Queues decouple the sender and receiver but do not ensure that messages are not lost. Message losses can be prevented with confirmations and timeouts. More specifically:

  • Queues must be stored on permanent storage; otherwise, their content can be lost if the processes controlling them either crash or are shut down.
  • If a confirmation that the message was successfully inserted in the queue doesn’t arrive within a timeout time, the source assumes that the message was lost and retries the operation.
  • When a message is extracted from a queue, it remains blocked and inaccessible to other consumers. If a confirmation that the message has been successfully processed arrives within a timeout time, the message is removed from the queue; otherwise, it is unblocked and becomes available again to other consumers.

All confirmations can be handled asynchronously with the exception of the insertion in the first queue of a communication path. In fact, if the sending code doesn’t remain blocked waiting for the confirmation but moves to further processing and the message is lost, there is no way to resend the message, since the message cannot be taken from any other queue.

For this reason, sometimes microservices that use a message broker also have an internal queue. More specifically, the main microservice thread produces all messages and stores them in a local queue that can be implemented with a database table. Another thread takes care of extracting messages from this queue and sending them to the message broker. Messages that are extracted from the local queue are blocked and are removed only when an asynchronous confirmation arrives from the message broker. This technique is used in the example of Chapter 16, Implementing Frontend Microservices with ASP.NET Core. The main advantage of the local queue is that its confirmations usually arrive faster because there is less concurrency with other threads/processes (don't forget that each microservice should have its private database/permanent storage), so the blocking time is more acceptable.

Using a queue inside each receiver is a viable alternative to message brokers. The main advantage is that the process that handles the queue is not shared among several microservices, so all queue operations are faster. In particular, the confirmation of each insertion is immediate, so the sender can use a blocking RPC call to send the message. However, this simple solution has the following disadvantages:

  • There is no way to implement the publisher/subscriber pattern.
  • Just one microservice instance can extract messages from the queue. Therefore, microservices can’t be scaled horizontally. Limited vertical scaling can be achieved by increasing the number of processor cores and by processing queue messages with parallel threads.

A similar approach can be efficiently implemented with gRPC and ASP.NET Core as follows:

  1. The sender sends the message to a gRPC method.
  2. The gRPC method just enqueues the message and immediately returns a confirmation to the sender.
  3. An ASP.NET Core hosted process takes care of extracting the messages from the queue and passing them to several parallel threads.
  4. When a message is passed to a thread, it remains blocked and inaccessible. It is removed only after the thread confirms that the message has been successfully processed. If, instead, a thread reports a failure, the corresponding message is unblocked so it can be passed to another thread.

ASP.NET Core threads take care of the necessary input parallelism. Some horizontal parallelism can be achieved by using load balancers and several web servers. However, in this case, either all the web servers use the same database, thus increasing the concurrency on the queue, or we use several sharded databases.

This approach is described in more detail in the example in the Implementing worker microservices with ASP.NET Core section. As we will see, it is simple to implement and ensures good response times, but due to its limited scalability, it is adequate just for small or medium-sized applications with low to medium traffic.

If, for some reason, either an insertion in a queue or the processing of a message extracted from a queue requires more time than the timeout time, the operation is repeated, so the same message may be inserted or processed twice. Therefore, messages must be idempotent, meaning processing them once or several times must have the same effect. Some operations, for instance, a record update or delete, are intrinsically idempotent, but others, for instance, record additions, are not.

Idempotency can always be forced by assigning a unique identifier to messages and then storing the identifiers of the already processed messages. This way, each incoming message can be discarded when its identifier is found among the identifiers of the already processed messages. We will use this technique in all examples in this chapter.

Queues, confirmations, and message resends ensure that requests to a single microservice are safely processed, but how can we handle requests that involve several cooperating microservices? We will discover that in the next subsection.

Distributed transactions

The reliable data-driven communication techniques described so far are the building blocks for solving more complex cooperation problems.

Let’s suppose that a user operation triggers processing and storage in several related microservices. The user operation can be considered successfully completed only if all involved processing/storage operations succeed.

Moreover, if a single processing/storage operation also fails for some fundamental reason, retrying the failed operation doesn’t help. Think, for instance, of a purchase that fails because the user has not got enough funds to complete the payment: the only way out is to undo all the operations that have already been performed.

In general, similar situations must be handled in a transactional way; either all operations are performed or no operation is performed. Transactions that span several microservices are known as distributed transactions. In theory, distributed transactions can be handled with the following two-stage protocol:

  1. In the first stage, all operations are executed, each one in the scope of a local transaction (for instance, within the scope of each microservice database transaction). The success or failure of each operation is then returned to a transaction coordinator.
  2. In the second stage, the transaction coordinator informs all microservices of the success or failure of the overall operation. In the case of failure, all local transactions are rolled back; otherwise, they are committed.

When using asynchronous messages, confirmations may arrive after quite a long time and may interleave with other transactions performed on the same resources. Therefore, having all local resources blocked by a local transaction during a possibly time-consuming distributed transaction is not acceptable.

For this reason, microservices transactions use the saga pattern: all local operations are performed without opening a local transaction, and in the case of failure, they are compensated by other operations that undo the initial operations.

Undoing a database insertion is quite easy, since it is enough to remove the added item, but undoing modifications and deletes is quite difficult and requires the storage of additional information. The general solution to this problem is storing records that represent all database changes in a table. These records can be used to compute a compensation operation or to restore the previous database state starting from a reference database state. We already discussed this storage technique, which is called event sourcing, in the Event sourcing section of Chapter 11, Understanding the Different Domains in Software Solutions.

When a saga transaction is undone, if other saga transactions depend on the undone changes, we must also undo them. For instance, suppose that an accepted purchase order depends on the funds uploaded by a user on an e-commerce platform. Then, if the funds upload transaction is undone, the purchase must also be undone.

In order to avoid similar chain reactions when a saga transaction is undone, often new transactions are accepted only if they depend on changes that took place before a certain safety interval. Thus, for instance, uploaded funds are made available only after, say, 5-10 minutes, because it is very unlikely that a transaction will be undone after more than 5-10 minutes.

Saga transactions may use two fundamental techniques:

  • Orchestration: When the transaction starts, an orchestrator component is created that takes care of sending the necessary messages to all involved microservices and receiving their success/failure messages. This technique is easy to implement but creates dependencies between the software lifetimes of the involved microservices since the orchestrator must depend on the details of all microservices involved in the saga. Moreover, this technique may have poor performance since the orchestrator becomes a bottleneck.
  • Choreography: There is no centralized control of the transaction but each microservice is invoked by a different sending microservice and forwards the success/failure messages it receives to other communication neighbors. Choreography overcomes the disadvantages of orchestration but it is more difficult to implement and test.

Implementing worker microservices with ASP.NET Core

In this section, we will show you how to implement a microservice that receives communications through gRPC and an internal queue based on a database table. The example is a part of the WWTravelClub book use case. The first subsection briefly describes the microservice specifications and the overall architecture.

The problem and the architecture

Our example microservice is required to compute the daily sums of all purchases. According to the data-driven approach, we suppose that all daily sums are pre-computed by receiving messages that are sent as soon as a new purchase is finalized. The purpose of the microservice is to maintain a database of all purchases and of all daily sums that can be queried by an administrative user. We will implement just the functionalities needed to fill the two database tables.

The implementation described in this section is based on an ASP.NET Core application that hosts a gRPC service. The gRPC service simply fills a messages queue and immediately returns to avoid the sender remaining blocked for the whole time of the computation.

The actual processing is performed by an ASP.NET Core hosted service declared in the dependency injection engine associated with the application host. The worker-hosted service executes an endless loop where it extracts N messages from the queue and passes them to N parallel threads that process them.

When the N messages are taken from the queue, they are not immediately removed but are simply marked with the extraction time. Since messages can be extracted from the queue only if their last extraction time is far enough in the past (say, at least a time T ago), no other worker thread can extract them again while they are being processed. When message processing is successfully completed, the message is removed from the queue. If the processing fails, no action is taken on the message, so the message remains blocked in the queue till the T interval expires, and then it can be picked up again by a worker thread.

The microservice can be scaled vertically by increasing the processor cores and the number N of threads. It can be scaled horizontally, too, by using a load balancer that splits the loads to several identical copies of the ASP.NET Core application. This kind of horizontal scaling increases the number of threads that can receive messages and the number of worker threads, but since all ASP.NET Core applications share the same database, it is limited by database performance.

The database layer is implemented in a separate DLL and all functionalities are abstracted in two interfaces, one for interacting with the queue and another for adding a new purchase to the database.

The next subsection briefly describes the database layer. We will not give all the details since the main focus of the example is the microservice architecture and the communication technique. However, the full code is available in the GitHub repository associated with the book.

The storage layer

The database layer uses Entity Framework Core and is based on three entities with their associated tables:

  • A QueueItem entity that represents a queue item
  • A Purchase entity that represents a single purchase
  • A DayTotal entity that represents the total of all purchases performed in a given day

Below is a definition of the interface that manipulates the queue:

public interface IMessageQueue
{
    public Task<IList<QueueItem>> Top(int n);
    public Task Dequeue(IEnumerable<QueueItem> items);
    public Task Enqueue(QueueItem item);
}

Top extracts up to N messages from the queue, to pass to a maximum of N different threads. Enqueue adds a new message to the queue. Finally, Dequeue removes the items that have been successfully processed from the queue.
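
The actual implementation is in the book's GitHub repository, but below is a possible sketch of how Top might be implemented with Entity Framework Core. The LastExtraction property, the ctx DbContext field, and the 2-minute blocking interval are assumptions made just for illustration:

public async Task<IList<QueueItem>> Top(int n)
{
    // Items are available if they were never extracted or if their last
    // extraction is older than the blocking interval T.
    var now = DateTimeOffset.UtcNow;
    var limit = now.AddMinutes(-2);
    var items = await ctx.QueueItems
        .Where(m => m.LastExtraction == null || m.LastExtraction < limit)
        .OrderBy(m => m.Id)
        .Take(n)
        .ToListAsync();
    // Mark the items with the extraction time so that no other worker
    // thread can pick them up while they are being processed.
    foreach (var item in items)
        item.LastExtraction = now;
    await ctx.SaveChangesAsync();
    return items;
}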

The interface that updates the purchase data is defined as shown below:

public interface IDayStatistics
{
    Task<decimal> DayTotal(DateTimeOffset day);
    Task<QueueItem?> Add(QueueItem model);
}

Add adds a new purchase to the database. It returns the input queue item if the addition is successful, and null otherwise. DayTotal is a query method that returns a single day total.
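
Again, the full implementation is in the GitHub repository, but a possible sketch of Add is shown below. The Purchase property names, the ctx DbContext field, and the day-total update are assumptions; only the idempotency check reflects the actual code, which is also shown later in this chapter:

public async Task<QueueItem?> Add(QueueItem model)
{
    // Idempotency check: skip messages that have already been processed.
    bool processed = await ctx.Purchases.AnyAsync(m => m.Id == model.MessageId);
    if (processed) return model;
    ctx.Purchases.Add(new Purchase
    {
        Id = model.MessageId,
        Location = model.Location,
        Cost = model.Cost,
        Time = model.PurchaseTime
    });
    // ...update or create the DayTotal row for the purchase day...
    try
    {
        await ctx.SaveChangesAsync();
        return model;   // success: the caller can dequeue the message
    }
    catch
    {
        return null;    // failure: the message stays blocked in the queue
    }
}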

The application layer communicates with the database layer through these two interfaces, through the three database entities, through the IUnitOfWork interface, which, as explained in Chapter 11, Understanding the Different Domains in Software Solutions, abstracts the DbContext, and through the dependency injection extension method below:

public static class StorageExtensions
{
    public static IServiceCollection AddStorage(this IServiceCollection services,
        string connectionString)
    {
        services.AddDbContext<IUnitOfWork,MainDbContext>(options =>
            options.UseSqlServer(connectionString, b => 
b.MigrationsAssembly("GrpcMicroServiceStore")));
        services.AddScoped<IMessageQueue, MessageQueue>();
        services.AddScoped<IDayStatistics, DayStatistics>();
        return services;
    }
}

This method, which will be called in the application layer dependency injection definition, receives the database connection string as input and adds to the dependency injection engine the DbContext, abstracted with IUnitOfWork, together with the two interfaces we defined before.

The database project, called GrpcMicroServiceStore, is contained in the ch14/GrpcMicroService folder of the GitHub repository associated with the book. It already contains all the necessary database migrations, so you can create the needed database with the steps below:

  1. In the Visual Studio Package Manager Console, select the GrpcMicroServiceStore project.
  2. In Visual Studio Solution Explorer, right-click on the GrpcMicroServiceStore project and set it as the startup project.
  3. In the Visual Studio Package Manager Console, issue the Update-Database command.

The next subsection describes the microservices application layer.

The application layer

The application layer is an ASP.NET Core gRPC service project called GrpcMicroService. When the project is scaffolded by Visual Studio, it contains a .proto file in its Protos folder. This file needs to be deleted and replaced by a file called counting.proto, whose content must be:

syntax = "proto3";
option csharp_namespace = "GrpcMicroService";
import "google/protobuf/timestamp.proto"; 
package counting;
service Counter {
  // Accepts a counting request
  rpc Count (CountingRequest) returns (CountingReply);
}
message CountingRequest {
  string id = 1;
  google.protobuf.Timestamp time = 2;
  string location = 3;
  sint32 cost =4; 
  google.protobuf.Timestamp purchaseTime = 5;
}
message CountingReply {}

The above code defines the gRPC service with its input and output messages, and the .NET namespace where to place them. We import the "google/protobuf/timestamp.proto" predefined .proto file because we need the Timestamp type. The request contains purchase data, the time when the request message was created, and a unique message id that is used to force message idempotency.

In the database layer, the implementation of the IDayStatistics.Add method uses this id to verify if a purchase with the same id has already been processed, in which case it returns immediately:

bool processed = await ctx.Purchases.AnyAsync(m => m.Id == model.MessageId);
if (processed) return model;

Automatic code generation for this file is enabled by replacing the existing protobuf XML tag with:

<Protobuf Include="Protos\counting.proto" GrpcServices="Server" />

The GrpcServices attribute set to "Server" enables server-side code generation.

In the Services project folder, the predefined gRPC service scaffolded by Visual Studio must be replaced with a file named CounterService.cs with the content below:

using Grpc.Core;
using GrpcMicroServiceStore;
namespace GrpcMicroService.Services;
public class CounterService: Counter.CounterBase
{
    private readonly IMessageQueue queue;
    public CounterService(IMessageQueue queue)
    {
        this.queue = queue;
    }
    public override async  Task<CountingReply> Count(CountingRequest request, 
        ServerCallContext context)
    {
            await queue.Enqueue(new GrpcMicroServiceStore.Models.QueueItem
            {
                Cost = request.Cost,
                MessageId = Guid.Parse(request.Id),
                Location = request.Location,
                PurchaseTime = request.PurchaseTime.ToDateTimeOffset(),
                Time = request.Time.ToDateTimeOffset()
            });
            return new CountingReply {  };  
    }
}

The actual service that receives the purchase messages inherits from the Counter.CounterBase abstract class created by the code generator from the counting.proto file. It receives the database layer interface IMessageQueue using dependency injection and overrides the abstract Count method inherited from Counter.CounterBase. Then, Count uses IMessageQueue to enqueue each received message.

Before compiling, a few other steps are necessary:

  1. We must add a reference to the database layer GrpcMicroServiceStore project.
  2. We must add the database connection string to the appsettings.json setting file:
    "ConnectionStrings": {
            "DefaultConnection": "Server=(localdb)\mssqllocaldb;Database=grpcmicroservice;Trusted_Connection=True;MultipleActiveResultSets=true"
    }
    
  3. We must add all the necessary database layer interfaces to the dependency injection by calling the AddStorage database layer extension method:
    builder.Services.AddStorage(
        builder.Configuration.GetConnectionString("DefaultConnection"));
    
  4. In Program.cs, we must remove the declaration of the gRPC service scaffolded by Visual Studio, and we must replace it with:
    app.MapGrpcService<CounterService>();
    

At this point, compilation should succeed. The next subsection describes the hosted service that performs the actual queue processing.

Processing the queued requests

The actual request processing is performed by a worker-hosted service that runs in parallel with the ASP.NET Core pipeline. It is implemented with the hosted services we discussed in the Using generic hosts section of Chapter 5, Applying a Microservice Architecture to Your Enterprise Application. It is worth recalling that hosted services are implementations of the IHostedService interface defined in the dependency injection engine as follows:

builder.Services.AddHostedService<MyHostedService>();

Being defined in the dependency injection engine, they automatically get all the services they need injected into their constructors. Hosted services are used to execute parallel threads that run independently of the remainder of the application. Usually, they are not defined by directly implementing the IHostedService interface, but by inheriting from the abstract BackgroundService class and overriding its Task ExecuteAsync(CancellationToken token) abstract method.

The ExecuteAsync method usually contains an endless loop that exits only when the application is shut down. This endless loop defines the behavior of a worker-hosted service that repeats a certain task. In our case, the task to repeat is the continuous extraction and processing of N items from the queue.

Our hosted process is defined in the ProcessPurchases.cs file placed in the HostedServices folder:

using GrpcMicroServiceStore;
using GrpcMicroServiceStore.Models;
namespace GrpcMicroService.HostedServices;
public class ProcessPurchases : BackgroundService
{
    IServiceProvider services;
    public ProcessPurchases(IServiceProvider services)
    {
        this.services = services;
    }
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        bool queueEmpty = false;
        while (!stoppingToken.IsCancellationRequested)
        {
           while (!queueEmpty && !stoppingToken.IsCancellationRequested)
           {
             ...
           }
            await Task.Delay(100, stoppingToken);
            queueEmpty = false;
        }
    }
}

The class constructor is not injected with the specific services it needs, but instead has the IServiceProvider that can be used to get any service defined in the dependency injection engine. The reason for this choice is that it will launch several threads (one for each of the N messages extracted from the queue), and, in general, different threads can’t share the same service instance.

The problem is caused by services with a session scope. Usually, these services are not designed to be thread-safe since the single session-scoped instance used throughout a whole ASP.NET Core request is never shared among parallel threads. However, we are not going to use our services from within the usual ASP.NET Core pipeline but from within parallel threads launched by our hosted service. Therefore, we need a different session scope for each parallel thread. Thus, the right way to proceed is to use IServiceProvider to create each necessary scope and then use each scope to get a different instance for each parallel thread.

The inner while loop runs till the queue is empty, then the worker thread sleeps for 100 milliseconds and then tries the inner loop again to see if in the meantime some new message reached the queue.

When the application is shut down, the stoppingToken CancellationToken is signaled and both loops exit, so that the whole ExecuteAsync method exits and the worker thread dies.

Below is the content of the inner loop:

using (var scope = services.CreateScope())
{
    IMessageQueue queue = scope.ServiceProvider.GetRequiredService<IMessageQueue>();
                    
    var toProcess = await queue.Top(10);
    if (toProcess.Count > 0)
    {
        Task<QueueItem?>[] tasks = new Task<QueueItem?>[toProcess.Count];
        for (int i = 0; i < tasks.Length; i++)
        {
            var toExecute = ...
            tasks[i] = toExecute();
        }
        await Task.WhenAll(tasks);
        await queue.Dequeue(tasks.Select(m => m.Result)
           .Where(m => m != null).OfType<QueueItem>());
    }
    else queueEmpty = true;
}

A session scope surrounds the whole code since we need a unique instance of IMessageQueue to manipulate the queue.

The code tries to extract N messages from the queue. If no message is found, queueEmpty is set to true, so that the inner loop exits; otherwise, a for loop creates a separate task for each extracted request and inserts it in the tasks array.

Then, Task.WhenAll awaits all tasks. Finally, queue.Dequeue removes from the queue all not-null requests returned from the tasks. Since a not-null request is returned only in the case of successful processing, queue.Dequeue removes just the successfully processed requests.

Below is the definition of a task assigned to var toExecute:

var toExecute = async () =>
{
    using (var sc = services.CreateScope())
    {
        IDayStatistics statistics = sc.ServiceProvider.GetRequiredService<IDayStatistics>();
        return await statistics.Add(toProcess[i]);
    }
};

Each task creates a different session scope, so it can have a private copy of IDayStatistics, and then processes its request with statistics.Add.

That’s all! The next subsection describes a fake microservice that randomly creates purchases and passes them to the Counter gRPC service.

Testing the GrpcMicroservice project with a fake purchase requests generator

Let's implement another microservice that feeds the previous microservice with randomly generated requests. The right project for a worker service that is not based on ASP.NET Core is the Worker Service project template. This project template automatically scaffolds a host containing a unique hosted service called Worker. We called this project FakeSource. In order to enable gRPC client usage, we must add the following NuGet packages: Google.Protobuf, Grpc.Net.Client, and Grpc.Tools.

Then, we must add the same counting.proto file as was added to the previous project. However, this time we must require client code generation by placing the code below in the FakeSource project file:

<ItemGroup>
    <Protobuf Include="..\GrpcMicroService\Protos\counting.proto" GrpcServices="Client">
      <Link>Protos\counting.proto</Link>
    </Protobuf>
 </ItemGroup>

The GrpcServices attribute set to Client is what enables client code generation instead of server code generation. The Link tag appears because we added the counting.proto file of the GrpcMicroService project as a link instead of copying it into the new project.

The hosted service is defined with the usual endless loop:

using Grpc.Net.Client;
using GrpcMicroService;
using Google.Protobuf.WellKnownTypes;
namespace FakeSource;
public class Worker : BackgroundService
{
    private readonly string[] locations = new string[] 
           { "Florence", "London", "New York", "Paris" };
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        Random random = new Random();
        while (!stoppingToken.IsCancellationRequested)
        {
          try
          {
             ...
             await Task.Delay(2000, stoppingToken);
          }
          catch (OperationCanceledException)
          {
             return;
          }
          catch { }
        }
    }
}

The locations array contains locations that will be randomly selected. As soon as the ExecuteAsync method starts, it creates the Random instance that will be used in all random generations.

Each loop is enclosed in a try/catch; if an OperationCanceledException is generated, the method exits, since a similar exception is created when the application is being shut down and the thread is killed. In the case of other exceptions, the code tries to recover by simply moving to the next loop. In an actual production application, the final catch should contain instructions to log the intercepted exception and/or instructions for a better recovery strategy. In the next example, we will see more sophisticated exception handling that is adequate for actual production applications.

Inside the try, the code creates a purchase message and sends it to the Counter service, and then sleeps for 2 seconds.

Below is the code that sends the requests:

var purchaseDay = new DateTimeOffset(DateTime.UtcNow.Date, TimeSpan.Zero);
//randomize a little bit purchase day
purchaseDay = purchaseDay.AddDays(random.Next(0, 3) - 1);  
//message time
var now = DateTimeOffset.UtcNow;
//add random location
var location = locations[random.Next(0, locations.Length)];
var messageId = Guid.NewGuid().ToString();
//add random cost
int cost = 200 * random.Next(1, 4);
//send message
using var channel = GrpcChannel.ForAddress("http://localhost:5000");
var client = new Counter.CounterClient(channel);
//since this is a fake random source
//in case of errors we simply do nothing.
//An actual client should use Polly
//to define retry policies
try
{
    await client.CountAsync(new CountingRequest
    {
        Id = messageId,
        Location = location,
        PurchaseTime = Timestamp.FromDateTimeOffset(purchaseDay),
        Time = Timestamp.FromDateTimeOffset(now),
        Cost = cost
    });
                    
}
catch {}

The code just prepares the message with random data, then it creates a communication channel for the gRPC server address and passes it to the constructor of the Counter service proxy. Finally, the Count method is called on the proxy. The call is enclosed in a try/catch, and in the case of an error, the error is simply ignored, since we are just sending random data. Instead, an actual production application should use Polly to retry the communication with predefined strategies. Polly was described in the Resilient task execution section of Chapter 5, Applying a Microservice Architecture to Your Enterprise Application. We will show you how to use Polly in the example in the next section.

Finished! Now it is time to test everything. Right-click on the solution and select Set Startup Projects, then set both FakeSource and GrpcMicroService to start. This way, both projects will be launched simultaneously when the solution is run.

Run the solution and let both processes run for a couple of minutes, then go to SQL Server Object Explorer and look for a database called grpcmicroservice. If the SQL Server Object Explorer window is not available in the left menu of Visual Studio, go to the top Window menu and select it.

Once you have located the database, show the content of the DayTotals and Purchases tables. You should see all computed daily sums and all processed purchases.

You can also inspect what happens in the server project by opening the HostedServices/ProcessPurchases.cs file and placing breakpoints on the queue.Top(10) and await queue.Dequeue(...) instructions.

You can also move FakeSource into a different Visual Studio solution, so that you can simultaneously run several copies of FakeSource each in a different Visual Studio instance.

The full code is in the GrpcMicroService subfolder of the ch14 folder of the book’s GitHub repository. The next section shows you how to solve the same problem with queued communication using the RabbitMQ message broker.

Implementing microservices with .NET worker services and message brokers

This section explains the modifications needed to use a message broker instead of gRPC communication with an internal queue. This kind of solution is usually more difficult to test and design but allows for better horizontal scaling.

The message broker used in this example is RabbitMQ. However, we could also replace it with Azure Service Bus using the code available in the GitHub repository associated with the book. The next subsection explains how to install RabbitMQ on your development machine.

Installing RabbitMQ

Before installing RabbitMQ, you need to install Erlang from the link given in the Technical requirements section. Just download and execute the installer from an administrative account. After that, you can download and install RabbitMQ from the link in the Technical requirements section.

If installation is successful, you should find a service called RabbitMQ among your machine’s Windows services. If either you don’t find it or it is not running, restart your computer.

Administrative commands can be issued to RabbitMQ from the command prompt, which you can find in the RabbitMQ Server Windows menu folder.

You can also enable a web-based administrative UI. Let’s open the RabbitMQ command prompt and issue the following command:

rabbitmq-plugins enable rabbitmq_management

Then, browse to http://localhost:15672. You will be prompted for a username and password. Initially, they are both set to guest. From there, you may inspect all active connections and channels and all communication queues that have been created. The queues page contains all queues that have been defined. By clicking on each of them, you move to a queue-specific page where you can inspect the queue content and perform various operations on the specific queue.

The next subsection contains a short survey of RabbitMQ features.

RabbitMQ basics

RabbitMQ messages must be prepared in binary format, so we need to serialize .NET objects with a binary serializer before sending them. In the example in this section, we will test both the ProtoBuf serializer and a fast .NET-specific serializer called Binaron. A JSON serializer could also be used when better compatibility is needed, for instance, among microservices implemented with different frameworks by different teams, or when there are legacy microservices (a minimal sketch is shown below).
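
The following is a minimal sketch, not taken from the chapter’s example, of how a message could be serialized to UTF-8 JSON with System.Text.Json; the PurchasePayload record is a hypothetical placeholder for whatever message type the communicating services agree on:

using System.Text.Json;
// Serialize a message to UTF-8 JSON bytes ready to be published on a channel.
var payload = new PurchasePayload(Guid.NewGuid().ToString(), "Florence", 200);
byte[] body = JsonSerializer.SerializeToUtf8Bytes(payload);
// The consumer deserializes the same bytes back into a .NET object.
var received = JsonSerializer.Deserialize<PurchasePayload>(body);
// Hypothetical message type used only for this sketch.
public record PurchasePayload(string Id, string Location, int Cost);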

Messages are not sent directly to queues, but to other entities called exchanges that route them to queues. By adequately defining the exchange routing strategy, we can implement several patterns. More specifically:

  • When we use the default exchange, the message is sent to a single named queue, so we can implement asynchronous direct calls.
  • When we use a fanout exchange, the exchange sends the messages to all queues bound to that exchange. This way, we can implement the publisher/subscriber pattern (a minimal sketch is shown after this list).

Our example will describe just direct calls, but the Further reading section contains a link to RabbitMQ tutorials that show examples of publisher/subscriber implementations.
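
For completeness, below is a minimal sketch, assuming the RabbitMQ.Client package and a broker running on localhost, of how a fanout exchange could be declared and published to. The exchange and queue names are purely illustrative and are not part of this chapter’s example:

using System.Text;
using RabbitMQ.Client;
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
// A fanout exchange copies every message to all queues bound to it.
channel.ExchangeDeclare(exchange: "purchase_events", type: ExchangeType.Fanout);
// Each subscriber declares its own (here temporary) queue and binds it to the exchange.
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "purchase_events", routingKey: "");
// Publishers send to the exchange, not to a specific queue.
var body = Encoding.UTF8.GetBytes("purchase event payload");
channel.BasicPublish(exchange: "purchase_events", routingKey: "",
    basicProperties: null, body: body);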

The next section explains how to modify the example in the previous section to use RabbitMQ-based direct communication.

Replacing internal queues with RabbitMQ

First of all, the ASP.NET Core project must be replaced by another Worker Service project. Also, this project must add the connection string to its configuration file and must call the AddStorage extension method to add all the database services to the dependency injection engine. Below is the full content of the Program.cs file:

using GrpcMicroService.HostedServices;
using GrpcMicroServiceStore;
IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices((hostContext, services) =>
    {
        services.AddStorage(hostContext.Configuration
            .GetConnectionString("DefaultConnection"));
        services.AddHostedService<ProcessPurchases>();
    })
    .Build();
await host.RunAsync();

We don’t need the gRPC services and service proxies anymore, just ProtoBuf for the binary messages, so both the FakeSource and GrpcMicroService projects only have to add the Google.Protobuf and Grpc.Tools NuGet packages. Both projects need the following messages.proto file, which defines just the purchase message:

syntax = "proto3";
option csharp_namespace = "GrpcMicroService";
import "google/protobuf/timestamp.proto"; 
package purchase;
message PurchaseMessage {
  string id = 1;
  google.protobuf.Timestamp time = 2;
  string location = 3;
  int32 cost = 4;
  google.protobuf.Timestamp purchaseTime = 5;
}

The automatic generation of the message classes is enabled in both projects with the same XML declaration in their project files:

<ItemGroup>
    <Protobuf Include="Protos\messages.proto" GrpcServices="Client" />
</ItemGroup> 

Both projects need to specify Client code generation since no service needs to be created.

In order to communicate with the RabbitMQ server, both projects must add the RabbitMQ.Client NuGet package.

Finally, FakeSource also adds the Polly NuGet package because we will use Polly to define reliable communication strategies.

The ExecuteAsync method of the client project is a little bit different:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    Random random = new Random();
    var factory = new ConnectionFactory() { HostName = "localhost" };
    IConnection? connection = null;
    IModel? channel = null;
    try
    {
        while (!stoppingToken.IsCancellationRequested)
        {
        ...       
        }
    }
    finally
    {
        // Dispose of any channel/connection pair still alive when the method exits.
        channel?.Dispose();
        connection?.Dispose();
        channel = null;
        connection = null;
    }
}

Communication requires the creation of a connection factory; the factory then generates a connection, and the connection generates a channel. The connection factory is created outside of the main loop since it can be reused several times and it is not invalidated by communication errors.

For the connection and channel, outside of the main loop we just declare the variables that will hold them. Since both are invalidated by communication exceptions, we must dispose of them and recreate them from scratch after each exception.

The main loop is enclosed in try/finally to ensure that any channel/connection pair is disposed of before leaving the method.

Inside the main loop, as a first step, we create the purchase message:

var purchaseDay = DateTime.UtcNow.Date;
// randomize the purchase day a little
purchaseDay = purchaseDay.AddDays(random.Next(0, 3) - 1);
var purchase = new PurchaseMessage
{
    // purchase day
    PurchaseTime = Timestamp.FromDateTime(purchaseDay),
    // message creation time
    Time = Timestamp.FromDateTime(DateTime.UtcNow),
    Id = Guid.NewGuid().ToString(),
    // add random location
    Location = locations[random.Next(0, locations.Length)],
    // add random cost
    Cost = 200 * random.Next(1, 4)
};

Then, the message is serialized:

byte[]? body = null;
using (var stream = new MemoryStream())
{
    purchase.WriteTo(stream);
    stream.Flush();
    body = stream.ToArray();
}

Before executing the communication, we define a Polly policy:

var policy = Policy
    .Handle<Exception>()
    .WaitAndRetry(6,
        retryAttempt => TimeSpan.FromSeconds(Math.Pow(2,
        retryAttempt)));

The above policy is an exponential retry: in the case of an exception, it waits for an exponentially growing amount of time before retrying. Up to six retries are performed: the first after 2 seconds, the second after 4 seconds, the third after 8 seconds, and so on. If all attempts fail, the exception is rethrown and the message is lost. If it’s important that messages can’t be lost, we can combine this strategy with a circuit breaker strategy (see Resilient task execution in Chapter 5, Applying a Microservice Architecture to Your Enterprise Application); a minimal sketch of this combination is shown below.
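
Below is a minimal sketch, under the assumption that the Polly package is already referenced, of how the retry policy could be wrapped with a circuit breaker; the thresholds (4 consecutive failures, 30-second break) are illustrative values, not taken from the book’s example:

using Polly;
// Retry with exponentially growing waits, as defined above.
var retry = Policy
    .Handle<Exception>()
    .WaitAndRetry(6,
        retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));
// Open the circuit after 4 consecutive failures and keep it open for 30 seconds,
// so that further calls fail fast instead of hammering the broker.
var circuitBreaker = Policy
    .Handle<Exception>()
    .CircuitBreaker(exceptionsAllowedBeforeBreaking: 4,
        durationOfBreak: TimeSpan.FromSeconds(30));
// The combined policy applies the retry around the circuit breaker.
var resilientPolicy = Policy.Wrap(retry, circuitBreaker);
resilientPolicy.Execute(() =>
{
    // communication steps go here
});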

Once we have defined the retry policy, we can execute all the communication steps in the context of this policy:

policy.Execute(() =>
{
    try
    {
        if (connection == null || channel == null)
        {
            connection = factory.CreateConnection();
            channel = connection.CreateModel();
            channel.ConfirmSelect();
        }
        //actual communication here
        ...
        ...
    }
    catch
    {
        channel?.Dispose();
        connection?.Dispose();
        channel = null;
        connection = null;
        throw;
    }
});

If there are no valid connections or channels, they are created. channel.ConfirmSelect() declares that we need confirmation that the message was safely received and stored on disk. If an exception is thrown, both the channel and the connection are disposed of, since they might have been corrupted by the exception. This way, the next communication attempt will use a fresh connection and a new channel. After the disposal, the exception is rethrown so it can be handled by the Polly policy.

Finally, below are the actual communication steps:

  1. First of all, if the queue doesn’t already exist, it is created. The queue is declared as durable, that is, it must be stored on disk, and as not exclusive, so that several servers can extract messages from the queue in parallel:
    channel.QueueDeclare(queue: "purchase_queue",
        durable: true,
        exclusive: false,
        autoDelete: false,
        arguments: null);
    
  2. Then, each message is declared as persistent; that is, it must be stored on disk:
    var properties = channel.CreateBasicProperties();
    properties.Persistent = true;
    
  3. Finally, the message is sent through the default exchange, which routes it to the queue whose name matches the routing key:
    channel.BasicPublish(exchange: "",
            routingKey: "purchase_queue",
            basicProperties: properties,
            body: body);
    
  4. As a final step, we wait until the message is safely stored on disk:
    channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));
    

If a confirmation doesn’t arrive within the specified timeout, an exception is thrown that triggers the Polly retry policy. When messages are taken from a local database queue, we can instead use a non-blocking confirmation whose arrival triggers the removal of the message from the local queue; a sketch of this approach is shown below.
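
For reference, below is a minimal sketch of such a non-blocking confirmation, based on the BasicAcks event of the official .NET client. The ILocalQueue interface and its Remove method are hypothetical placeholders for whatever permanent local queue the sender uses; only the RabbitMQ calls are actual client API:

using System.Collections.Concurrent;
using RabbitMQ.Client;
public class ConfirmingSender
{
    // Maps broker sequence numbers to the ids of locally queued messages.
    private readonly ConcurrentDictionary<ulong, Guid> pending = new();
    private readonly ILocalQueue localQueue;
    private readonly IModel channel;
    public ConfirmingSender(IModel channel, ILocalQueue localQueue)
    {
        this.channel = channel;
        this.localQueue = localQueue;
        channel.ConfirmSelect();
        // Invoked asynchronously when the broker confirms one or more messages.
        channel.BasicAcks += (sender, ea) =>
        {
            foreach (var seqNo in pending.Keys)
                // Multiple == true confirms every message up to DeliveryTag.
                if (seqNo == ea.DeliveryTag || (ea.Multiple && seqNo <= ea.DeliveryTag))
                    if (pending.TryRemove(seqNo, out var messageId))
                        localQueue.Remove(messageId); // safe to delete locally
        };
    }
    public void Send(Guid messageId, byte[] body, IBasicProperties properties)
    {
        // Remember the sequence number that will identify the confirmation.
        pending[channel.NextPublishSeqNo] = messageId;
        channel.BasicPublish(exchange: "", routingKey: "purchase_queue",
            basicProperties: properties, body: body);
    }
}
// Hypothetical abstraction of the sender's permanent local queue.
public interface ILocalQueue { void Remove(Guid messageId); }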

The ExecuteAsync method of the server-hosted process is defined in the HostedServices/ProcessPurchase.cs file:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    while (!stoppingToken.IsCancellationRequested)
    {
        try
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "purchase_queue",
                                     durable: true,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += async (sender, ea) =>
                {
                    // Message received event handler
                    ...
                };
                channel.BasicConsume(queue: "purchase_queue",
                            autoAck: false,
                            consumer: consumer);
                 await Task.Delay(1000, stoppingToken);
            }
         }
        catch { }
    }
}

Inside the main loop, if an exception is thrown, it is intercepted by the empty catch. Since the two using blocks are exited, both the connection and the channel are disposed of. Therefore, after the exception, a new loop iteration is executed that creates a fresh connection and a new channel.

In the using statement body, we ensure that our queue exists, and then set prefetch to 1. This means that each server extracts just one message at a time, which ensures a fair distribution of the load among all servers. However, setting prefetch to 1 might not be convenient when servers run several parallel threads, since it sacrifices thread utilization in favor of fair distribution among servers: threads that could successfully process further messages (after the first) might remain idle. A possible variation is sketched below.
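
As a purely illustrative variation, not part of the book’s example, the BasicQos call above could be changed as follows on a server that processes messages with several parallel threads:

// Prefetch one message per processor, trading fair distribution among servers
// for better thread utilization on each server.
channel.BasicQos(prefetchSize: 0,
    prefetchCount: (ushort)Environment.ProcessorCount,
    global: false);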

Then, we define a message received event handler. BasicConsume starts the actual message reception. With autoAck set to false, when a message is read from the queue, it is not removed but just blocked, so it is not available to other servers that read from the same queue. The message is actually removed when a confirmation that it has been successfully processed is sent to RabbitMQ. We can also send a failure confirmation, in which case the message is unblocked and becomes available for processing again.

If no confirmation is received, the message remains blocked until the connection and channel are disposed of.

BasicConsume is non-blocking, so the Task.Delay after it suspends the loop until either the cancellation token is signaled or 1 second elapses. When the delay expires, both the connection and the channel are replaced with fresh ones, which prevents unconfirmed messages from remaining blocked forever.

Let’s move on to the code inside the ‘message received’ event. This is the place where the actual message processing takes place.

As a first step, the code verifies if the application is being shut down, in which case it disposes of the channel and connection and returns without performing any further operations:

if (stoppingToken.IsCancellationRequested)
{
    channel.Close();
    connection.Close();
    return;
}

Then, a dependency injection scope is created to access all scoped services:

using (var scope = services.CreateScope())
{
    try
    {
        // actual message processing
        ...
    }
    catch
    {
        ((EventingBasicConsumer)sender).Model
            .BasicNack(ea.DeliveryTag, false, true);
    }
}

If an exception is thrown during the message processing, a Nack message is sent to RabbitMQ to inform it that the message processing failed. ea.DeliveryTag is a tag that uniquely identifies the message. The second argument, set to false, informs RabbitMQ that the Nack applies just to the message identified by ea.DeliveryTag, and not also to all other messages waiting for confirmation from this server. Finally, the last argument, set to true, asks RabbitMQ to requeue the message whose processing failed.

Inside the try block, we get an instance of IDayStatistics:

IDayStatistics statistics = scope.ServiceProvider
    .GetRequiredService<IDayStatistics>();

Then, we deserialize the message body to get a PurchaseMessage instance and add it to the database:

var body = ea.Body.ToArray();
PurchaseMessage? message = null;
using (var stream = new MemoryStream(body))
{
    message = PurchaseMessage.Parser.ParseFrom(stream);
}
var res = await statistics.Add(new Purchase {
    Cost = message.Cost,
    Id = Guid.Parse(message.Id),
    Location = message.Location,
    Time = new DateTimeOffset(message.Time.ToDateTime(), TimeSpan.Zero),
    PurchaseTime = new DateTimeOffset(message.PurchaseTime.ToDateTime(), TimeSpan.Zero)
});

If the operation fails, the Add operation returns null, so we must send a Nack; otherwise, we must send an Ack:

if(res != null)
    ((EventingBasicConsumer)sender).Model
        .BasicAck(ea.DeliveryTag, false);
else
    ((EventingBasicConsumer)sender).Model
        .BasicNack(ea.DeliveryTag, false, true);

That’s all! The full code is in the GrpcMicroServiceRabbitProto subfolder of the ch14 folder in the GitHub repository of this book. You can test the code by setting both the client and server projects as startup projects and running the solution. After 1-2 minutes, the database should be populated with new purchases and new daily totals. In a staging/production environment, you can run several copies of both the client and the server.

The GrpcMicroServiceRabbit subfolder in the ch14 folder of the GitHub repository contains another version of the same application that uses the Binaron NuGet package for serialization. It is faster than ProtoBuf, but being .NET-specific, it is not interoperable. Moreover, it has no features to facilitate message versioning. It is useful when performance is critical and versioning and interoperability are not a priority.

The Binaron version has no .proto files or other ProtoBuf-related code; instead, it explicitly defines a PurchaseMessage .NET class. Moreover, the ProtoBuf serialization and deserialization instructions are replaced by:

byte[]? body = null;
using (var stream = new MemoryStream())
{
    BinaronConvert.Serialize(purchase, stream);
    stream.Flush();
    body = stream.ToArray();
}

Together with:

PurchaseMessage? message = null;
using (var stream = new MemoryStream(body))
{
    message = BinaronConvert.Deserialize<PurchaseMessage>(stream);
}

That’s all for this chapter! In the next chapter, Presenting ASP.NET Core MVC, we will explore an HTML-based technique for implementing presentation layers on the server side.

Summary

In this chapter, we analyzed various options for efficient internal microservices communication. We explained the importance of a binary serialization format that is interoperable and that ensures compatibility with previous message versions, and we described ProtoBuf in detail.

We analyzed the limits of RPC communication and why data-driven communication must be preferred. Then, we focused on how to achieve reliable asynchronous communication and efficient distributed transactions.

After having described the conceptual problems and techniques of reliable asynchronous communication, we looked at two architectures. The first one was based on gRPC, ASP.NET Core, and internal queues, and the second one was based on message brokers like RabbitMQ and .NET worker services.

The chapter explained, using practical examples, how to implement all the communication protocols that have been discussed and the architectural options for implementing worker microservices that are available in .NET.

Questions

  1. Why are queues so important in microservices communication?
  2. How do we reference another .proto file from a .proto file?
  3. How can we represent a TimeSpan in the ProtoBuf language?
  4. What are the advantages of ProtoBuf and gRPC over other binary options?
  5. What are the advantages of using message brokers instead of internal queues?
  6. Why is it acceptable to use a blocking gRPC call to enqueue a message in a recipient queue?
  7. How do we enable .proto file code generation in a .NET project file?
  8. How do we send a message on a RabbitMQ channel with the official .NET client?
  9. How do we wait for a message sent on a RabbitMQ channel to be safely saved on disk with the official .NET client?

Further reading

The official .NET documentation on ProtoBuf can be found here: https://docs.microsoft.com/it-it/aspnet/core/grpc/protobuf?view=aspnetcore-6.0.

The .NET documentation on gRPC is here: https://docs.microsoft.com/it-it/aspnet/core/grpc/?view=aspnetcore-6.0.

Finally, the official Google documentation on the whole ProtoBuf language is here: https://developers.google.com/protocol-buffers/docs/proto3.

Complete tutorials on RabbitMQ can be found here: https://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html.

The complete documentation for RabbitMQ is here: https://www.rabbitmq.com/.
