Chapter 3. Tools of the trade

The previous chapter explained why you need to be Reactive. Now we will turn our attention to the question of how you can achieve this goal. In this chapter, you will learn:

  • The earliest Reactive approaches
  • Important functional programming techniques
  • Strengths and weaknesses of the existing Reactive tools and libraries

3.1. Early Reactive solutions

Over the past 30 years, people have designed many tools and paradigms to help build Reactive applications. One of the oldest and most notable is the Erlang programming language (www.erlang.org), created by Joe Armstrong and his team at Ericsson in the mid-1980s. Erlang was the first language that brought Actors, described later in this chapter, into mainstream popularity.

Armstrong and his team faced a daunting challenge: to build a language that would support the creation of distributed applications that are nearly impervious to failure. Over time, Erlang evolved in the Ericsson laboratory, culminating with its use in the late 1990s to build the AXD 301 telephone switch, which reportedly achieved “nine nines” of uptime—availability 99.9999999% of the time. Consider exactly what that means. For a single application running on a single machine, that would be roughly 3 seconds of downtime in 100 years!

100 years
    * 365 days/year
    * 24 hours/day
    * 60 minutes/hour
    * 60 seconds/minute
        = 3,153,600,000 seconds

3,153,600,000 seconds
    * 0.000000001 expected downtime
        = 3.1536 seconds of downtime in 100 years

Of course, such long-lasting, near-perfect uptime is purely theoretical; modern computers haven’t even been around 100 years. The study upon which this claim was based was performed by British Telecom in 2002–2003 and involved 14 nodes and a calculation based on 5 node-years of study.[1] Such approximations of downtime depend as much on the hardware as on the application itself, because unreliable computers put an upper limit on the availability of even the most resilient software. But such theoretical uptime illustrates the extraordinary fault tolerance possible in a Reactive application. Amazingly, no other language or platform has made claims comparable to Erlang’s.

Erlang employs a dynamic type system and ubiquitous pattern matching to capture the dynamic nature of Actor systems, and it copies message data for every message it passes between Actors. The data has to be copied as shown in figure 3.1 because there is no shared heap space between two Actor processes in the BEAM VM, the virtual machine on which Erlang runs. Data sent between Actors must be copied into the receiving Actor process’s heap prior to sending the message, to guarantee isolation of Actors and to prevent concurrent access to the data.

Figure 3.1. In Erlang, data in a sending Actor’s heap is transferred via a message to the heap of a receiving Actor.

Although these features provide additional safety, ensuring that any Erlang Actor can receive any message and no data can be shared, they lower the application’s potential throughput. On the other hand, garbage collection can be performed independently for all process heaps[2] and thus completes more quickly and with predictable latency.

[2] One exception is binary strings longer than 64 bytes, which are stored in a separate heap and managed by reference counting.

All this copying would not be necessary if the Actors all shared the same heap. Then, two Actors could share a pointer to the same message. For this to work, though, one critical condition must be met: the data cannot be allowed to change. Functional programming addresses this challenge.

3.2. Functional programming

The concepts of functional programming have been around for a very long time, but only recently have they gained favor in mainstream programming languages. Why did functional programming disappear from the mainstream for so long, and why is its popularity surging now?

The period between 1995 and 2008 was essentially the Dark Ages of functional programming, as languages such as C, C++, and Java grew in usage, and the imperative, object-oriented programming style became the most popular way to write applications and solve problems. The advent of multiple cores opened up new opportunities for parallelization, but imperative programming constructs with side effects can be difficult to reason about in such an environment. Imagine a C or C++ developer who already has the burden of managing their own memory usage in a single-threaded application. In a multicore world, they must now manage memory across multiple threads while also trying to figure out who can access shared mutable memory at what time. This turns a job that was hard to do and verify in the first place into something that is daunting for even the most senior C/C++ developers.

This has led to a veritable Renaissance in functional programming. Many languages now include constructs from functional programming, because these better support reasoning about problems in concurrent and parallelized applications. Code written in a functional style makes it easier for developers to reason about what the application is doing at any given point in time.

The core concepts of functional programming have been around for many years: they were first defined in the lambda calculus by Alonzo Church in the 1930s.[3] The essence of functional programming is the insight that programs can be written in terms of pure mathematical functions: that is, functions that return the same value every time they are passed the same inputs and that cause no side effects. Writing code in a functional style is analogous to composing functions in mathematics. With tools for functional programming now at our disposal, it is truly a wonderful time to be a programmer again, because we can solve problems with languages that support the functional paradigm, the side-effecting paradigm, or both simultaneously.

[3] Alonzo Church, “The Calculi of Lambda-Conversion,” Annals of Mathematics Studies 6, Princeton University Press (1941).

Next, we will examine some core concepts of functional programming that go beyond pure function composition: immutability, referential transparency, side effects, and functions as first-class citizens.

3.2.1. Immutability

A variable is said to have mutable state when it can refer to different values at different times. In an otherwise purely functional program, mutable state is called impure and is considered dangerous. Mutability is represented by any variable or field that is not stable or final and can be changed or updated while the application is running; some examples are shown in listing 3.1. When you use a final, immutable variable, you can reason more easily about what value it will have at a given time because you know that nothing can possibly change it after it is defined. This applies to data structures as well as to simple variables: any action performed on an immutable data structure results in the creation of a new data structure, which holds the result of the change. The original data structure remains unchanged. Therefore, any other part of the program that continues to use the original data structure does not see the change.

By using immutability throughout an application, you can limit the places where mutation is possible to a very small section of code. So, the possibility of contention, where multiple threads attempt to access the same resource at the same time and some are forced to wait their turn, is limited in scope to a small region. Contention is one of the biggest drains on performance in code that runs on multiple CPU cores; it should be avoided where possible.

The astute reader will notice the tension between funneling all changes through a single point and trying to avoid contention. The key to solving this apparent paradox is that having all code that performs mutation within a small scope makes contention easier to manage. With full oversight of the problem, you can tune the behavior: for example, by dividing a complex state representation into several mutable variables that can usually be updated independently—hence, without contention.

Listing 3.1. Unsafe, mutable message class, which may hide unexpected behavior
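
A minimal sketch of what such a class might look like in Java. The class name `MutableMessage` and its fields are hypothetical, chosen only for illustration:

```java
import java.util.Date;

// Hypothetical example: every part of this object can change after construction.
class MutableMessage {
    private String body;      // not final: can be reassigned at any time
    private Date timestamp;   // java.util.Date is itself mutable

    MutableMessage(String body, Date timestamp) {
        this.body = body;
        this.timestamp = timestamp;   // keeps the caller's Date without copying it
    }

    String getBody() { return body; }

    void setBody(String body) { this.body = body; }

    // Exposes the internal Date, so callers can alter it behind the object's back.
    Date getTimestamp() { return timestamp; }
}
```

Any holder of the object, or of the Date it hands out, can change what every other reader sees.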

It is better to enforce immutability using the compiler rather than convention. This implies passing values to the constructor rather than calling setters, and using language features such as final in Java and val in Scala. Sometimes that is not possible, such as when an API requires an object to be created before all of its member values are known. In those situations, you may have to resort to initialization flags to prevent values from being set more than once or after the object is already in use.

Immutable data structures like the one shown in listing 3.2 ensure that the values returned by an object do not change. It does little good to ensure that the variable holding a Date is not reassigned if its content can be changed. The problem is not thread safety. The problem is that mutable state makes it far more difficult to reason about what the code is doing. There are several alternatives:

  • The first choice is to use an intrinsically immutable data structure. Some languages provide extensive libraries of immutable collection implementations, or you might incorporate a third-party library.
  • You can write a wrapper class to block access to the mutating methods. Ensure that no references remain to the backing mutable data structure once it is initialized, because any such reference can inadvertently defeat the purpose of the wrapper.
  • Copy-on-read semantics creates and returns a complete copy of the data structure every time it is read from the object. This ensures that readers do not have access to the original object, but it can be expensive. As with immutable wrappers, you must ensure that no outside references remain to the still-writable data structure within the object.
  • Copy-on-write semantics creates and returns a complete copy of the data structure whenever it is modified and ensures that users of the object cannot modify it through references they received from accessor methods. This prevents callers from changing the object’s underlying, mutable data structure, and it leaves previously acquired reader references unchanged.
  • The data structure can block use of the mutators once the data structure is initialized. This typically requires adding a flag to mark the data structure as read-only after it has been initialized.

Listing 3.2. Immutable message class that behaves predictably and is easier to reason about
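
A minimal sketch of such a class in Java. The name `ImmutableMessage`, its fields, and the `withBody` method are hypothetical, and storing the timestamp as a primitive is just one way to avoid sharing a mutable Date:

```java
import java.util.Date;

// Hypothetical example: all state is fixed at construction time.
final class ImmutableMessage {
    private final String body;
    private final long timestampMillis;   // primitive copy instead of a shared Date

    ImmutableMessage(String body, Date timestamp) {
        this.body = body;
        this.timestampMillis = timestamp.getTime();   // defensive copy of the value
    }

    String getBody() { return body; }

    // Returns a fresh Date on every call, so callers cannot reach internal state.
    Date getTimestamp() { return new Date(timestampMillis); }

    // "Changing" the message produces a new object; the original is untouched.
    ImmutableMessage withBody(String newBody) {
        return new ImmutableMessage(newBody, new Date(timestampMillis));
    }
}
```

Code that still holds the original message keeps seeing the original body and timestamp, no matter what other code does with the values it received.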

Java does not make everything immutable by default, but liberal use of the final keyword is helpful. In Scala, case classes provide immutability by default as well as additional, very convenient features such as correct equality and hash-code functions:

3.2.2. Referential transparency

An expression is said to be referentially transparent if replacing it with a single value (a constant) has no impact on the execution of the program.[4] So, evaluating a referentially transparent expression—that is, performing an operation on some data—has no impact on that data, and no side effects can occur. For example, the act of adding, removing, or updating a value in an immutable list results in a new list being created with the changed values; any part of the program still observing the original list sees no change.

Consider Java’s java.lang.StringBuffer class. If you call the reverse method on a StringBuffer, you will get a reference to a StringBuffer with the values reversed. But the original StringBuffer reference refers to the same instance and therefore now also has changed its value:
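
A minimal sketch of that behavior (the class name is illustrative):

```java
// Illustrative sketch: reverse() mutates the receiver and returns the same instance.
class StringBufferOpacity {
    public static void main(String[] args) {
        StringBuffer myStringBuffer = new StringBuffer("abc");
        StringBuffer reversed = myStringBuffer.reverse();

        System.out.println(reversed);                    // cba
        System.out.println(myStringBuffer);              // cba as well: the original changed
        System.out.println(myStringBuffer == reversed);  // true: one object, two names
    }
}
```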

This is an example of referential opacity: the value of the expression myStringBuffer.reverse() changes when it is evaluated. It cannot be replaced by its result without altering the way the program executes. The java.lang.StringBuilder class has the same problem.

Note, however, that a function call can be referentially transparent even if the function modifies internal state, as long as the function call can be replaced with its result without affecting the program’s output. For example, if an operation is likely to be performed multiple times, internally caching the result the first time can speed up execution without violating referential transparency. An example of this approach is shown in the next listing.

Listing 3.3. Referential transparency: allowing substitution of precomputed values
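
A minimal sketch of this idea, assuming a hypothetical `CachedSquareRoot` class that memoizes its results. The cache is mutable internal state, but no caller can observe it:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical example: internal caching that stays invisible to callers,
// so a call to sqrt(x) can always be replaced by its result.
class CachedSquareRoot {
    private final ConcurrentMap<Double, Double> cache = new ConcurrentHashMap<>();

    double sqrt(double value) {
        // Computes the result on first use and returns the stored value afterward.
        return cache.computeIfAbsent(value, Math::sqrt);
    }
}
```

Calling `sqrt(16.0)` once or many times yields the same answer; only the time taken can differ.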

3.2.3. Side effects

Side effects are actions that break the referential transparency of later method calls by changing their environment such that the same inputs now lead to different results. Examples are modifying some system property, writing log output to the console, and sending data over a network. Pure functions have no side effects. Some functional programming languages such as Haskell enforce rules about where side effects can exist and when they can take place—a great help when reasoning about the correctness of code.

Side effects matter because they limit how an object can be used. Consider the following class.

Listing 3.4. Limiting usability with side effects
import java.io.Serializable;

public class SideEffecting implements Serializable, Cloneable {

    private int count;

    public SideEffecting(int start) {
        this.count = start;
    }

    public int next() {
        this.count = Math.incrementExact(this.count);
        return this.count;
    }
}

Every call to next() returns a different value. Consequently, code that calls next() more than once and expects the same answer each time can give you a very unpleasant experience.

Even worse, something like new SideEffecting(Integer.MAX_VALUE - 2) will turn the side effect into an ArithmeticException after a few calls.
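
Both problems can be sketched as follows, assuming next() is meant to advance the counter by exactly one. The `Counter` class is a compact, hypothetical stand-in for the class in listing 3.4, and `SideEffectDemo` is likewise illustrative:

```java
// Compact stand-in for SideEffecting, assuming next() advances count by one.
class Counter {
    private int count;

    Counter(int start) { this.count = start; }

    int next() {
        // incrementExact throws ArithmeticException instead of wrapping around.
        this.count = Math.incrementExact(this.count);
        return this.count;
    }
}

class SideEffectDemo {
    public static void main(String[] args) {
        Counter c = new Counter(0);
        // Looks like comparing a value with itself, but each call changes the state.
        System.out.println(c.next() == c.next());   // false

        Counter nearMax = new Counter(Integer.MAX_VALUE - 2);
        nearMax.next();   // Integer.MAX_VALUE - 1
        nearMax.next();   // Integer.MAX_VALUE
        try {
            nearMax.next();   // one step too far
        } catch (ArithmeticException e) {
            System.out.println("third call failed: " + e);
        }
    }
}
```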

Sometimes side effects are more subtle. Suppose the object needs to be passed remotely. If it is immutable and without side effects, it can be serialized and reconstituted on the remote system, which then will have its own identical and unchanging copy. If there are side effects, the two copies will diverge from each other. This is especially problematic when the original system was not envisioned to have distributed operations. You may blithely assume that updates are being applied to the one and only instance of an object, without realizing the trouble that will cause when scalability requirements lead to copies of the object being kept on multiple servers.

3.2.4. Functions as first-class citizens

In a language where functions are first-class citizens, a function is a value just like an Integer or a String and can be passed as an argument to another function or method. The idea of doing so is to make code more composable: a function that takes a function as an argument can compose the calculation performed by its argument with calculations of its own, as illustrated by the call to .map in the following snippet:
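
One way this might look in Java 8, using the streams API. This is a sketch with illustrative class and method names:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class MapExample {
    // Applies the lambda x -> x + 1 to every element and collects the results.
    static List<Integer> incrementAll(List<Integer> numbers) {
        return numbers.stream()
                      .map(x -> x + 1)   // the function is passed as an argument
                      .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(incrementAll(Arrays.asList(1, 2, 3)));   // [2, 3, 4]
    }
}
```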

Many languages that are otherwise not supportive of functional programming have functions as first-class citizens, including JavaScript and Java 8. In the previous example, the function passed as an argument is a lambda expression. It has no name and exists only within the context of its call site. Languages that support functions as first-class citizens also allow you to assign this function to a named variable as a function value and then refer to it from wherever you see fit. In Python, you could do this like so:

>>> def addOne(x):
...     return x + 1
...
>>> myFunction = addOne
>>> myFunction(3)
4

3.3. Responsiveness to users

Beyond functional programming, to build Reactive applications you also need to use tools that give you responsiveness. This is not responsive web design,[5] as it is known in the user-experience world, where a front end is said to be “responsive” if it resizes itself appropriately for the user’s viewing device. Responsiveness in Reactive applications means being able to quickly respond to user requests in the face of failure that can occur anywhere inside or outside the application. The performance trade-offs of Reactive applications are defined by the axiom that you can choose any two of the following three characteristics:

  • High throughput
  • Low latency, but also smooth and not jittery
  • Small footprint

These will all be defined further along in this section.

3.3.1. Prioritizing the three performance characteristics

When you make architecture choices for a Reactive application, you are in essence giving priority to two of those three characteristics and choosing to sacrifice the remaining characteristic where necessary. This is not a law or theorem, but more of a guiding principle that is likely to be true in most cases. To get a very fast application with smooth, low latency, you typically have to give it more resources (footprint). An example of this is a high-performance messaging library written in Java, known as the Disruptor (http://lmax-exchange.github.io/disruptor). To get its tremendous throughput and smooth latency, the Disruptor has to preallocate all the memory it will ever use for its internal ring buffer, in order to prevent allocations at runtime that could lead to stop-the-world garbage collections and compaction pauses in the Java virtual machine. The Disruptor gains throughput by pinning itself to a specific execution core, thereby avoiding the cost of context switches[6] between threads being swapped in and out on that core. This is another aspect of application footprint: there is now one fewer execution core available for use by other threads on that computer.

The Storm framework (https://github.com/nathanmarz/storm), created by Nathan Marz and released in 2011 to much acclaim, provides capabilities for distributed processing of streaming data. Storm was created at Marz’s startup, BackType, which was purchased by Twitter and became the basis for its real-time analytics applications. But the implementation was not particularly fast, because it was built using Clojure and pure functional programming constructs. When Marz released version 0.7 in 2012, he used the Disruptor to increase throughput by as much as three times, at the cost of footprint. This trade-off matters to those who choose to deploy an application that uses Storm, particularly in the cloud, where one core on the VM must be devoted solely to the Disruptor to maintain its speed. Note that the number of cores available to an application in a virtual environment is not an absolute value. As Doug Lea,[7] the author of many concurrency libraries on the JVM, such as the ForkJoinPool and CompletableFuture, has said, “Ask your hypervisor how much memory you have, or how many cores you have. It’ll lie. Every time. It’s designed to lie. You paid them money so it would lie.”[8] Developers have to take these variables into account when considering footprint in a cloud deployment.

[8] Doug Lea, “Engineering Concurrent Library Components” (talk, Emerging Technologies Conference, 2013), http://chariotsolutions.com/screencast/phillyete-screencast-7-doug-lea-engineering-concurrent-library-components.

Some platforms have constraints that limit their ability to make these trade-offs. For example, a mobile application typically cannot give up footprint to increase throughput and make latency smoother, because a mobile platform typically has very limited resources. Imagine the cost of pinning a core on a mobile phone, in terms of reduced resource availability both for other applications and for the phone’s operating system. A mobile phone has limited memory as well as constraints on power usage: you don’t want to drain the battery so quickly that no one would want to use your application. So, mobile applications typically attempt to minimize footprint while increasing throughput at the expense of latency, because users are more willing to accept latency on a mobile platform. Anyone who has used an application from a phone has experienced slowness due to network issues or packet loss.

3.4. Existing support for Reactive design

Now that we have reviewed the basic concepts needed to understand and evaluate tools and language constructs for implementing Reactive design, it is time to look at the leading examples. Many languages have innovated in the area of dividing work and making it asynchronous.

For each concept or implementation described in this section, we will provide an evaluation of how well it meets the tenets of the Reactive Manifesto. Note that although all are asynchronous and nonblocking in and of themselves, it is still up to you to ensure that the code you write remains asynchronous and nonblocking as well, in order to remain Reactive.

3.4.1. Green threads

Some languages do not include built-in constructs for running multiple threads within the same operating system process. In these cases, it is still possible to implement asynchronous behavior via green threads: threads scheduled by the user process rather than the operating system.[9]

Green threads can be very efficient but are restricted to a single physical machine. It is impossible, without the aid of delimited portable continuations,[10] to share the processing of a thread across multiple physical machines. Delimited portable continuations allow you to mark points in logic where the execution stack can be wrapped up and exported either locally or to another machine. These continuations can then be treated as functions. This is a powerful idea, implemented in a Scheme library called Termite (https://code.google.com/p/termite). But green threads and continuations do not provide for resilience, because there is currently no way to supervise their execution; they are therefore lacking with respect to fault tolerance.

Waldo et al. note[11] that it is not a good idea to try to make logic that executes in a distributed context appear to be local. This postulate applies to green threads as well. If we take “local” to mean local to the thread, and “distributed/remote” to mean on another thread and thus asynchronous, you would not want “distributed/remote” to appear “local” because this would obscure the difference between synchronous and asynchronous operations. It would be hard to tell which operations can block the program’s progress and which cannot!

[11] Jim Waldo, Geoff Wyant, Ann Wollrath, Sam Kendall: “A Note on Distributed Computing,” Sun Microsystems Laboratories, 1994, http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.41.7628.

Reactive evaluation of green threads

Green threads are asynchronous and nonblocking, but they do not support message passing. They do not scale up to use multiple cores on a machine by themselves, although if the runtime supports it, it is possible to have more than one in a process, or multiple processes can be run. They do not scale outward across nodes in a network. They also do not provide any mechanisms for fault tolerance, so it is up to developers who use them to write their own constructs to handle any failure that may occur.

3.4.2. Event loops

When a language or platform does not support multiple threads in one process, you can still get asynchronous behavior by making green threads that share an event loop. This loop provides a mechanism for sharing a single execution thread among several logical threads at the same time. The idea is that although only a single thread can execute at a given moment, the application should not block on any operation and should instead make each thread yield until such time as the external work it needs, such as calling a data store, is completed. At that time, the application can invoke a callback to perform a predefined behavior. This is very powerful: Node.js (http://nodejs.org), for example, uses a single-threaded JavaScript runtime to perform considerably more work because it doesn’t have to wait for every operation to complete before handling other work.

Event loops are most typically implemented with callbacks. This would be okay if only a single callback could be referenced at a time, but as an application’s functionality grows, this typically is not the case. The terms callback hell and pyramid of doom have been coined to represent the interwoven spaghetti code that often results from popular tools like Node.js. Furthermore, event loops based on a single-threaded process are viable only for uses that are I/O-bound or when the use case is specific to handling input and output. Trying to use an event loop for CPU-bound operations will defeat the advantage of this approach.
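
The mechanism can be sketched in a few lines of Java. This is a toy under stated assumptions, not how Node.js is implemented: `TinyEventLoop` is a hypothetical class, and a real event loop would also wait for I/O readiness instead of stopping when its queue is empty:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical single-threaded event loop: tasks run one at a time, and a task
// schedules its continuation as a callback instead of blocking.
class TinyEventLoop {
    private final Queue<Runnable> queue = new ArrayDeque<>();

    void submit(Runnable task) { queue.add(task); }

    // Runs queued tasks until none remain; callbacks may enqueue further tasks.
    void run() {
        Runnable task;
        while ((task = queue.poll()) != null) {
            task.run();
        }
    }

    // Interleaves two logical threads, A and B, on one execution thread.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        TinyEventLoop loop = new TinyEventLoop();
        loop.submit(() -> {
            log.add("A: start");
            loop.submit(() -> log.add("A: callback"));   // yield now, resume later
        });
        loop.submit(() -> log.add("B: start"));
        loop.run();
        return log;   // [A: start, B: start, A: callback]
    }
}
```

Because A yields instead of waiting, B gets to run before A's callback does. A long CPU-bound task in the queue would stall every other logical thread.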

Here is a simple example of a Node.js application. Note that running this server and using Google’s Chrome browser to send requests to the address 127.0.0.1:8888 may result in a doubling of the counter value on each request. Chrome has a known issue with sending an additional request for favicon.ico with every request:[12]

Reactive evaluation of event loops

The suitability of an event loop for a Reactive application depends on the implementation. As deployed via Node.js in JavaScript, event loops are similar to green threads in that they are asynchronous and nonblocking but do not support message passing. They do not scale up to use multiple cores on a machine by themselves, although if the runtime supports it, it is possible to have more than one in a process, or multiple processes can be run. They do not scale outward across nodes in a network. They also do not provide any mechanisms for fault tolerance, so it is up to developers to write their own constructs to handle any failure that may occur.

But there are alternative implementations, such as Vert.x (http://vertx.io), which runs on the JVM and has a feel similar to Node.js but supports multiple languages. Vert.x is a compelling solution because it provides a distributed approach to the event-loop model, using a distributed event bus to push messages between nodes. In a JVM deployment, it does not need to use green threads because it can use a pool of threads for multiple purposes. In this respect, Vert.x is asynchronous and nonblocking and does support message passing. It also scales up to use multiple cores, as well as scales out to use multiple nodes in a network. Vert.x does not have a supervision strategy for fault tolerance, but it is an excellent alternative to an event loop, particularly because it supports JavaScript just as Node.js does.

3.4.3. Communicating Sequential Processes

Communicating Sequential Processes (CSP) is a mathematical abstraction of multiple processes, or threads in a single process, that communicate via message passing.[13] You can define work to be performed concurrently in separate processes or threads, which then pass messages between them to share information.

[13] C.A.R. Hoare, “Communicating Sequential Processes,” Communications of the ACM 21, no. 8 (1978): 666–677.

What makes CSP unique is that the two processes or threads do not have to know anything about one another, so they are nicely decoupled from the standpoint of sender and receiver but still coupled with respect to the value being passed. Rather than assuming that messages accumulate in queues until read, CSP uses rendezvous messaging: for a message to be passed, the sender and receiver must reach a point where the sender is ready to send and the receiver is ready to receive. Consequently, receipt of a message always synchronizes both processes. This is fundamentally different from Actors, which will be discussed in section 3.4.6. This also limits the two processes or threads in how distributed they can be, depending on how CSP is implemented. For example, CSP on the JVM as implemented by Clojure’s core.async library cannot be distributed across multiple JVM instances, even on the same machine. Neither can Go’s channels, over which its goroutines communicate.
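
The rendezvous behavior described above can be sketched on the JVM with `java.util.concurrent.SynchronousQueue`, used here as a stand-in for an unbuffered channel rather than as a full CSP implementation. The class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

// Sketch of rendezvous messaging: SynchronousQueue has no capacity, so put()
// blocks until another thread is ready to take().
class RendezvousDemo {
    static List<Integer> exchange(int count) {
        SynchronousQueue<Integer> channel = new SynchronousQueue<>();
        List<Integer> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    channel.put(i);   // waits here until the consumer takes i
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            for (int i = 0; i < count; i++) {
                received.add(channel.take());   // waits here until the producer puts
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while receiving", e);
        }
        return received;
    }
}
```

Neither side names the other: both know only the channel and the type of value it carries. Every handoff forces the two threads to meet.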

Because CSP is defined formally and mathematically, it is theoretically provable whether a deadlock can or cannot occur inside of it, via a method called process analysis. Being able to statically verify the correctness of concurrency logic is a powerful idea. Note, however, that neither Clojure’s core.async nor Go’s channels offer this capability; if it proves practical to implement, it would be very useful.

Because no process or thread in a CSP-based application has to know about another, there is a form of location transparency: to write the code for one process or thread, you do not need to know about any other process or thread with which it will communicate. But the most popular implementations of CSP to date do not support communication between different nodes on a network, so they cannot support true location transparency. They also have difficulties with fault tolerance, because failure between two processes or threads cannot be managed easily. Instead, the logic in each process or thread must have the ability to manage any failure that could occur when communicating with the other side. Another potential downside is that nontrivial use of CSP can be difficult to reason about, because every process/thread can potentially interact with every other process/thread at each step.

Here is a simple example of two communicating processes in Go. Interestingly, a Go function can create a channel and put values onto it as well as consume them, in effect stashing values off to the side for use later. In this example, one function produces messages and puts them onto the channel, and a second function consumes them:

Reactive evaluation of CSP

CSP is asynchronous and nonblocking and supports message passing in rendezvous fashion. It scales up to use multiple cores on a machine, but none of the current implementations scale outward across nodes. CSP does not provide any mechanisms for fault tolerance, so it is up to developers to write their own constructs to handle any failure that may occur.

3.4.4. Futures and promises

A Future is a read-only handle to a value or failure that may become available at some point in time; a Promise is the corresponding write-once handle that allows the value to be provided. Note that these definitions are not universally established; the terminology chosen here is that used in C++, Java, and Scala.[14] A function that returns its result asynchronously constructs a Promise, sets the asynchronous processing in motion, installs a completion callback that will eventually fulfill the Promise, and returns the Future associated with the Promise to the caller. The caller can then attach code to the Future, such as callbacks or transformations, to be executed when the Future’s value is provided. Normally, a function that returns a Future does not expose the underlying Promise to its caller.
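
The sequence above can be sketched with Java 8's `CompletableFuture`. One assumption to note: `CompletableFuture` plays both the Promise and the Future role in Java, so the separation here holds only by convention. The class and method names are hypothetical:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PromiseFutureSketch {
    // Hypothetical asynchronous function: creates the write side, starts the
    // work, and hands the result handle back to the caller right away.
    static CompletableFuture<Integer> lengthAsync(String text, ExecutorService executor) {
        CompletableFuture<Integer> promise = new CompletableFuture<>();
        executor.execute(() -> {
            try {
                promise.complete(text.length());      // fulfill with a value
            } catch (RuntimeException e) {
                promise.completeExceptionally(e);     // or fulfill with a failure
            }
        });
        return promise;
    }

    static int demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // The caller attaches a transformation that runs once the value arrives.
            CompletableFuture<Integer> doubled =
                lengthAsync("reactive", executor).thenApply(n -> n * 2);
            return doubled.join();   // blocks only to make the demo observable
        } finally {
            executor.shutdown();
        }
    }
}
```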

All Future implementations provide a mechanism to turn a code block—for example, a lambda expression—into a Future such that the code is dispatched to run on a different thread, and its return value fulfills the Future’s Promise when it becomes available. Futures therefore provide a simple way to make code asynchronous and implement parallelism. Futures return either the result of their successful evaluation or a representation of whatever error may have occurred during evaluation.

We turn now to an elegant example of Futures in practice: retrieval of data from multiple sources, where you prefer to access sources in parallel (simultaneously) rather than sequentially. Imagine a service that needs to return customer information that may be stored in a database somewhere far away but may also be cached in a store that is closer for performance reasons. To retrieve the data, the program should check the cache first to see whether it has the data needed to avoid an expensive database lookup. If there is a cache miss—if the information is not found—the program must look it up in the database.

In a sequential lookup shown in figure 3.2, the calling thread tries to retrieve the data from the cache first. If the cache lookup fails, it then makes a call to the database and returns its response—at the cost of two lookups that took place one after the other. In a parallel lookup shown in figure 3.3, the calling thread sends requests to the cache and the database simultaneously. If the cache responds with a found customer record first, the response is sent back to the client immediately. When the database responds later with the same record, it is ignored. But if the cache lookup fails, the calling thread doesn’t have to make a subsequent database call, because that has already been done. When the database responds, the response is sent to the client right away, theoretically sooner than if the client had made sequential calls.

Figure 3.2. Sequential lookup: first, check the cache; then, if the data are not found, retrieve them from the database.

Figure 3.3. Parallel lookup: send requests to the cache and the database simultaneously; the result is the first value returned.

The code for parallel lookup may look similar to the following listing, written in Java 8 to exploit its nonblocking CompletableFuture class.

Listing 3.5. Retrieving the result from the faster source
import java.util.concurrent.CompletableFuture;

public class ParallelRetrievalExample {
    final CacheRetriever cacheRetriever;
    final DBRetriever dbRetriever;

    ParallelRetrievalExample(CacheRetriever cacheRetriever,
                             DBRetriever dbRetriever) {
        this.cacheRetriever = cacheRetriever;
        this.dbRetriever = dbRetriever;
    }

    // Returns a Future rather than a value, so the caller is never blocked.
    public CompletableFuture<Object> retrieveCustomer(final long id) {
        // Start both lookups at once on the default executor.
        final CompletableFuture<Object> cacheFuture =
            CompletableFuture.supplyAsync(() -> cacheRetriever.getCustomer(id));
        final CompletableFuture<Object> dbFuture =
            CompletableFuture.supplyAsync(() -> dbRetriever.getCustomer(id));

        // Completes as soon as either lookup completes.
        return CompletableFuture.anyOf(cacheFuture, dbFuture);
    }
}

Performing these two operations sequentially would be expensive, and there is rarely an opportunity to cache data beforehand without knowing what a client will ask for next. Futures provide a handy way to perform both operations in parallel. Using Futures, you can easily create two tasks to search the cache and the database virtually simultaneously, letting whichever task completes first provide the response to the client.

Concurrent lookup thus marshals more resources (footprint) to reduce time from request to response (latency). But concurrent lookups fail when neither the cache lookup nor the database lookup returns soon enough to meet the nonfunctional requirements of the client. So, to be Reactive, any Future implementation must have a timeout mechanism to enable the service to communicate to a client that an operation is taking too long, and that the client will either need to make another attempt to request the data or communicate upstream that there is a failure taking place within the system. Without timeouts, the application cannot be responsive to a user about what is happening and allow the user to decide what to do about it.
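One way to add such a timeout to a CompletableFuture can be sketched as follows. This is an illustration under stated assumptions, not a definitive implementation: the class TimeoutSketch and the method withTimeout are hypothetical names, and the sketch schedules its own timer so that it also runs on Java 8 (Java 9 added an orTimeout method to CompletableFuture for the same purpose).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutSketch {
    // One daemon timer thread shared by all timeouts, so it never keeps the JVM alive.
    private static final ScheduledExecutorService TIMER =
        Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "timeout-timer");
            thread.setDaemon(true);
            return thread;
        });

    // Returns a Future that fails with TimeoutException unless source completes in time.
    static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> source, long millis) {
        CompletableFuture<T> result = new CompletableFuture<>();
        TIMER.schedule(() -> {
            result.completeExceptionally(
                new TimeoutException("no response within " + millis + " ms"));
        }, millis, TimeUnit.MILLISECONDS);
        // Forward the real outcome; a CompletableFuture accepts only the first completion.
        source.whenComplete((value, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(value);
            }
        });
        return result;
    }
}
```

The service can then translate the TimeoutException into a reply that tells the client to retry or to report the failure upstream.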

Futures are not nonblocking by definition and can vary by implementation. For example, Java prior to version 8 had a Future implementation, but there was no way to get the value out of the Future without blocking in some fashion. You could write a loop that calls the isDone() method on one or more Future instances to see if they were completed, or you could call the get() method, which would block until the Future failed or completed successfully. Check that the Future implementation in the version of the language you are using is nonblocking; if it is not, consider alternatives without this shortcoming.

Similar to event loops, Futures can be implemented with callbacks, allowing you to predefine logic to be applied upon the completion of the Future. But, as with Node.js, callbacks can quickly turn ugly when more than one is applied at a time. Some languages that support functional programming allow you to map over a Future: this defines behavior that occurs only when the Future successfully completes, but not if it fails. By means of higher-order functions[15] such as map, these languages frequently give you the ability to compose behavior that depends on successful completion of many Futures into simple, elegant logic using syntactic sugar such as a for- or list-comprehension.[16] This is particularly useful when staging results from multiple Futures into a single result, as in the next example.
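Java has no comprehension syntax, but the same kind of composition can be approximated with the combinators on CompletableFuture. The following is a rough sketch in Java, not the Scala form the text describes; customerName and openOrders are hypothetical lookups that stand in for two independent remote calls. Here thenApply plays the role of map, and thenCombine joins two Futures into one result that exists only if both succeed.

```java
import java.util.concurrent.CompletableFuture;

class ComposeSketch {
    // Hypothetical lookup standing in for a remote call.
    static CompletableFuture<String> customerName(long id) {
        return CompletableFuture.supplyAsync(() -> "customer-" + id);
    }

    // Hypothetical lookup that fails for invalid input.
    static CompletableFuture<Integer> openOrders(long id) {
        return CompletableFuture.supplyAsync(() -> {
            if (id < 0) {
                throw new IllegalArgumentException("negative id");
            }
            return 3;
        });
    }

    static CompletableFuture<String> summary(long id) {
        return customerName(id)
            .thenApply(String::toUpperCase)          // runs only on success, like map
            .thenCombine(openOrders(id),             // waits for both Futures
                (name, orders) -> name + " has " + orders + " open orders");
    }
}
```

If either lookup fails, the combining function is never called and the returned Future carries the failure instead.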

15

16

Listing 3.6. Aggregating a single result from two futures in Scala

Multiple Futures can “race” to fulfill a single Promise, where the first Future to complete supplies the Promise’s value. Because a Promise’s value can be written only once, you can be sure the Future that gets its value from that Promise will not change its value even if other asynchronous tasks, completing later, try to rewrite it. Listing 3.5 demonstrates this technique with the .anyOf method of CompletableFuture: it returns a Future that is completed by whichever of the given Futures finishes first. Notice that this Future’s value is not defined by a code block within retrieveCustomer: a Promise can be fulfilled with a value provided by any event, even synchronously, if it is already available.
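The write-once behavior behind such a race can be made explicit with a small sketch. Here a plain CompletableFuture acts as the Promise, and the hypothetical helper firstOf lets two Futures try to complete it. For brevity, the sketch ignores failures: if both inputs fail, the Promise is never completed.

```java
import java.util.concurrent.CompletableFuture;

class RaceSketch {
    // Both inputs try to fulfill the same Promise; only the first attempt succeeds.
    static <T> CompletableFuture<T> firstOf(CompletableFuture<T> a,
                                            CompletableFuture<T> b) {
        CompletableFuture<T> promise = new CompletableFuture<>();
        // complete() returns false and changes nothing if a value is already set.
        a.thenAccept(promise::complete);
        b.thenAccept(promise::complete);
        return promise;
    }
}
```

If the database Future completes before the cache Future, the result carries the database value, and the later cache value is silently discarded.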

Some languages provide higher-level tools built on Futures and Promises, such as first-class continuations and Dataflow. The beauty of these constructs is that they let you write code that appears synchronous but is actually compiled into Futures. This is possible because order can be maintained: each block of code can only be executed after all code that it depends on has been evaluated. Despite the asynchronous nature of Dataflow code, the logic is still deterministic (as long as it is free of side effects, as explained in section 3.2.3). So, each time it is executed, it will behave the same way. If an application were to enter a deadlock state in Dataflow code one time, it would have to do so every time, because the evaluation order is always the same. An example of this is the async-await construct in C# and Scala:

This code snippet shows an alternative way to implement the same functionality as the for-comprehension syntax near the end of listing 3.6.

There is an emerging concept of a safe Future, where methods that can be executed concurrently are annotated or marked in some way, merely giving the runtime the option to optimize them for parallelization where no data is shared. This is a compelling idea, but it is still subject to errors, such as when someone accidentally exposes data to other methods in a method marked as safe. Also, it provides no oversight for failure. Futures in general are a very narrow abstraction: they allow you to define a single operation that will take place off-thread one time and needs to be treated as a single unit of work. They do not handle resilience particularly well: you have to use Future implementations that communicate what went wrong when failure occurs on their execution thread.

Reactive evaluation of Futures and Promises

Futures and Promises are asynchronous and nonblocking, but they do not support message passing. They do scale up to use multiple cores on one machine. Current implementations do not scale outward across nodes in a network. They do provide mechanisms for fault tolerance when a single Future fails, and some implementations aggregate failure across multiple Futures such that if one fails, all fail.

3.4.5. Reactive Extensions

Reactive Extensions (Rx, https://rx.codeplex.com) is a library that originated in the .NET world; it was originally devised and built by Erik Meijer and his team at Microsoft, and it was ported to the JVM in a library called RxJava (http://github.com/ReactiveX/RxJava).[17] Recently this type of API has also seen widespread uptake among JavaScript frameworks like React.js and Cycle.js. Rx combines two control-flow patterns: Iterable and Observer.[18] Both patterns involve somehow handling a potentially unknown number of items or events. With an Iterable, you write a loop to get each item individually and perform work on it, synchronously and always in control of when the work is to occur. With an Observer, you register a callback to be invoked each time a certain asynchronous event occurs.

17

See also ReactiveX (http://reactivex.io).

18

The combined construct in Rx is called an Observable. With an Observable, you write a looping construct that reacts to events that occur elsewhere. This is similar to streaming semantics, where data are endlessly iterated over as they arrive for processing. The library includes extensions for composing functions using standard operators such as filter and accumulate, and even operators for performing time-sensitive functions based on when events occur. Whereas Futures asynchronously return a single value, Observables are abstractions over streams of data that can be handled in groups. Observables can also tell a consumer when they are finished, much like an Iterator.

The design goal of Rx is not to cover all angles of a Reactive system with one abstraction. It focuses only on passing data in a Reactive fashion through the internal processing steps of a single component in a Reactive system. So, its failure-handling model is restricted to propagating errors downstream (in the same direction as the flow of data) and sending cancellation requests upstream, leaving the treatment of failures to external components. RxJava contains the necessary utilities for back pressure propagation across asynchronous boundaries.[19] This enables it to distribute processing across multiple threads, achieving vertical scalability by utilizing several CPU cores. Failure handling and load management must be delegated to systems like Netflix’s Hystrix.

19

As defined by the Reactive Streams specification: see http://reactive-streams.org.
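To make back pressure concrete, here is a minimal sketch that uses the java.util.concurrent.Flow interfaces shipped with Java 9 and later, which mirror the Reactive Streams interfaces, instead of RxJava itself. The class names are hypothetical. The subscriber requests one element at a time, so the publisher can never deliver faster than the subscriber is able to process.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class BackPressureSketch {
    // Subscriber that signals demand for exactly one element at a time.
    static final class OneAtATime implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);                       // initial demand, sent upstream
        }
        @Override public void onNext(String item) {
            received.add(item);
            subscription.request(1);            // ready for the next element
        }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    static List<String> run(String... items) {
        OneAtATime subscriber = new OneAtATime();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (String item : items) {
                publisher.submit(item);         // blocks while the buffer is full
            }
        }                                       // close() signals onComplete
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }
}
```

Data and errors travel downstream through onNext and onError, while demand travels upstream through request, which matches the directions described above.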

Observables are defined in relation to a source of some kind: a collection, a network socket, and so on. A subscriber provides the handler functions that tell what to do when a chunk of data is ready to be processed or when an error occurs. An RxJava Observable to handle streams could look like this:

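package org.reactivedesignpatterns.chapter3.rxjava;

import rx.Observable;

public class RxJavaExample {
    public void observe(String[] strings) {
        // Subscribe a handler that is invoked once for each emitted element.
        Observable.from(strings).subscribe((s) -> {
            System.out.println("Received " + s);
        });
    }
}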

A driver that produces the events consumed by that Observable might look like this:

package org.reactivedesignpatterns.chapter3.rxjava;

public class RxJavaExampleDriver {
    public static void main(String[] args) {
        // Create the example locally: a static method cannot read an instance field.
        final RxJavaExample rxJavaExample = new RxJavaExample();
        String[] strings = { "a", "b", "c" };
        rxJavaExample.observe(strings);
    }
}

Reactive evaluation of Reactive Extensions

Rx provides facilities to process streams of data in an asynchronous and nonblocking fashion. The current implementations scale up to use multiple cores on a machine but not outward across nodes in a network. Rx does not provide a mechanism for delegating failure handling, but it does include provisions to reliably tear down a failed stream-processing pipeline via dedicated termination signals. RxJava in particular is a useful building block for implementing components of a Reactive system.

3.4.6. The Actor model

The Actor model, first introduced by Carl Hewitt in 1973, is a model of concurrent computation in which all communication occurs between entities called Actors, via message passing on the sending side and mailbox queues on the receiving side.[20] The Erlang programming language, one of the earliest to support Reactive application development, uses Actors as its primary architectural construct. With the success of the Akka toolkit on the JVM, Actors have had a surge in popularity of late.

20

Carl Hewitt, Peter Bishop, and Richard Steiger, “A Universal Modular ACTOR Formalism for Artificial Intelligence,” Proceedings of the 3rd International Joint Conference on Artificial Intelligence (1973).

Inherently asynchronous

The definition of Reactive states that interactions should be message-driven, asynchronous, and nonblocking. Actors meet all three of these criteria. Therefore, you do not have to do anything extra to make program logic asynchronous besides creating multiple Actors and passing messages between them. You need only avoid using blocking primitives for synchronization or communication within Actors, because these would negate the benefits of the Actor model.

Fault tolerance via supervision

Most implementations of Actors support organization into supervisor hierarchies to manage failure at varying levels of importance. When an exception occurs inside an Actor, that Actor instance may be resumed, restarted, or stopped, even though the failure occurred on a different asynchronous thread. Erlang’s Open Telecom Platform (OTP, https://github.com/erlang/otp) defines a pattern for building supervision hierarchies for Actors, allowing a parent Actor to manage failure for all children below it, possibly elevating some failures to an appropriate “grandparent” Actor.

This approach makes failure handling part of the application’s domain, just like classes that represent application-specific data. When designing an Actor application, you should take the time to think of all the ways the application could fail at all levels of the supervisor hierarchy, and what each level should do about each kind of failure. You should also consider how to handle failures that you cannot foresee, and allow the application to respond to those as well. Even though you cannot anticipate the precise cause of every failure, you can always safely assume that the component that failed is now in an invalid state and needs to be discarded. This principle, called let it crash, enables people to design thoughtful responses to failure scenarios beyond those they can think of in advance. Without a supervision hierarchy, this kind of resilience would not be feasible; at best you would have failure-handling code strewn about the logic of the application. All of the aforementioned fault-tolerance patterns are described in detail in chapter 12.
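The core of this idea can be sketched in a few lines of plain Java. This is a deliberately simplified, synchronous illustration and not how Erlang/OTP or Akka implement supervision; the Accumulator child, the Supervisor class, and the Directive enum are hypothetical names. The supervisor never tries to repair a failed child: on RESTART it discards the instance, along with its possibly invalid state, and creates a fresh one.

```java
import java.util.function.Function;
import java.util.function.Supplier;

class SupervisionSketch {
    enum Directive { RESUME, RESTART, STOP }

    // Hypothetical child: accumulates a total and crashes on negative input.
    static final class Accumulator {
        int total = 0;
        void handle(int n) {
            if (n < 0) {
                throw new IllegalArgumentException("negative input: " + n);
            }
            total += n;
        }
    }

    static final class Supervisor {
        private final Supplier<Accumulator> factory;
        private final Function<Throwable, Directive> strategy;
        private Accumulator child;              // null once the child is stopped

        Supervisor(Supplier<Accumulator> factory,
                   Function<Throwable, Directive> strategy) {
            this.factory = factory;
            this.strategy = strategy;
            this.child = factory.get();
        }

        void deliver(int message) {
            if (child == null) {
                return;                         // a stopped child receives nothing
            }
            try {
                child.handle(message);
            } catch (RuntimeException failure) {
                switch (strategy.apply(failure)) {
                    case RESUME:  break;                         // keep child and state
                    case RESTART: child = factory.get(); break;  // let it crash, start fresh
                    case STOP:    child = null; break;
                }
            }
        }

        int childTotal() { return child == null ? -1 : child.total; }
    }
}
```

With a strategy that always answers RESTART, delivering 5, then -1, then 2 leaves the child with a total of 2, because the failure discarded the earlier state.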

Location transparency

Erlang and Akka provide proxies through which all Actor interactions must take place: a PID in Erlang and an ActorRef in Akka. So, an individual Actor does not need to know the physical location of the Actor to which it is sending a message—a feature called location transparency, treated at length in chapter 5. This makes message-sending code more declarative, because all the physical “how-to” details of how the message is actually sent are dealt with behind the scenes. Location transparency enables you to add even such sophisticated features as starting a new Actor and rerouting all messages to it if a receiving Actor goes down mid-conversation, without anyone needing to alter the message-sending code.

A drawback of the Actor model is that producers and consumers of messages are coupled to one another: the sender of a message must have a reference to the Actor instance that it wants to send to. This reference is just as necessary as a recipient address on a letter, without which the mail service would not know where to deliver it. An Actor reference is much like a postal address in that it only tells where to transport messages, not what the recipient looks like or what state they are in. A benefit of this approach is that each Actor is somewhat decoupled from failures occurring in another, because Actors have no access to each other except through these references. A supervising Actor responsible for handling failures is also protected by this isolating layer of message passing.

No concurrency within one Actor

Because each Actor contains only a single thread of execution within itself, and no thread can call into an Actor directly, there is no concurrency within an Actor instance unless you intentionally introduce it by other means. Therefore, Actors can encapsulate mutable state and not worry about requiring locks to limit simultaneous access to variables.

This greatly simplifies the logic inside Actors, but it does come at some cost. Actors can be implemented in two ways: as heavyweight, thread-based Actors, each with a dedicated thread assigned to it; or as lightweight, event-driven Actors that share a thread pool and therefore consume less memory. Regardless of the implementation, some concept of fairness has to be introduced, where you define how many messages an Actor will handle before yielding the CPU. This must be done to prevent starvation: no one Actor should use a thread and/or CPU core so long that other Actors cannot do their own work. Even if a thread-based Actor is not sharing its thread with other Actors, it is most likely sharing execution cores at the hardware level.
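The following sketch shows one way a lightweight, event-driven Actor could be built on a shared thread pool. It is an illustration under stated assumptions, not the implementation used by Akka or Erlang; MiniActor, its throughput parameter, and the demo method are hypothetical. A flag guarantees that at most one thread processes the mailbox at a time, and the throughput limit makes the Actor yield its thread after a fixed number of messages.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

class MiniActor<M> {
    private final ConcurrentLinkedQueue<M> mailbox = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean scheduled = new AtomicBoolean(false);
    private final Executor pool;
    private final int throughput;       // messages handled before yielding the thread
    private final Consumer<M> behavior;

    MiniActor(Executor pool, int throughput, Consumer<M> behavior) {
        this.pool = pool;
        this.throughput = throughput;
        this.behavior = behavior;
    }

    // The only way in: enqueue the message, never call the behavior directly.
    void tell(M message) {
        mailbox.add(message);
        trySchedule();
    }

    private void trySchedule() {
        // The flag ensures that at most one batch runs at any time.
        if (!mailbox.isEmpty() && scheduled.compareAndSet(false, true)) {
            pool.execute(this::processBatch);
        }
    }

    private void processBatch() {
        try {
            for (int i = 0; i < throughput; i++) {
                M message = mailbox.poll();
                if (message == null) {
                    break;
                }
                behavior.accept(message);
            }
        } finally {
            scheduled.set(false);
            trySchedule();              // yield, then reschedule if mail remains
        }
    }

    // Usage: several threads send, yet the unsynchronized counter stays exact.
    static int demo(int senders, int perSender) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch allHandled = new CountDownLatch(senders * perSender);
        int[] counter = {0};            // plain mutable state, no lock
        MiniActor<Integer> actor = new MiniActor<Integer>(pool, 5, n -> {
            counter[0] += n;
            allHandled.countDown();
        });
        for (int s = 0; s < senders; s++) {
            pool.execute(() -> {
                for (int i = 0; i < perSender; i++) {
                    actor.tell(1);
                }
            });
        }
        try {
            allHandled.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return counter[0];
    }
}
```

A small throughput value favors fairness among Actors that share the pool; a larger one reduces scheduling overhead at the cost of letting one busy Actor hold a thread longer.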

Differences between Erlang and Akka

Given the two prevalent Actor implementations in existence, Erlang and Akka, how do you choose which is more appropriate for a given application? This boils down to the application’s requirements and the platforms on which implementations of Erlang and Akka are available.

In the case of Erlang, the BEAM VM allows each Actor to be implemented as a distinct and isolated process. This is a fundamental reason for the remarkable fault tolerance of Erlang applications.

Erlang Actors use a pattern called Selective Receive, where an Actor receives a message and determines whether it is able to handle that message at that time. If the Actor cannot handle the message right then, it puts the message aside for the time being and proceeds to the next message. This continues until the Actor receives a message that its current receive block can handle, at which time it processes that message and then attempts to retry all messages that were put aside. This is, in effect, a memory leak, because if those messages are never handled, they continue to be set aside and reviewed after every successfully handled message. Fortunately, because the processes in the BEAM VM are isolated, a single Actor’s process can fail for exceeding its available memory without bringing down the entire virtual machine.

On the JVM, there is no such luxury. An exact port of Erlang and OTP on the JVM with Selective Receive would be a memory leak that would eventually, given enough time, take down the entire JVM with an OutOfMemoryError, because all Actors share the same heap. For this reason, Akka Actors have the ability to stash messages on demand, not automatically. They also provide programmatic means to unstash and replay those messages at leisure.
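The difference between automatic and on-demand stashing can be illustrated with a plain Java sketch. This is not Akka code: StashSketch and its "init" message are hypothetical, and the sketch assumes that receive is called by a single-threaded mailbox loop. Messages are set aside only where the code says so, and they are replayed in arrival order once the Actor is ready.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class StashSketch {
    private final Deque<String> stash = new ArrayDeque<>();
    private final List<String> processed = new ArrayList<>();
    private boolean ready = false;

    // Assumed to be invoked one message at a time by the mailbox loop.
    void receive(String message) {
        if (message.equals("init")) {
            ready = true;
            unstashAll();               // replay everything that was set aside
        } else if (!ready) {
            stash.addLast(message);     // explicit, on-demand stashing
        } else {
            processed.add(message);
        }
    }

    private void unstashAll() {
        while (!stash.isEmpty()) {
            receive(stash.pollFirst()); // oldest message first
        }
    }

    List<String> processed() { return processed; }
}
```

Because stashing happens only where the programmer asks for it, the amount of memory held by set-aside messages stays under the application's control.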

Listing 3.7 shows an example of an Akka Actor application with fault tolerance built in. A parent Actor supervises two child Actors, which send a counter value back and forth to each other, incrementing it each time. When a child receives a value exceeding 1,000, it throws a CounterTooLargeException, causing the supervising parent to restart the children, thus resetting their counters.

Listing 3.7. An Actor example in Akka
package org.reactivedesignpatterns.chapter3.actor

import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy.Restart
import akka.event.LoggingReceive

case object Start
case class CounterMessage(counterValue: Int)
case class CounterTooLargeException(
  message: String) extends Exception(message)

class SupervisorActor extends Actor with ActorLogging {
  override val supervisorStrategy = OneForOneStrategy() {
    case _: CounterTooLargeException => Restart
  }

  val actor2 = context.actorOf(
                 Props[SecondActor], "second-actor")
  val actor1 = context.actorOf(
                 Props(new FirstActor(actor2)),
                 "first-actor")

  def receive = {
    case Start => actor1 ! Start
  }
}

class AbstractCounterActor extends Actor with ActorLogging {
  var counterValue = 0

  def receive = {
    case _ =>
  }

  def counterReceive: Receive = LoggingReceive {
    case CounterMessage(i) if i <= 1000 =>
      counterValue = i
      log.info(s"Counter value: $counterValue")
      sender ! CounterMessage(counterValue + 1)
    case CounterMessage(i) =>
      throw new CounterTooLargeException(
        "Exceeded max value of counter!")
  }

  override def postRestart(reason: Throwable) = {
    context.parent ! Start
  }
}

class FirstActor(secondActor: ActorRef) extends
  AbstractCounterActor {
  override def receive = LoggingReceive {
    case Start =>
      context.become(counterReceive)
      log.info("Starting counter passing.")
      secondActor ! CounterMessage(counterValue + 1)
  }
}

class SecondActor() extends AbstractCounterActor {
  override def receive = counterReceive
}

object Example extends App {
  val system = ActorSystem("counter-supervision-example")
  val supervisor = system.actorOf(Props[SupervisorActor])
  supervisor ! Start
}

Combining Actors with other constructs

The location transparency and supervision features of Actors make them a suitable choice to build the distributed foundation of a Reactive application. But within the context of a larger application, local orchestration often does not require the location transparency, supervision, and resiliency of the complete Actor system. In those cases, you will typically combine Actors with more lightweight Reactive constructs such as Futures or Observables, depending on the API offered by libraries or other parts of the system. Those choices sacrifice the full Actor characteristics in exchange for lower memory cost and invocation overhead.

In those cases, it is important to remember the differences. Actors are explicit about their message passing. Spawning and composing Futures from within an Actor may lure you into accessing or modifying the Actor’s state from within a Future’s execution context. That breaks the single-threaded model and leads to concurrency bugs that can be difficult to detect. Even when the state is carefully kept separate, you will need to think about which thread pool executes the Futures’ callbacks or the Observables’ combinators.

Reactive evaluation of Actors

Actors are asynchronous and nonblocking, and support message passing. They scale up to use multiple cores on one machine, and they scale outward across nodes in both the Erlang and Akka implementations. They provide supervision mechanisms, as well, in support of fault tolerance. They meet all the requirements for building Reactive applications. This does not mean Actors should be used for every purpose when building an application, but they can easily be used as a backbone, providing architectural support to services that use other Reactive technologies.

3.4.7. Summary

We have now reviewed the essential concepts and constructs required to build Reactive applications, providing fault tolerance and scalability to help you be responsive to your users. In the following chapters, we will delve into the Reactive philosophy and discuss how these and other concepts relate to it. At this point, you should have a clear understanding of the following:

  • The costs of building an application that is not Reactive
  • What functional programming is and how it relates to Reactive applications
  • The trade-offs involved in choosing between high throughput, low/smooth latency, and small footprint
  • Pros and cons of all application toolkits that support the Reactive model
  • How the Actor model simultaneously addresses both fault tolerance and scalability