Chapter 4. Design Principles of Reactive Systems

In Chapter 3, we looked at the challenges behind distributed systems. It’s now time to see what Reactive has to offer. Reactive can be seen as a set of principles for building distributed systems, a kind of checklist to verify that no major known concern was overlooked while architecting and building a system. These principles focus on being:

  • responsive - capable of handling requests when facing failures or peaks of load,

  • efficient - able to do more with fewer resources.

In this chapter, we are going to cover the principles promoted by reactive systems.

Reactive Systems 101

In 2013, a group of distributed systems experts gathered and wrote the first version of the Reactive Manifesto. In this white paper, they assembled their experience building distributed systems and Cloud applications. While the Cloud in 2013 was not precisely what it is today, the dynamic creation of ephemeral resources was already a well-known mechanism.

The Reactive Manifesto defines reactive systems as distributed systems having four characteristics:

  1. Being responsive - the ability to handle requests in a timely fashion;

  2. Being resilient - the ability to manage failures gracefully;

  3. Being elastic - the ability to scale up and down according to the load and resources;

  4. Being message-driven - the usage of asynchronous message-based communication between the different components forming the system.

Reactive Systems Characteristics
Figure 4-1. Reactive Systems Characteristics

These four characteristics are represented in Figure 4-1. If you are seeing this picture for the first time, you may be confused by all the arrows. It can look like a well-tailored marketing campaign. It’s not, so let’s explain why these pillars make a lot of sense when building Cloud-Native and Kubernetes-Native applications. For once, let’s start with the bottom of the figure.

Instead of trying to make distributed systems simpler than they are, reactive systems embrace their asynchronous nature. They use asynchronous message passing to establish the communication tissue between the components. Asynchronous message passing ensures loose coupling, isolation, and location transparency. In a reactive system, interactions rely on messages sent to abstract destinations. These messages carry everything, data as well as failures. Asynchronous message passing also improves resource utilization. Employing non-blocking communication (we will cover that part later in this chapter) allows idle components to consume almost no CPU and memory. Asynchronous message passing enables elasticity and resilience, as depicted by the two bottom arrows in Figure 4-1.

Elasticity means that the system can adapt itself, or parts of itself, to handle fluctuating load. By looking at the messages flowing between the components, a system can determine which parts are reaching their limits and create more instances or route the messages elsewhere. Cloud infrastructure enables creating these instances quickly at runtime. But elasticity is not only about scaling up; it’s also about scaling down. The system can decide to scale down underused parts to save resources. At runtime, the system adjusts itself, always meeting the current demand, avoiding bottlenecks, overflows, and over-committed resources. As you can imagine, elasticity requires observability, replication, and routing features. Observability is covered in Chapter 13. In general, the last two are provided by the infrastructure, such as Kubernetes or Cloud providers.

Resilience means handling failure gracefully. As explained in Chapter 3, failures are inevitable in distributed systems. Instead of hiding them, reactive systems treat failures as first-class citizens. The system should be able to handle them and react to them. Failures are contained within each component, isolating components from one another. This isolation ensures that parts of the system can fail and recover without jeopardizing the whole system. For instance, by replicating components (elasticity), the system can continue to handle incoming messages even if some elements are failing. Implementing resilience is shared between the application, which needs to be aware of failures, contain them, and, if possible, handle them gracefully; and the infrastructure, which monitors the system and restarts failed components.

The last characteristic is the whole purpose of reactive systems: being responsive. Your system needs to stay responsive, i.e., respond in a timely fashion, even under fluctuating load (elasticity) and when facing failures (resilience). Relying on message passing enables these characteristics and much more, such as flow control, by monitoring the messages in the system and applying back-pressure when necessary.

In a nutshell, reactive systems are exactly what we want to build: distributed systems able to handle uncertainty, failures, and load efficiently. Their characteristics perfectly meet the requirements of Cloud-Native and Kubernetes-Native applications. But don’t be mistaken; building a reactive system is still building a distributed system. It’s challenging. However, by following these principles, the resulting system will be more responsive, more robust, and more efficient. The rest of this book details how we can easily implement such systems with Quarkus and messaging technologies.

Commands and Events

Now that we’ve covered many of the foundational principles, you might be confused. Earlier, in Chapter 1, we said that being Reactive is related to being event-driven, but in the previous section, we explicitly mentioned asynchronous message passing. Do these mean the same thing? Not exactly.

But first, we need to discuss the differences between commands and events. As complicated as a distributed system design can be, the concepts of commands and events are fundamental. Nearly all interactions between individual components involve one or the other.

Commands

Every system issues commands. Commands are actions that a user wishes to perform. Most HTTP-based APIs pass commands: the client asks for an action to happen. It’s important to understand that the action has not yet happened. It may happen in the future, or not; it may complete successfully or fail. In general, commands are sent to a specific recipient, and a result is sent back to the client.

Take the simple HTTP application we used in Chapter 3. You emitted a simple HTTP request. As said above, that was a command. The application receives that command, handles it, and produces a result.
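To make this concrete, here is a minimal sketch in plain Java (the names are hypothetical, not taken from the book’s code) of a command: the caller asks for an action, the recipient attempts it, and a result travels back to the caller.

import java.util.List;

public class CommandExample {

    // A command: an action the user wishes to perform. Nothing has happened yet;
    // the action may succeed or fail once the recipient handles it.
    record PlaceOrder(String userId, List<String> items) { }

    // The specific recipient handles the command and sends a result back to the caller.
    static String handle(PlaceOrder command) {
        return "Order accepted for " + command.userId();
    }

    public static void main(String[] args) {
        PlaceOrder command = new PlaceOrder("alice", List.of("book", "pen"));
        System.out.println(handle(command));
    }
}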

Events

Events are actions that have successfully completed. An event represents a fact, something that happened: a keystroke, a failure, an order, anything important to the organization or system at hand that happened. An event can be the result of work done by a command.

Let’s go back to the HTTP request example from above. Once the response has been written, it becomes an event: we have seen an HTTP request and its response. That event can be written in a log or broadcast so that interested parties can be aware of what happened.

Events are immutable. You cannot delete an event; after all, you can’t change the past. If you want to refute a previously emitted fact, you need to fire another event invalidating it. The facts an event carries are only made irrelevant, for the current knowledge, by another fact.
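As an illustration, here is a minimal sketch (again with hypothetical names) of events as immutable values: a fact, once recorded, never changes; refuting it means emitting another event.

import java.time.Instant;
import java.util.List;

public class EventExample {

    // An event: an immutable fact describing something that already happened.
    record OrderPlaced(String orderId, String userId, List<String> items, Instant occurredAt) { }

    // The past cannot be changed; to refute a fact, another event is emitted.
    record OrderCancelled(String orderId, Instant occurredAt) { }

    public static void main(String[] args) {
        OrderPlaced placed = new OrderPlaced("order-1", "alice", List.of("book"), Instant.now());
        OrderCancelled cancelled = new OrderCancelled(placed.orderId(), Instant.now());
        System.out.println(placed + " then " + cancelled);
    }
}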

Messages

But how do you publish these events? There are many ways. These days, solutions such as Apache Kafka or Apache ActiveMQ (we cover both in Chapter 11) are very popular. They act as brokers between producers and consumers. Essentially, our events are written into topics or queues. To write these events, the application sends a message to the broker, targeting a specific destination (the queue or the topic).

A message is a self-contained data structure describing the event and any relevant details about it, such as who emitted it, at what time, and potentially a unique ID. It’s generally better to keep the event itself business-centric and use additional metadata for the technical aspects.

On the other side, to consume events, you subscribe to the queue or topic containing the events you are interested in and receive the messages. You unwrap the event and can also get the associated metadata (such as when the event happened, where it happened, and so on). The processing of an event can lead to the publication of other events (again packaged in messages and sent to a known destination) or to the execution of commands.

Brokers and messages can also convey commands. In this case, the message contains the description of the action to execute, and another message (potentially multiple messages) carries the outcome if needed.
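As a sketch of this idea, and assuming an Apache Kafka broker reachable on localhost:9092 with an orders topic (both hypothetical here), wrapping an event into a message and sending it to a destination could look like the following. The payload stays business-centric, while the technical metadata travels as message headers.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class OrderPlacedPublisher {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The event itself stays business-centric...
            String event = "{\"order\":\"order-1\",\"user\":\"alice\",\"items\":[\"book\"]}";
            ProducerRecord<String, String> message = new ProducerRecord<>("orders", "order-1", event);
            // ...while technical details (emitter, correlation ID, ...) travel as metadata.
            message.headers().add("emitted-by", "shop-service".getBytes(StandardCharsets.UTF_8));
            producer.send(message);
        }
    }
}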

Commands vs. Events: an example

Let’s take a look at an example to highlight the differences between commands and events. Imagine an e-commerce shop, as depicted in Figure 4-2. The user picks a set of products and finalizes the order (proceeds to payment, gets the delivery date, and so on).

Simplified architecture of an e-commerce shop
Figure 4-2. Simplified architecture of an e-commerce shop

The user sends a command, using an HTTP request for example, to the shop service with the items they wish to receive. In a traditional application, once the ShopService receives the command, it would call an OrderService and invoke an order method with the username, the list of items (basket), and so on. Calling the order method is a command. That makes the ShopService dependent on the OrderService and reduces the component’s autonomy: the ShopService cannot operate without the OrderService. We are creating a distributed monolith1, i.e., a distributed application that would collapse as soon as one of its parts fails.

Let’s see the difference if, instead of using a command between the ShopService and the OrderService, we publish an event. Once the user finalizes the order, a command is still sent to the shop service. However, this time, the shop service transforms that command into an event: a new order has been placed. The event contains the user, the basket, and so on. It’s a fact, written in a log or wrapped into a message and sent to a broker.

On the other side, the OrderService observes the new order has been placed events by reading from the location where these events are stored. When the shop service emits the event, the OrderService receives it and can process it.

With this architecture, the ShopService does not depend on the OrderService. In addition, the OrderService does not depend on the ShopService, and it processes any observed event, regardless of the emitter. For example, a mobile application can emit the same event when the user validates an order from a mobile phone.

Multiple components can consume events (Figure 4-3). For example, in addition to the OrderService, the StatisticsService keeps track of the most ordered items. It consumes the same events, without having to modify the ShopService to receive them.

A component observing events can derive new ones from them. For instance, the StatisticsService could analyze the orders and compute recommendations. These recommendations can be seen as another fact, and so communicated as an event. The ShopService could observe these events and process them to influence item selection. However, the StatisticsService and the ShopService remain independent of each other. The knowledge is cumulative: it grows by receiving new events and deriving, as the StatisticsService does, new facts from the received events.

Architecture of the e-commerce shop using events and message brokers
Figure 4-3. Architecture of the e-commerce shop with events and message queues

As depicted in Figure 4-3, we can use message queues to transport our events. These events are wrapped into messages and sent to known destinations (orders and recommendations). The OrderService and StatisticsService consume and process the messages independently.

It’s important that these destinations persist the events as an ordered sequence. By keeping that sequence, the system can go back in time and reprocess the events. Such a replay mechanism, very popular in the Kafka world, has multiple benefits. You can restart with a clean state after a disaster by reprocessing all the stored events. Also, if, for example, we change the recommendation algorithm in the statistics service, it is able to re-accumulate all the knowledge and derive new recommendations.
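As a sketch, still assuming the hypothetical orders topic, replaying the stored sequence with the Kafka consumer API could look like this: the consumer is assigned the partition and seeks back to the beginning of the log before reprocessing the events.

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class RecommendationReplay {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Assign the partition explicitly and rewind to the first stored event.
            TopicPartition partition = new TopicPartition("orders", 0);
            consumer.assign(List.of(partition));
            consumer.seekToBeginning(List.of(partition));
            // Reprocess the whole history to rebuild the derived knowledge.
            consumer.poll(Duration.ofSeconds(1))
                    .forEach(record -> System.out.println("Replaying: " + record.value()));
        }
    }
}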

While in this example, the event emission sounds explicit, it’s not always the case. For example, events can be created from database writes2.

Commands and events are the basis of most interactions. While we mostly use commands, events come with significant benefits. Events are facts. Events tell a story, the story of your system, a narrative that describes your system’s evolution.

Overview of a reactive system
Figure 4-4. Overview of a reactive system

In reactive systems, events are wrapped into messages, and these messages are sent to destinations and transported by message brokers such as AMQP or Kafka (Figure 4-4). Such an approach solves two important architectural issues arising from distributed systems. First, it naturally handles the asynchronicity of the real world. Second, it binds services together without relying on strong coupling. At the edge of the system, commands are used most of the time, often relying on HTTP.

This asynchronous message passing aspect of reactive systems forms their communication tissue. It not only grants the applications forming the system more autonomy and independence, but it also enables resilience and elasticity. You may wonder how. You will get the beginning of the answer in the next section.

Destinations and Space Decoupling

The reactive applications forming a reactive system communicate using messages. They subscribe to destinations and receive the messages sent by other components to these destinations. These messages can carry either commands or events, although, as described in the previous section, events provide interesting benefits.

These destinations are not bound to specific components or instances. They are virtual. Components must only know the name (generally business-related, such as orders) of the destination, not who’s producing or consuming. It enables location transparency.

If you are using Kubernetes, you may consider location transparency as already managed for you. Indeed, you can use Kubernetes services to implement location transparency. You have a single endpoint delegating to a group of selected pods. But it’s somewhat limited and often tied to HTTP or request/reply protocols. Other environments can use service discovery infrastructure such as Hashicorp Consul, or Netflix Eureka.

Using messages sent to a destination allows you, as the sender, to ignore who precisely is going to receive the message. You don’t know if someone is currently available, or if multiple components or instances are waiting for your message. This number of consumers can evolve at runtime: more instances can be created, moved, or destroyed, and new components deployed. As a sender, you don’t need to know; you just use a specified destination.

Let’s illustrate the advantages of this addressability using the example from the previous section. The ShopService emits order placed events carried inside of messages sent to the orders destination (Figure 4-3).

It is entirely possible that, during a quiet period, only a single instance of the OrderService runs. If there are not many orders, why bother having more? We could even imagine having no instance at all and instantiating one when we receive an order. Serverless platforms offer this scale-from-zero ability.

However, over time, your shop gets more customers, and a single instance may not be enough. Thanks to location transparency, we can start other instances of the OrderService to share the load (Figure 4-5). The ShopService is not modified and is unaware of this new topology.

Elasticity provided by the usage of message passing
Figure 4-5. Elasticity provided by the usage of message passing

How the load is shared among the consumers is also irrelevant for the sender. It can be round-robin, load-based selection, or something more clever. When the load returns to normal, the system can reduce the number of instances and save resources. Note that this kind of elasticity works perfectly for stateless services. For stateful services, it may be harder, as the instances may have to share state. However, there are solutions (not without caveats), such as Kubernetes StatefulSets or in-memory data grids, to coordinate state between instances of the same service.
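With Kafka, for instance, this load sharing maps to consumer groups: every OrderService instance sharing the same group.id splits the partitions of the orders topic, and the broker rebalances automatically as instances come and go. Here is a minimal sketch, reusing the hypothetical broker address and topic from earlier:

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class OrderServiceInstance {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        props.put("group.id", "order-service");           // shared by every OrderService instance
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        // Each instance started with this configuration joins the same consumer group.
        // The broker spreads the partitions of the "orders" topic across the instances
        // and rebalances when instances are added or removed; the sender never knows
        // how many instances are currently consuming.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("orders"));
            while (true) {
                consumer.poll(Duration.ofSeconds(1))
                        .forEach(record -> System.out.println("Processing order: " + record.value()));
            }
        }
    }
}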

Message passing also enables replication. Following the same principle, we can shadow the active OrderService instance and take over if the primary instance fails (Figure 4-6). This avoids service disruption. That kind of failover may also require state sharing.

Resilience provided by the usage of message passing
Figure 4-6. Resilience provided by the usage of message passing

By using message passing, our system not only becomes asynchronous; it also becomes elastic and resilient. When you architect your system, you list the destinations that implement the communication patterns you want. In general, you would use one destination per type of event, but that’s not necessarily the case. However, avoid at all costs having a destination per component instance. It introduces coupling between the sender and the receiver, discarding the benefits. It also reduces extensibility. Finally, it’s essential to keep the set of destinations stable. Changing a destination would break the components using it or force you to handle redirections.

Time-Decoupling

Location transparency is not the only benefit of asynchronous message passing. It also enables time decoupling.

Modern message backbones, such as AMQP 1.0, Apache Kafka, or even Java Message Service (JMS), enable this. With these event brokers, events are not lost if there are no consumers; they are stored and delivered later. Each broker has its own way. For instance, AMQP 1.0 uses persistent messages and durable subscribers to ensure message delivery. Kafka stores records in a durable, fault-tolerant, ordered log. The records can be retrieved as long as they remain stored within the topic.

If our ShopService emits the finalized orders as events, it does not need to know whether the OrderService is available. It knows that the events are going to be processed eventually. If, for example, no instance of the OrderService is available when the ShopService emits the event, the event is not lost. When an instance gets ready, it receives the pending orders and processes them. The user is then notified asynchronously, with an email for example.

Of course, the message broker must be available and reachable. Most message brokers have replication abilities, avoiding unavailability issues and message loss.

Note

It is becoming common to store events in an event log. Such an ordered, append-only structure represents the full history of your system. Every time the state changes, the system appends the new state to the log.

Time decoupling increases the independence of our components. Time decoupling, combined with the other features enabled by asynchronous message passing, achieves a high level of independence between our components and keeps the coupling to a minimum.

The role of non-blocking Input/Output (I/O)

At this point, you may wonder what the difference is between an application using Kafka or AMQP and a reactive system. Message passing is the essence of reactive systems, and most of them rely on some sort of message broker. It enables resilience and elasticity, which lead to responsiveness. It promotes space and time decoupling, making our system much more robust.

But reactive systems are not only systems exchanging messages. Sending and receiving messages must be done efficiently. To achieve this, Reactive promotes the usage of non-blocking I/O.

Blocking Network I/O, threads and concurrency

To understand the benefits of non-blocking I/O, we need to know how blocking I/O works. Let’s use a client-server interaction to illustrate this. When a client sends a request to a server, the server processes it and sends back a response. HTTP, for instance, follows this principle. For this to happen, both the client and the server need to establish a connection before the interaction starts. We will not go into the depths of the seven-layer model and the protocol stack involved in this interaction; you can find plenty of articles online about that topic.

Note

Examples from this section can be run directly from your IDE. Use chapter-4/non-blocking-io/src/main/java/org/acme/client/EchoClient.java to invoke the started server. Be sure not to run multiple servers concurrently, as they all use the same port (9999).

So, to establish that connection between the client and the server, we use sockets:

Example 4-1. A Single-Threaded Echo Server using Blocking I/O (chapter-4/non-blocking-io/src/main/java/org/acme/blocking/BlockingEchoServer.java)
int port = 9999;

// Create a server socket
try (ServerSocket server = new ServerSocket(port)) {
    while (true) {

        // Wait for the next connection from a client
        Socket client = server.accept();

        PrintWriter response = new PrintWriter(client.getOutputStream(), true);
        BufferedReader request = new BufferedReader(
                new InputStreamReader(client.getInputStream()));

        String line;
        while ((line = request.readLine()) != null) {
            System.out.println("Server received message from client: " + line);
            // Echo the request
            response.println(line);

            // Add a way to stop the application.
            if ("done".equalsIgnoreCase(line)) {
                break;
            }
        }
        client.close();
    }
}

The client and the server have to bind themselves to a socket forming the connection. The server listens to its socket for the client to connect. Once established, the client and the server can both write and read data from the socket bound to that connection.

Traditionally, because it’s simpler, applications are developed using a synchronous development model. Such a development model executes instructions sequentially, one after the other. So, when such applications interact across the network, they expect to continue using a synchronous development model even for I/O: they use synchronous communication and block the execution until the operation completes. For example, in the previous code snippet (Example 4-1), we wait for a connection and handle it synchronously. We read and write using synchronous APIs. It’s simpler, but it leads to the usage of blocking I/O.

With blocking I/O, when the client sends a request to the server, the socket processing that connection, and the corresponding thread that reads from it, is blocked until some data arrives. The bytes are accumulated in the network buffer until everything is read and ready for processing. Until the operation is complete, the server can do nothing more but wait.

The consequence of this model is that we cannot serve more than one connection within a single thread. When the server receives a connection, it uses that thread to read the request, process it, and write the response. That thread is blocked until the last byte of the response is written on the wire. A single client connection blocks the server! Not very efficient, right?

So, to execute concurrent requests with this approach, the only way is to have multiple threads. We need to allocate a new thread for each client connection. To handle more clients, you need more threads and to process each request on a different worker thread:

Example 4-2. Principles behind multi-threaded server using blocking I/O
while (listening) {
    accept a connection;
    create a worker thread to process the client request;
}

To implement this principle, we need a thread pool (worker pool). When the client connects, we accept the connection and off-load the processing to a separate thread. Thus, the server thread can still accept other connections:

Example 4-3. A Multi-Threaded Echo Server using Blocking I/O (chapter-4/non-blocking-io/src/main/java/org/acme/blocking/BlockingWithWorkerEchoServer.java)
int port = 9999;
ExecutorService executors = Executors.newFixedThreadPool(10); 1

// Create a server socket
try (ServerSocket server = new ServerSocket(port)) {
    while (true) {

        // Wait for the next connection from a client
        Socket client = server.accept();

        executors.submit(() -> {                                    2
            try {
                PrintWriter response = new PrintWriter(client.getOutputStream(), true);
                BufferedReader request = new BufferedReader(
                        new InputStreamReader(client.getInputStream()));

                String line;
                while ((line = request.readLine()) != null) {
                    System.out.println(Thread.currentThread().getName() +
                            " - Server received message from client: " + line);
                    // Echo the request
                    response.println(line);

                    // Add a way to stop the application.
                    if ("done".equalsIgnoreCase(line)) {
                        break;
                    }
                }
                client.close();
            } catch (Exception e) {
                System.err.println("Couldn't serve I/O: " + e.toString());

            }
        });
    }
}
1

Create a worker thread pool to handle the request

2

Offload the processing of the request to a thread from the thread pool. The rest of the code is unchanged.

That’s the model used, by default, in traditional Java frameworks such as Jakarta EE or Spring3. Even if, under the hood, these frameworks may use non-blocking I/O, they use worker threads to handle the requests. But there are many drawbacks to this approach:

  1. Each thread requires a stack of memory allocated to it. With the increasing number of connections, spawning multiple threads and switching between them will consume not only memory but also CPU cycles.

  2. At any given point in time, there can be multiple threads just waiting for the client requests. That’s a massive waste of resources.

  3. Your concurrency (the number of requests you can handle at a given time - 10 in the previous example) is limited by the number of threads you can create.

On public Clouds, it inflates your monthly bill; on private Clouds, it reduces the deployment density. Therefore, the blocking I/O approach is not ideal if you have to handle many connections or implement applications dealing with a lot of I/O. In the realm of distributed systems, that’s often the case.

Luckily, there’s an alternative.

How does non-blocking I/O work?

The alternative is non-blocking I/O. The difference is evident from its name. Instead of blocking while waiting for the transmission to complete, the caller is not blocked and can continue its processing. The magic happens in the operating system. With non-blocking I/O, the operating system queues the requests. The system processes the actual I/O in the future. When the I/O completes and the response is ready, a continuation, often implemented as a callback, happens, and the caller receives the result.

To better understand the benefits and see how these continuations work, we need to look under the hood: how is non-blocking I/O implemented? We already mentioned a queue. The system enqueues I/O operations and returns immediately, so the caller is not blocked waiting for the I/O operations to complete. When a response comes back, the system stores the result in a structure. When the caller needs the result, it interrogates the system to see if the operation completed:

Example 4-4. An Echo Server using Non-Blocking I/O (chapter-4/non-blocking-io/src/main/java/org/acme/nio/NonBlockingServer.java)
InetSocketAddress address = new InetSocketAddress("localhost", 9999);
Selector selector = Selector.open();
ServerSocketChannel channel = ServerSocketChannel.open();
channel.configureBlocking(false);

channel.socket().bind(address);
// Server socket only support ACCEPT
channel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
    int available = selector.select(); // wait for events
    if (available == 0) {
        continue;  // Nothing ready yet.
    }

    // We got request ready to be processed.
    Set<SelectionKey> keys = selector.selectedKeys();
    Iterator<SelectionKey> iterator = keys.iterator();
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        if (key.isAcceptable()) {
            // --  New connection --
            SocketChannel client = channel.accept();
            client.configureBlocking(false);
            client.register(selector, SelectionKey.OP_READ);
            System.out.println("Client connection accepted: " + client.getLocalAddress());
        } else if (key.isReadable()) {
            // --  A client sent data ready to be read and we can write --
            SocketChannel client = (SocketChannel) key.channel();
            // Read the data assuming the size is sufficient for reading.
            ByteBuffer payload = ByteBuffer.allocate(256);
            int size = client.read(payload);
            if (size == -1 ) { // Handle disconnection
                System.out.println("Disconnection from " + client.getRemoteAddress());
                client.close(); // Close the client channel; the server channel stays open
                key.cancel();
            } else {
                String result = new String(payload.array(), StandardCharsets.UTF_8).trim();
                System.out.println("Received message: " + result);
                if (result.equals("done")) {
                    client.close();
                }
                payload.rewind(); // Echo
                client.write(payload);
            }
        }
        // Make sure we don't handle the same key twice.
        iterator.remove();
    }
}

Non-blocking I/O introduces a few new concepts:

  • We don’t use InputStream or OutputStream (which are blocking by nature), but Buffer, which is temporary storage.

  • A Channel can be viewed as an endpoint for an open connection.

  • A Selector is the cornerstone of non-blocking I/O in Java.

A Selector manages multiple channels, either server or client channels. When you use non-blocking I/O, you create a Selector. Each time you deal with a new channel, you register this channel on the selector with the events you are interested in (accept, ready to read, ready to write).

Then, your code polls the Selector with a single thread to see whether any channel is ready. When a channel is ready to read or write, you can start reading and writing. We don’t need a thread for every channel at all; a single thread can handle many channels.

The selector is an abstraction of the non-blocking I/O implementation provided by the underlying operating system. Various approaches are available, depending on the operating system.

First, select was implemented in the 1980s. It supports the registration of up to 1,024 sockets. That was certainly enough in the ’80s, but not anymore.

poll is a replacement for select, introduced in 1997. The most significant difference is that poll no longer limits the number of sockets. However, as with select, the system only tells you how many channels are ready, not which ones. So, you need to iterate over the set of channels to check which ones are ready. When there are few channels, this is not a big problem. Once the number of channels reaches hundreds of thousands, the iteration time becomes considerable.

Then, epoll appeared in 2002 in Linux Kernel 2.5.44. kqueue appeared in FreeBSD in 2000, and /dev/poll in Solaris around the same time. These mechanisms return the set of channels that are ready to be processed: no more iterating over every channel! Finally, Windows systems provide IOCP, an optimized implementation of select.

What’s important to remember is that, regardless of how the operating system implements it, with non-blocking I/O you only need a single thread to handle multiple requests. That model is much more efficient than blocking I/O, as you don’t need to create threads to handle concurrent requests. Eliminating these extra threads makes your application much more efficient in terms of memory consumption (~1 MB per thread) and avoids wasting CPU cycles due to context switches (1-2 μs per switch)4.

Reactive systems recommend the usage of non-blocking I/O to receive and send messages. Thus, your application can handle more messages with fewer resources. Another advantage is that an idle application consumes almost no memory or CPU. You don’t have to reserve resources upfront.

Reactor pattern and event loop

Non-blocking I/O gives us the possibility to handle multiple concurrent requests or messages with a single thread. How can we handle these concurrent requests? How do we structure our code when using non-blocking I/O? The examples given in the previous section do not scale well; we can quickly see that implementing a REST API with such a model would be a nightmare. Besides, we would like to avoid using worker threads, as that would discard the advantages of non-blocking I/O. So we need something different: the reactor pattern.

The reactor pattern allows associating I/O events with event handlers. The reactor, the cornerstone of this mechanism, invokes the event handlers when the expected event is received.

The purpose of the reactor pattern is to avoid creating a thread for each message, request, and connection. It receives events from multiple channels and sequentially distributes them to the corresponding event handlers.

The reactor pattern
Figure 4-7. The reactor pattern

Implementations of the reactor pattern use an event loop (Figure 4-7). This is a thread iterating over the set of channels and, when data is ready to be consumed, invoking the associated event handlers sequentially, in a single-threaded manner.

When you combine non-blocking I/O and the reactor pattern, you organize your code as a set of event handlers. That approach works wonderfully with reactive code, as it exposes the notion of events, the essence of Reactive.
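To make the mechanics concrete, here is a deliberately simplified, toy sketch of an event loop (not how Netty or Vert.x implement it): a single thread takes events from a queue and invokes the registered handlers sequentially.

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

public class ToyEventLoop {

    record Event(String type, Object payload) { }

    private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
    private final Map<String, Consumer<Object>> handlers = new ConcurrentHashMap<>();

    // Associate an event type with an event handler.
    void on(String type, Consumer<Object> handler) {
        handlers.put(type, handler);
    }

    // Events are queued; the loop dispatches them later.
    void dispatch(Event event) {
        queue.add(event);
    }

    // The event loop: a single thread invoking the handlers sequentially.
    void run() throws InterruptedException {
        while (true) {
            Event event = queue.take();
            Consumer<Object> handler = handlers.get(event.type());
            if (handler != null) {
                handler.accept(event.payload()); // handlers must not block this thread
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ToyEventLoop loop = new ToyEventLoop();
        loop.on("order-placed", payload -> System.out.println("Handling: " + payload));
        loop.dispatch(new Event("order-placed", "order-1"));
        loop.run();
    }
}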

There are a few variants of the reactor pattern:

  • The multi-reactor pattern uses multiple event loops (generally one or two per CPU core), which increases the concurrency of the application. Multi-reactor pattern implementations, such as Eclipse Vert.x, call the event handlers in a single-threaded manner to avoid deadlocks or state visibility issues.

  • The proactor pattern can be seen as an asynchronous version of the reactor pattern. With the proactor pattern, long-running event handlers invoke a continuation when they complete. Such a mechanism allows mixing non-blocking and blocking handlers (Figure 4-8).

The proactor pattern
Figure 4-8. The proactor pattern

You can integrate non-blocking event handlers as well as blocking ones by offloading the execution of the blocking ones to separate threads when it’s inevitable. When their execution completes, the continuation is invoked. As we will see in Chapter 6, this is the pattern used by Quarkus.
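As a sketch of that offloading with the Vert.x API (used later in this book; exact signatures may vary between versions), a blocking piece of work runs on a worker thread, and the continuation is invoked when it completes:

import io.vertx.core.Vertx;

public class OffloadingExample {

    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        vertx.executeBlocking(promise -> {
            // Runs on a worker thread: blocking here does not freeze the event loop.
            promise.complete(someBlockingCall());
        }).onSuccess(result ->
            // The continuation, invoked once the blocking work completes.
            System.out.println("Result: " + result));
    }

    static String someBlockingCall() {
        try {
            Thread.sleep(1000); // simulate a blocking operation
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "done";
    }
}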

Anatomy of reactive applications

In the last few years, many frameworks have popped up, offering reactive application support. Their goal is to simplify the implementation of reactive applications. They achieve this by providing higher-level primitives and APIs to handle events and abstract away non-blocking I/O.

Indeed, and you may have realized this already, using non-blocking I/O is not that simple. Combining it with a reactor pattern (or a variant) can be convoluted. Fortunately, libraries and toolkits are doing the heavy lifting alongside frameworks. Netty is an asynchronous event-driven network application framework leveraging non-blocking I/O to build highly concurrent applications. It’s the most used library to handle non-blocking I/O in the Java world. But Netty can be challenging. The following snippet implements the echo TCP server using Netty:

Example 4-5. An Echo Server using Netty (chapter-4/non-blocking-io/src/main/java/org/acme/netty/NettyEchoServer.java)
public static void main(String[] args) throws Exception {
    new NettyServer(9999).run();
}

private final int port;

public NettyServer(int port) {
    this.port = port;
}

public void run() throws Exception {
    // NioEventLoopGroup is a multithreaded event loop that handles I/O operation.
    // The first one, often called 'boss', accepts an incoming connection.
    // The second one, often called 'worker', handles the traffic of the accepted
    // connection once the boss accepts the connection and registers the
    // accepted connection to the worker.
    EventLoopGroup bossGroup = new NioEventLoopGroup();

    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        // ServerBootstrap is a helper class that sets up a server.
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                // the NioServerSocketChannel class is used to instantiate a
                // new Channel to accept incoming connections.
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    // this handler is called for each accepted channel and
                    // allows customizing the processing. In this case, we
                    // just append the echo handler.
                    @Override
                    public void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new EchoServerHandler());
                    }
                });

        // Bind and start to accept incoming connections.
        ChannelFuture f = b.bind(port).sync();

        // Wait until the server socket is closed.
        f.channel().closeFuture().sync();
    } finally {
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }
}

private static class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Write the received object, and flush
        ctx.writeAndFlush(msg);
    }
}

The Vert.x toolkit, built on top of Netty, provides higher-level features for building reactive applications, such as HTTP clients and servers, messaging clients, and so on. Typically, the same echo TCP server using Vert.x looks like this:

Example 4-6. An Echo Server using Vert.x (chapter-4/non-blocking-io/src/main/java/org/acme/vertx/VertxEchoServer.java)
Vertx vertx = Vertx.vertx();
// Create a TCP server
vertx.createNetServer()
        // Invoke the given function for each connection
        .connectHandler(socket -> {
            // Just write the content back
            socket.handler(buffer -> socket.write(buffer));
        })
        .listen(9999);

Most Java frameworks offering reactive capabilities are based on Netty or Vert.x. As shown in Figure 4-9, they all follow the same kind of blueprint:

The common architecture of reactive frameworks
Figure 4-9. The common architecture of reactive frameworks

At the bottom, you have the non-blocking I/O layer. Generally, frameworks use Netty or Vert.x. This layer handles client connections, outbound requests, and response writing. In other words, it manages the I/O part. Most of the time, it implements the reactor pattern (or a variant) and so provides an event-loop-based model.

Then, in the second layer, you have the reactive framework per se. The role of this layer is to provide high-level, easy-to-use APIs. You use these APIs to write your application code. Instead of having to handle non-blocking I/O channels, this layer provides high-level objects such as HTTP requests and responses, Kafka messages, and so on. Much easier!

Finally, in the top layer, you have your application. Your code does not need to touch non-blocking I/O concepts, thanks to the reactive framework. It can just focus on incoming events and handle them. Your code is just a collection of event handlers. It can use the features provided by the reactive framework to interact with other services or middleware.

But there is a catch. The event handlers from your code are invoked on the event loop thread (an I/O thread). If your code blocks this thread, no other concurrent events can be processed. That would be a disaster in terms of responsiveness and concurrency. The consequence of such an architecture is clear: your code must be non-blocking. It must never block the I/O threads: they are few and are used to handle multiple concurrent requests. To achieve this, you can offload the processing of some events to worker threads (using the proactor pattern). While this discards some of the benefits of non-blocking I/O, it is sometimes the most rational choice (Figure 4-10). Nevertheless, we should not abuse this, as it would discard the reactive benefits and make the application slow. The multiple context switches required to handle an event on a worker thread penalize the response time.

Running some event handlers on worker threads
Figure 4-10. Running some event handlers on worker threads

Typically, the applications we built in Chapter 2 and Chapter 3 rely on such a mechanism.

Another possibility is to rely only on non-blocking code, using the asynchronous APIs provided by the reactive framework. These APIs are non-blocking and, if I/O is involved, they use non-blocking I/O. Every time an event handler executes an asynchronous operation, another handler (the continuation) is registered, and when the expected event arrives, the event loop invokes it. Thus, the processing is divided into smaller handlers running asynchronously. That model is the most efficient one and fully embraces the concepts behind Reactive.
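For illustration, assuming the Vert.x Web Client (the vertx-web-client artifact) and a hypothetical service listening on localhost:8080, an event handler chains non-blocking operations and registers continuations instead of waiting:

import io.vertx.core.Vertx;
import io.vertx.ext.web.client.WebClient;

public class NonBlockingComposition {

    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        WebClient client = WebClient.create(vertx);

        // The call returns immediately; the handlers below are continuations invoked
        // by the event loop when the response (or a failure) eventually arrives.
        client.get(8080, "localhost", "/orders")
                .send()
                .onSuccess(response -> System.out.println("Got: " + response.bodyAsString()))
                .onFailure(failure -> System.out.println("Failed: " + failure.getMessage()));
    }
}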

Summary

Reactive systems are about building better distributed systems. They don’t aim to hide the nature of distributed systems but, on the contrary, embrace it.

In this chapter, we learned:

  • the four pillars of reactive systems (asynchronous message passing, elasticity, resilience, and responsiveness);

  • how asynchronous message passing enables elasticity and resilience, and increases the autonomy of each individual component;

  • the role of commands and events in a distributed system;

  • how non-blocking I/O improves resource utilization in reactive applications.

But, this last point has a significant drawback, as we need to write non-blocking code. What a coincidence! The next chapter is precisely about that!

1 An interesting talk about distributed monoliths, and why you should avoid them, is available on https://www.microservices.com/talks/dont-build-a-distributed-monolith/

2 This pattern is called Change Data Capture (https://en.wikipedia.org/wiki/Change_data_capture). Frameworks such as Debezium (https://debezium.io/) are a key element of reactive systems when using databases, as the events are emitted without any impact on the application code.

3 Referring to the traditional Spring Framework. Reactive Spring is based on non-blocking I/O.

4 The article https://eli.thegreenplace.net/2018/measuring-context-switching-and-memory-overheads-for-linux-threads/ provides interesting data about the cost of thread on Linux.
