I have been fortunate enough to have been using Actors and Scala since 2009, but I have also experienced a great deal of pain from having made many mistakes in that time. Here are some important rules to follow for actor-based development, though many of the rules are applicable to asynchronous coding in general.
One of the hardest things to do in software development is to design a system based on primitives. By that, I mean the lowest level of functional decomposition we can achieve. With actors, we have the ability to define very elemental abstractions of functionality, and we want to take advantage of this. So how do we do it?
Robert Martin, also known as “Uncle Bob,” taught us in Agile Software Development (Prentice Hall) that we should try to isolate responsibilities into unique classes. In doing so, we avoid conflating what an object-oriented class should do and keep the implementation simple. Bob believes, and I agree with him, that each class should be concise and small, giving you more flexibility for its usage. The more you add outside of a class's base role, the more extra functionality and code you carry for usages that may not require them.
With actors, I believe this to be particularly true. It is very easy to make an actor perform additional tasks—we simply add new messages to its receive block to allow it to perform more and different kinds of work. However, doing so limits your ability to compose systems of actors and define contextual groupings. Keep the actors focused on a single kind of work, and in doing so, allow yourself to use them flexibly.
When we are building supervisor hierarchies, where we organize actors into trees of parents and children, it is easy to spawn children of multiple types (for example, under a CustomerActor, we could spawn AccountActor and DeviceActor instances) that share one supervisorStrategy. But this can be bad, because you only have one such strategy for each parent. Some actor developers feel this can largely be overcome by using very fine-grained exception types to communicate the failure of children. However, because Akka permits only one strategy per supervisor, there is no way to delineate between actors of one grouping for which OneForOne is the restart strategy you want, and another grouping for which AllForOne is preferred. Furthermore, because I believe in the Single Responsibility Principle, I want to separate unrelated logic as much as possible. Let's look at an example.
OneForOne is a great strategy when the kinds of failures that occur are specific to a single actor: exceptions thrown while handling the work passed in a message, or a runtime exception focused on a failure that would not affect other actors. AllForOne is a strategy you may want to employ when the kind of failure affects all of the actors under the supervisor, such as an exception connecting to a database in a child actor that will affect them all. In that case, you can use AllForOne to stop all actors under the supervisor until the connection is known to have been reestablished.
In Figure 3-1, we have an error kernel that supervises two customers. The customer actors themselves supervise accounts and devices, and the accounts supervise the varying types of accounts we can have in our financial institution. Because I have accounts and devices under one supervisor, I have to define a single supervision strategy that is applicable to both. While I can design my supervisorStrategy failure handling to be common to both, I do not have the flexibility to introduce a different restart strategy for each of them. Now my system is more rigid and intractable.
However, I can easily fix this problem by introducing a layer of supervision between the customer and accounts/devices children, as we see in Figure 3-2. In doing so, I can tailor supervision to be specific to failure that can occur with accounts and that which may occur with devices. Note that I don’t have to do this for the varying kinds of accounts below the accounts actor: if I am comfortable that the varying types below a supervisor are sufficiently similar, there is likely no need to add the extra overhead of more supervision layers.
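To make this concrete, here is a minimal sketch of what such an intermediate layer might look like. The supervisor names and the exception types are hypothetical, not taken from the example system:

import akka.actor.{ Actor, AllForOneStrategy, OneForOneStrategy, Props }
import akka.actor.SupervisorStrategy.Restart

// Hypothetical intermediate supervisor: a database outage affects every
// account child, so restart them together
class AccountsSupervisor extends Actor {
  override val supervisorStrategy = AllForOneStrategy() {
    case _: java.sql.SQLException => Restart
  }
  def receive = Actor.emptyBehavior
}

// Hypothetical intermediate supervisor: a device failure is local to one
// child, so restart only the one that failed
class DevicesSupervisor extends Actor {
  override val supervisorStrategy = OneForOneStrategy() {
    case _: IllegalStateException => Restart
  }
  def receive = Actor.emptyBehavior
}

class CustomerActor extends Actor {
  val accounts = context.actorOf(Props[AccountsSupervisor], "accounts")
  val devices = context.actorOf(Props[DevicesSupervisor], "devices")
  def receive = Actor.emptyBehavior
}

Each intermediate supervisor now owns a strategy suited to the failure modes of its own children.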
The counter argument might be that you end up having to do more upward escalation of failure messages because of the additional layer of supervision introduced by following this pattern. But since it’s not something I have to do everywhere in my supervisor hierarchies (only where multiple types of child actors are supervised by one actor), I find that to be acceptable. And my system is more clearly defined in elemental, primitive terms.
It is a pattern of actor systems that the roots of supervisor hierarchies compose an error kernel: the part of an application that must not fail under any circumstances. Akka's ActorSystem provides you with a user guardian actor to supervise all actors you, as the user, create directly under the root of the ActorSystem. However, you have no programmatic control over the user guardian. It merely specifies that for any java.lang.Exception that percolates up to it, all actors under it are restarted.
Look at the way our actors are organized in Figure 3-3. The problem with flat hierarchies is that any kind of AllForOne restarting supervision would result in a lot of actors being restarted, potentially when they were completely unaffected by the error that occurred. It can be tempting to put many different kinds of actors directly under the guardian. Don't. Instead, build layers of failure handling that isolate failure deep in the tree so that as few actors as possible are affected by something going wrong. Never be afraid to introduce layers of actors if it makes your supervisor hierarchy more clear and explicit, as we see in Figure 3-4.
Creating shallow actor hierarchies with a substantial number of actors should be a "code smell" to you. The deeper your hierarchies, the more you can compose layers to handle the failures that can occur. If you have a shallow hierarchy, you are likely not defining the problems that can occur in your domain well enough. Think about whether failures can be made more granular and how you can confine failure to specific, well-isolated branches within the tree.
When we build trees of actors, it is a common mistake to continually use the default dispatcher provided as part of an ActorSystem. When you're just getting started or trying to prototype something, that can be fine. However, when you're creating a supervisor hierarchy for a production actor-based application, you want to limit the impact of what happens in one grouping of actors on any others. How do we do this? Let's look at a supervision tree using only the default dispatcher of the ActorSystem, as shown in Figure 3-5.
In this case, we have our banking customer supervision hierarchy. We have an error kernel managing customers, who in turn are supervising accounts and devices. However, because we haven't explicitly created any of the actors with other dispatchers, any kind of expensive computation or blocking that takes place in one part of the hierarchy can lead to thread starvation in a completely unrelated actor. That, of course, would be bad.
Instead, we want to build groupings of actors in our supervisor hierarchy that should share resources such as thread pools, as we see in Figure 3-6. In doing so, we allow the failure that can take place in one part of the system to have no bearing on any other unrelated component.
That said, Martin Thompson, borrowing the term from champion race car driver Jackie Stewart, gave us the concept of Mechanical Sympathy: you as the developer must understand how the hardware works in order to get the most out of it. That sympathy must be applied when defining dispatchers. Make sure you think about how many cores your machines have, how many threads each dispatcher actually needs, and whether the work to be performed is CPU-bound or I/O-bound.
The answers to those questions are highly specific to your application and how it runs. Just keep in mind that it makes no sense to define hundreds of dispatchers with thousands of threads if the tasks are CPU-bound. Try it out on your work machine, and you’ll find that actors are starving for resources not because you didn’t define enough threads for them to use, but because they couldn’t get enough CPU time to take advantage of those resources.
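For illustration, here is a sketch of how a dispatcher sized for CPU-bound work might be defined and installed. The dispatcher name and the sizing numbers are hypothetical and should come from your own measurements:

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Hypothetical dispatcher for CPU-bound work: roughly one thread per core
val config = ConfigFactory.parseString("""
  accounts-dispatcher {
    type = Dispatcher
    executor = "fork-join-executor"
    fork-join-executor {
      parallelism-min = 2
      parallelism-factor = 1.0
      parallelism-max = 8
    }
  }
""")

val system = ActorSystem("banking", config.withFallback(ConfigFactory.load()))
// Actors created with .withDispatcher("accounts-dispatcher") will share
// only this pool, isolated from the default dispatcher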
One of the worst offenders when creating high-throughput applications on the JVM is blocking threads. When a thread blocks, the kernel deschedules it from its core, which introduces context switching and the resultant loss of the "warmed" caches local to that core. Even if the thread is rescheduled quickly, it is plausible that it will land on another core, and the data for the thread will have to be reloaded from main memory. This introduces additional latency, as the instructions to be executed have to wait for the data they require. It is difficult to quantify the cost of a context switch, as it varies across hardware and kernel combinations, but it is not unreasonable to assert an overhead of several microseconds.
So how do we avoid blocking? By delegating the logic to a future, which can execute in another context (on another thread), returning a new value and allowing us to define behavior to be executed at that time. Is this as fast as sequential operations? Absolutely not. However, it is more scalable. Here's how.
Actors receive messages into their mailboxes and process them sequentially. There is no concurrency within a single actor, and that is by design. We are using an abstraction that lets us write code that is purely declarative: it only describes what we want to do.
But, by definition, that means the actor can only handle one message at a time, and it is therefore limited by the amount of work that must be accomplished in handling each message. If we do nothing to change that execution model, the actor could conceivably be overwhelmed by the time it takes to handle messages relative to the rate at which messages are being enqueued.
The answer is to perform the behavior dictated by the received message in futures. In doing so, the actor is able to receive a message, define the work that must be accomplished to successfully handle the request, and then immediately handle the next message in its mailbox and do the same thing. We delegate the work to be performed at some time in the future and define how to handle the success or failure of that task. And this works because the actor is stateless, aside from the DatabaseDriver in use:
import java.util.concurrent.ForkJoinPool
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success }
import akka.actor.Actor

case object TransactionSuccess

class CustomerUpdater(dbDriver: DatabaseDriver) extends Actor {
  def receive = {
    case UpdateCustomer(customer) =>
      val originalSender = sender
      implicit val ec: ExecutionContext =
        ExecutionContext.fromExecutor(new ForkJoinPool())
      val future = Future {
        // define some JDBC action using dbDriver
      }
      future.onComplete {
        case Failure(x) => throw new CustomerUpdateException(x)
        case Success(_) => originalSender ! TransactionSuccess
      }
  }
}
In this case, the actor is responsible for performing updates to customer records in a database. It receives messages to update a customer record, passing the new customer information. It then performs the database transaction via the database driver passed to it. In most cases, you would use a pool of such actors, much like a connection pool, so that you don’t have to spin up new instances of database transaction actors every time you want to do work, and no one is overloaded.
The key here is that the actor receives the update request. It uses a future to delegate the database work, and it defines what to do if a failure such as a SQLException occurs: it throws a new, more specific exception back to its supervisor. That exception wraps the information about the failure and allows the supervisor to determine what happened and what should be done as a result.
But that’s not the important thing to take away from this. What’s important is that we’ve defined all of this work to take place asynchronously, without blocking any threads. The actor defines this behavior and then is free to handle the next message in its mailbox. The latency associated with the database interaction is no longer a factor in the actor’s ability to handle messages.
Think about defining a separate dispatcher for your futures so that you do not impact the message handling by actors. If you use the same dispatcher as the actors, you can have the same starvation issues you were trying to avoid by off-loading the work to another thread to begin with. Give thought to the execution context that should be used by each component in your asynchronous system.
A quick digression before we continue. One thing I am continually amazed by is how few people know or understand the Java Future API. There are only five methods you can call on an instance of java.util.concurrent.Future, and only two of those are relevant to getting the expected data out of it.
First, you have get(). This method blocks the current thread until the future instance completes its work, thus requiring one thread beyond the work being performed just to manage what happens when it is done. Second, you have isDone(), which merely allows you to define other work and check whether the Future has completed. Either way, the calling thread must continue to do work or busy-spin until the future completes. This is a horrible waste of resources!
Think about it. If you have two tasks you want performed asynchronously, you have to define that work from a calling thread in two separate futures. You then have to make the calling thread block on get() or busy-spin in a while loop until isDone() returns true. It takes three threads to do the work of two.
However, if you use scala.concurrent.Future (or the akka.dispatch.Futures API for Java), you can define work to be performed asynchronously, as well as the behavior to manage what happens when it is done or fails, and then release the calling thread. Threads are valuable resources, as are cores. We should not be wasting them on housekeeping, but the current Java Future API does just that.
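As a small sketch of the difference, the calling thread below defines both the work and its completion handling, then moves on; nothing ever blocks on get() or spins on isDone(). The expensiveComputation method is just a stand-in for real work:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{ Failure, Success }

def expensiveComputation(): Int = (1 to 1000000).sum // stand-in for real work

val result = Future { expensiveComputation() }

result.onComplete {
  case Success(value) => println(s"Got $value")
  case Failure(cause) => println(s"Failed: ${cause.getMessage}")
}
// The calling thread is already free to do other things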
It is possible that Doug Lea will work his magic and have a Java implementation of CompletableFuture ready for Java 8, but it's impossible to say, at the time I write this, whether that will be ready in time for the code freeze.
The Scala Future API allows us to predefine work and then stage the way that work is handled via nested Future instances. If you want to create a pair of futures to run in parallel, you define them independently and then use a for comprehension to handle the results. However, this only works when the results are unrelated, with neither dependent on the other completing before it can be handled.
For example, imagine we wanted to display information on a stock trading web page. The customer has selected the security she wishes to purchase. We need to find the current bid and ask prices, find the price the security last traded for, and then return that to the view layer to be composed into a response and returned to the customer. We first define the Future instances to get both pieces of information, and then we define the behavior for handling the results:
class SecurityPricingActor(exchange: Exchange, backOffice: BackOffice) extends Actor {
  import context.dispatcher

  def receive = {
    case GetPricingInfo(security: Security) =>
      val originalSender = sender
      val bidAndAskFuture = Future {
        exchange.getBidAndAsk(security.id)
      }
      val lastPriceFuture = Future {
        backOffice.getLastPrice(security.id)
      }
      val response = for {
        (bid, ask) <- bidAndAskFuture
        lastPrice <- lastPriceFuture
      } yield SecurityPricing(bid, ask, lastPrice)
      response map (originalSender ! _)
  }
}
It is important to understand the dynamics at play here. We define two futures to get information from two external services. We then define a for comprehension that will monadically bind the behavior of what to do when both values are returned. However, the for comprehension is syntactic sugar for the flatMap and map methods on the future instances, and just like those methods, it produces intermediate future instances.
So consider the implications of that. The type of response is a Future[SecurityPricing]. However, the result of the bidAndAskFuture is a Future[(BigDecimal, BigDecimal)]. If and only if that returns successfully do we get a new Future[BigDecimal] for the lastPrice. And if and only if that succeeds is the Future[SecurityPricing] evaluated. But the expensive tasks, which may involve going over a wire to get the information from the exchange or another external service, do not block the calling thread, nor does the behavior we defined for handling their completion.
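To make the desugaring concrete, this is roughly what the compiler produces from the for comprehension above, reusing the same names and assuming the same implicit ExecutionContext is in scope:

val response: Future[SecurityPricing] =
  bidAndAskFuture.flatMap { case (bid, ask) =>
    lastPriceFuture.map { lastPrice =>
      SecurityPricing(bid, ask, lastPrice)
    }
  }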
There is another syntax for performing Future execution in parallel: the zip() method in the Future API. This allows you to avoid defining the futures outside of the for comprehension, and some people prefer the syntax. For conciseness, I agree that this is cleaner. However, I find that predefining my futures is more expressive and allows my for comprehension to be more readable, so I usually eschew this style:
class SecurityPricingActor(exchange: Exchange, backOffice: BackOffice) extends Actor {
  import context.dispatcher

  def receive = {
    case GetPricingInfo(security: Security) =>
      val originalSender = sender
      val response = for {
        ((bid, ask), lastPrice) <- Future {
          exchange.getBidAndAsk(security.id)
        } zip Future {
          backOffice.getLastPrice(security.id)
        }
      } yield SecurityPricing(bid, ask, lastPrice)
      response map (originalSender ! _)
  }
}
What can be particularly tricky is understanding how to compose the use of futures. Sometimes, work cannot be performed entirely in parallel and must be executed in a particular order, due to a dependency between the values returned from multiple calls.
For example, imagine you used a future to get a set of customers from a database, and then wanted to call another external service to get account information for each customer. How would you predefine that behavior so that you didn't have to block the calling thread? The answer is to use a for comprehension and embed the work to be performed in futures inside its structure, so that the second future is only executed if the first one successfully returns the value upon which it depends:
val accountsForCustomers = for {
  customer <- Future { databaseService.getCustomers }
  account <- Future { accountService.getAccounts(customer.id) }
} yield (customer, account)
So far, we have used the callback style (onSuccess, onFailure, onComplete) as well as the monadic style (for comprehensions) for handling the results of future instances. You may wonder why there are two different syntaxes and when to use each. By convention, the callback style is to be used for side-effecting responses, such as the database access call in the CustomerUpdater example. The monadic style is for "pure" functional representations, free of side-effecting code. Anyone familiar with functional programming and the desire for purity will likely recognize the pattern.
However, note that the callback style can be unwieldy if we need to nest futures and their result-handling callbacks. We want to be able to compose our logic in a meaningful way, and using for comprehensions allows us to compose how we handle the results of multiple futures in a clear, succinct way.
While the Future API is very handy for defining work to take place asynchronously, futures require an ExecutionContext in order to perform their tasks. The ExecutionContext provides a thread pool from which they draw their required resources. Many people start off by using the ActorSystem default dispatcher, like so:
val system = ActorSystem()
implicit val ec: ExecutionContext = system.dispatcher

Future {
  /* work to be performed */
}
However, using the ActorSystem's default dispatcher can lead to thread starvation very quickly if it becomes overloaded with work. The default configuration of this dispatcher is to be elastically sized from 8 to 64 threads. So just as we saw when discussing failure zones, we should consider it important to isolate execution by context.
You can similarly use an actor’s dispatcher. This gives you more resource granularity but still requires your actor to dedicate a thread to the future every time one is instantiated. This also might not be ideal:
implicit val ec: ExecutionContext = context.dispatcher

Future {
  /* work to be performed */
}
You always have the option of creating a new ExecutionContext from a new thread pool on the fly, which can be done like so:
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(new ForkJoinPool())

Future {
  /* work to be performed */
}
However, a best practice I recommend is to define specific dispatchers inside the configuration of each ActorSystem in which futures will be used. Then you can dynamically look up the dispatcher for use in your code, like this:
implicit val ec: ExecutionContext = context.system.dispatchers.lookup("foo")

Future {
  /* work to be performed */
}
It is common for developers new to Akka, and actors in general, to want to control the flow of actor interactions by "asking" another actor for a response. This may seem fine at the outset, but as you mature as an actor developer, you have to take into consideration the cost of doing such a thing. First of all, the ask pattern in Akka requires a future to send the message in the first place, and it queues a callback or monadic handler for the response with the ExecutionContext. It also has a timeout associated with it.
All of these combine to be quite heavy for a simple actor interaction that could be as simple as sending a message and being prepared to handle the response. Furthermore, you are making assumptions about time in your system, saying that you expect a response within a certain timeframe, and you have to design what to do when that interaction does not take place.
It is better to use fire-and-forget semantics for such interactions. It is lighter weight and makes no assumptions about what must happen at what time. For resilience, your best bet is to become a handler for the specific response and then schedule a send, starting right at that moment and repeating at a fixed interval until a response is received, at which point the send is cancelled:
import akka.actor.{ Actor, ActorRef, Cancellable }
import scala.concurrent.duration._

object MyActor {
  case object Start
  case class DataToHandle(bytes: Array[Byte])
  case object GetData
}

class MyActor(otherActor: ActorRef) extends Actor {
  import MyActor._
  import context.dispatcher

  var cancellable: Option[Cancellable] = None

  def receive = {
    case Start =>
      context.become(dataHandler)
      cancellable = Some(context.system.scheduler.schedule(
        0.milliseconds, 500.milliseconds, otherActor, GetData))
  }

  def dataHandler: Receive = {
    case DataToHandle(data) =>
      // Do something with the data
      cancellable map (_.cancel())
      context.unbecome()
  }
}
You might question whether this is a good idea, as the MyActor instance will now not handle any Start messages received until a DataToHandle message arrives. In some cases, this may be what you want. However, if you need to handle more Start messages, you should use the extra or cameo patterns to define a new actor instance that handles what to do when the response finally arrives.
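As a rough sketch of that idea, reusing the messages from the MyActor companion above, a hypothetical per-request handler might look like this:

import akka.actor.{ Actor, ActorRef, Props }

object DataHandler {
  def props(replyTo: ActorRef): Props = Props(new DataHandler(replyTo))
}

// A per-request handler: it exists only to receive one response
class DataHandler(replyTo: ActorRef) extends Actor {
  import MyActor._
  def receive = {
    case DataToHandle(data) =>
      replyTo ! data     // pass the result along to the original requester
      context.stop(self) // the cameo's job is done
  }
}

// Inside MyActor's receive, instead of context.become(dataHandler):
//   case Start =>
//     val handler = context.actorOf(DataHandler.props(sender))
//     otherActor.tell(GetData, handler) // responses route to the cameo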
Another option is to chain your receives, where you allow an actor to register receive blocks and handle received messages in an ordered fashion because the traits used are linearized (as traits are wont to be). This is accomplished by prepending them to the list of receives you manage, like so:
trait ChainingActor extends Actor {
  private var chainedReceives: List[Receive] = List()

  def registerReceive(newReceive: Receive) {
    chainedReceives = newReceive :: chainedReceives
  }

  def receive = chainedReceives.reduce(_ orElse _)
}

trait IntActor extends ChainingActor {
  registerReceive {
    case i: Int => println("Int!")
  }
}

trait StringActor extends ChainingActor {
  registerReceive {
    case s: String => println("String!")
  }
}
Another aspect to consider: if all of your interactions are built on resending until an appropriate response is received, it won't matter that an actor temporarily ignores other messages, because it will receive each message again. This is critical for building resilient systems! Never expect guarantees of delivery for any message over any medium, regardless of whether you know you're sending messages within a JVM, or using durable mailboxes, or a middleware message broker that supposedly will persist messages. Failures can happen anywhere: in your application, in the hardware, or over the network.
Resilient systems make no assumptions about "guarantees" and always keep trying to do what they need to do until it gets done and they're sure of it.
While it would be ideal to not block threads, the reality is that there will be times when you must perform a blocking operation. A great example is any time an actor is going to perform database work via a legacy database driver. And that’s fine—there is no way for you to communicate with such an external resource and receive a reply without blocking.
The key is to make sure that you limit the effect of the blocking call. To do that, you should put any such actor (or multiple actors in a router) with blocking calls inside of its own dispatcher with resources that aren’t shared by any other actors that perform nonblocking work. By isolating the blocking calls to actors that don’t share resources with others, we can be sure that other actors won’t be starved for a thread within the JVM while the blocking actor does its work:
// Code from an actor that will create the blocking actor
val
customerRetriever
=
context
.
actorOf
(
Props
[
CustomerRetrievalActor
].
withDispatcher
(
"customer-retrieval-dispatcher"
),
"customer-retriever"
)
// Blocking actor pseudocode
class
CustomerRetrievalActor
extends
Actor
{
def
receive
=
{
case
GetCustomer
(
id
)
=>
// Make a database call to look up a customer by the ID sent
}
}
Scala has a very handy feature called Managed Blocking for controlling the amount of blocking code that can exist within an application. When you wrap a block of code with blocking, the thread pool upon which the code will run is notified that the task is blocking or long-running. That thread pool can then elastically spawn and remove additional threads to ensure that no actor or future sharing it is starved for a thread. See ManagedBlocking in the Scala documentation for more information:
import scala.concurrent.blocking

blocking {
  // Do some blocking behavior
}
There is a caveat here, of course. Yes, it is great that you can get more threads if blocked threads are consuming your thread pool. However, those extra threads are not capped at a specific limit. It is theoretically possible to expand the size of your thread pool limitlessly, and that could quickly use up a lot of resources. Keep this in mind when you use managed blocking, and think about how you can use circuit breakers or some other mechanism to provide backpressure when the system is being overwhelmed.
As a side note, I tell people in my “Effective Actors” presentation never to parallelize with routers before you have measured the hot spots in your application. Too often, developers (including myself) make assumptions about where we expect performance to be slow and try to mitigate those problems up front.
Be careful not to create routers before you have proven at runtime, under load, that you absolutely need to. If you suspect you will need a router somewhere, Akka team leader Roland Kuhn recommends that you pass actor references from supervisors down to compute-intensive children, so that the context.parent of those children is free to become a router later on merely by changing the external configuration of the actor.
When designing your Akka application, try to model the interactions in as simple a way as possible. By that, I mean try not to think in terms of asynchronous interactions or how specific algorithms will be designed. Instead, think in terms of the flow of the application, and where possible, try to direct it toward a result.
When I started building actor-based applications, we had teams responsible for building individual subsystems that flowed from one to another. Each subsystem consisted of a RESTful API, a domain, and a real-time component where side effects resulting from the commit of data to the database would be realized. Inside of that real-time realm, we had to model what to do as a result of a new domain object being added, removed, or changed. We did not focus on the details, just what had to be accomplished for each event.
To build an actor-based system from scratch, try to follow this roadmap, as defined by Jonas Bonér.
Try to think in terms of your application being entirely synchronous in nature. This can be tricky, since designing an application to be asynchronous does require that you think about the problem differently than a synchronous application, where you expect a definitive order of execution. But if you can think about your problem as if you have those guarantees, start there. Worry about nondeterministic behavior once you have defined your problem and how the logic would flow if it were synchronous.
When I wrote my first actor-based system, we were attempting to manage the realization of domain types to multiple servers in a cluster: if someone created a customer, we wanted to create a Customer actor whose sole responsibility was to make sure all servers on which that customer should exist were able to update themselves with the customer's information. In this case, we could reason about what would happen in the asynchronous part of our application pretty easily: if you added a domain object, it would result in that domain object being "realized" to a server in our cluster. Assuming no failure and not taking into account timing issues, the add succeeded, and our system was in a nominal state. If failure or timing issues occurred (such as the customer being added before a type on which it depended), the actor continued attempting to realize that domain object until it succeeded.
This applies to nonactor applications as well. When writing Scala, we want to focus on being as declarative as possible—focus on what you want to do, not how it will be done. Imperative programming in Java is nondeclarative. When we want to iterate over a collection and get out only those values that meet some criteria and put the values into a new collection, we have to write the code like this (assuming we’re not using a non-standard language library):
List<String> shortWords = new ArrayList<String>();
for (String word : wordList) {
    if (word.length() < 5) {
        shortWords.add(word);
    }
}
In Scala, we instead focus on what we want to do so that the details of how that work is done don’t clutter the logic of our application:
val shortWords = wordList.filter(_.length < 5)
This makes our code considerably more concise, which means that it is cognitively easier for someone else maintaining our code to understand what is going on and make changes to it with confidence. We want developers to be able to understand semantically what is happening in our application as quickly as possible and not be bogged down in details of how things are being accomplished.
Like declarative programming, we want to focus on immutability regardless of whether or not we're writing Akka applications. But the reason it is so important with actors and futures is that your application will be multithreaded by nature. By focusing on immutability, you are ensuring the correctness of data shared across threads and reducing the possibility of the deadlocks or livelocks that can occur when using locks to manage shared, mutable state.
Again, this is a rule for non-Akka application development as well, and it is somewhat redundant with the immutability just mentioned. By functional, I mean your code should be immutable, referentially transparent, and built from first-class functions. Actors themselves are not functional programming constructs, but we do want to adhere to these principles within actors as much as possible. Referential transparency means you can substitute a value for a block of code and still have the exact same result: no side effects have taken place, and no unintended consequences of the evaluation of those expressions exist. First-class functions can be passed as arguments to methods, just like a string or a domain object, or composed functionally (such as through for comprehensions).
Keep mutability and side effects within the actor, and favor vars of immutable data. For example, because actors prevent concurrent access to internal mutable state, it is easy to use mutable data structures. However, if you decide to pass that data elsewhere, you’ve created a concurrency issue. It is better to use vars of immutable data structures inside of actors so you do not have to worry about that.
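Here is a small sketch of that guideline; the message types and the session state are hypothetical:

import akka.actor.{ Actor, ActorRef }

case class AddSession(id: Long, user: String)
case class ReportTo(requester: ActorRef)

// A var holding an immutable Map: cheap to "update" and always safe to share
class SessionTracker extends Actor {
  var sessions: Map[Long, String] = Map.empty

  def receive = {
    case AddSession(id, user) =>
      sessions = sessions + (id -> user) // rebind to a new immutable map
    case ReportTo(requester) =>
      requester ! sessions // a snapshot nobody can mutate out from under us
  }
}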
Once you have built your application following the functional paradigm, you want to begin to attack the problem in targeted locations where you've measured that you need to increase performance. How do we do that? Identify the parts of your system where additional asynchrony will have the most benefit: for example, blocking I/O, or anywhere you've actually measured a critical section that can't handle the load. From there, layer in additional asynchrony via anonymous actors and futures only in those targeted spots.
Now you have an application that is functional and asynchronous. This is very desirable, but the nature of functional programming is that it results in the creation of lots of (hopefully) short-lived objects on the JVM. That is okay if the lifespan of such objects is limited to Eden in the JVM's garbage collection regions. However, if they begin to leak into OldGen, they will be costly to clean up and could result in lots of compaction latency. So, in order to optimize our application, we may begin to introduce mutable state in very targeted locations, where we have proven, with measurements obtained from profiling the application at runtime, that it will make our code run more efficiently. How can we do that?
This is where things start to get tricky. You’ve built your asynchronous system, and it meets your requirements for what it is supposed to do. However, you’re finding that it isn’t able to perform the work as quickly as your nonfunctional requirements dictate, and parallelism isn’t an option because you don’t have a task that can be performed independently and concurrently on different JVMs. You need to optimize your code. One way to do so is to use mutable state.
However, using mutable state can be very tricky. You want to avoid locking as much as you possibly can, and not just because of the difficulty of managing locks programmatically. Locks, even on an abstracted platform such as the JVM, are arbitrated by the kernel of the OS on which the JVM is running. This means that lock access happens on a core executing the kernel, and your thread is paused to arbitrate lock access. When this happens, the core can be assigned a new thread, and the data specific to your thread in the warmed caches local to that core is evicted in favor of the data required by the new thread. This can have a measurable effect on performance, because even if your thread is scheduled back to that same core, it is extremely likely that you will encounter cache misses, and the data your thread needs will have to be reloaded from main memory.
To avoid this, we can use Java's excellent atomic references. These use compare-and-swap (CAS) semantics to discern whether a change can be made to the data they protect, by comparing the current data with what it is expected to be, assuming no other changes have taken place, at the time the change is applied. If I use an AtomicInteger to wrap a counter and I want to increment the value from 5 to 6, the change will fail if the counter does not have a value of 5 at the moment I apply the change. Java 8 is adding new Adder and Accumulator types that will greatly facilitate this, if you have the ability to run on that platform.
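A quick sketch of those CAS semantics with AtomicInteger:

import java.util.concurrent.atomic.AtomicInteger

val counter = new AtomicInteger(5)

// compareAndSet succeeds only if the current value is still the expected one
val changed = counter.compareAndSet(5, 6) // true: counter is now 6
val again = counter.compareAndSet(5, 7)   // false: counter is 6, not 5

// incrementAndGet retries its CAS loop internally until it wins
counter.incrementAndGet() // 7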
This will only get you so far, however, as Java's atomic references do not compose. By that, I mean you cannot combine two AtomicInteger instances into one atomic operation. To do this, you can use Software Transactional Memory (STM), where you make changes to the two values within the context of a single transaction, and the commit only occurs if both values are what was expected at the time the commit takes place.
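As a sketch of that idea using the ScalaSTM library (scala.concurrent.stm), with hypothetical Refs representing two account balances:

import scala.concurrent.stm._

// Two Refs updated atomically: either both changes commit, or neither does
val from = Ref(100)
val to = Ref(0)

def transfer(amount: Int): Unit =
  atomic { implicit txn =>
    from() = from() - amount
    to() = to() + amount
  } // retried automatically if another thread changed either Ref mid-transaction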
However, STM is not some magical awesome-sauce that you spread around your application to make things just start working. If the data upon which you apply STM is being changed rapidly by other threads, it is entirely possible that the transaction will never commit, because the multifield CAS operation keeps colliding with those changes. It is a tool that is available to you, but it has its limitations.
As a side note, one very nice feature of ScalaSTM is the TMap collection, which can provide you with snapshots of the collection's data in constant time. That can be a handy feature in some situations. For more information, see the documentation.
As an absolute last resort, such as when CAS semantics do not work for your application because updates are happening too quickly and too many attempted changes fail, you have to use locks. But only do so as an absolute last resort: avoiding them is likely why you were looking at building an actor-based system to begin with.
Locks do not compose, so trying to interleave them can easily lead to deadlock and livelock situations. Deadlocks are at least relatively easy to find, identified via jstack's listing of threads in the JVM or via JConsole/VisualVM. Livelocks are much trickier, because the threads involved are actively doing work but unable to get out of each other's way. Think of it as analogous to walking down a street where someone is coming in the opposite direction, and both of you keep stepping in the same direction, never getting out of each other's way.
Furthermore, mutual exclusion locks (mutexes) are very expensive when they are contended. Blocked threads are removed by the operating system kernel from the core on which they are executing so that other threads have a chance to run. Even if the blocked thread acquires the lock quickly, it has to be rescheduled by the kernel and may not be placed back on the same core. All of the warmed hardware caches local to that core are then lost and must be rebuilt wherever the thread continues its execution.
With asynchronous code (such as Akka actors), we gain tremendous benefits from being able to leverage threads and maximize the usage of physical hardware resources such as cores and sockets. However, this comes at the expense of determinism: the guarantee that providing the same inputs will always result in the same logic path being executed and the same result. This means it is entirely plausible, even likely, that we will experience race conditions, where we expect something to have a specific value at a particular time, but it does not.
So how do we avoid writing code that is dependent on time? This goes back to my core belief that resilient systems never try to do things only once: they continue to do things until the world around them changes to the way they expect it to be, or they escalate failure so that higher-level supervisors can recognize that the world needs to be pushed in a particular direction. If you write asynchronous code, such as Akka actors, with the expectation that everything will always happen at just the right time, you are going to fail pretty quickly.
Here is an example. While working at a large cable company, we were building a repository of which channels every customer was allowed to watch, based on what they were paying for. A customer could call in at any time and change those permissions, and those changes had to be reflected in the data set within 10 minutes. The "source of truth" database could provide the information, but the relational queries involved too many joins to be performant enough to meet our nonfunctional requirements for servicing requests. We came up with two possible solutions: a blocking, time-based approach, where the data set would be rebuilt from the source of truth on a fixed schedule, and a self-healing approach, where the system continually applied changes until the data set reflected reality, regardless of timing.
Try to think of how you can approach your problem similarly. It may be easier to first think about the problem in terms of your blocking, time-based approach. But once you’ve properly identified what you have to do, you can think about the same problem in terms of how to implement it such that time doesn’t matter and the system can heal itself.
A best practice for actor development is to avoid ask: use fire-and-forget messages, and be prepared to handle responses. Not only is this simpler and more expressive than using futures and having to map over composed responses, but it also uses only one asynchronous threaded resource (the anonymous actor I created) as opposed to multiple futures for the sends and the myriad futures returned within a composed for comprehension to handle the results. Always favor tell over ask.
Truly fault-resilient systems send messages in fire-and-forget fashion and prepare to receive the expected response. They do not assume that sending once means the message was definitely received and is being handled, since anything can happen in between. As such, we should schedule a task to resend the message a predefined number of times within some acceptable duration. If no expected response is received within that timeframe, we should be able to handle or escalate the failure.
One of the surest ways to get yourself into trouble when using actors is by being vague or general in your interactions. Instead, you want to make sure that you have well-defined interactions and know exactly what failures can occur. This way, you can minimize the impact on other actors at runtime and keep “event storms” from happening, which can be very difficult to reason about.
First of all, give unique names to all of your actors and ActorSystem instances. Not only will this allow you to create and update external configuration for those objects on the fly after the fact, but you will also be able to write code that references them by name via actor lookup. If you don't name them, you will have to add names later, when you decide you do want to do these things. Furthermore, you'll find more useful information in Akka's trace logs and logging when you name them:
// Create with name
// Create with name
val myActor = context.actorOf(Props[MyActor], "my-actor")

// Lookup
val myActor = context.system.actorFor("my-actor")
Note that, as of Akka 2.2, the old actorFor method has been deprecated due to differences in behavior between actors found locally and those that are remote. Instead, use actorSelection, which looks up actors via absolute or relative paths. See the Akka documentation for more details.
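Assuming the lookup code runs inside an actor, the replacement might look like this:

import akka.actor.ActorSelection

// Absolute path from the root guardian of the ActorSystem
val selection: ActorSelection = context.actorSelection("/user/my-actor")
selection ! "hello" // delivered only if an actor exists at that path right now

// Relative paths work as well, e.g., a sibling of the current actor
val sibling = context.actorSelection("../other-actor")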
Make a point to avoid passing messages that are general and do not get targeted toward a specific actor instance. You want your messages to flow through your supervisor hierarchy in a direct fashion, routing to the actor instance that is best equipped to handle the message without being sent down to others outside of its path. Here is an example of a generic message that you do not want to send through your supervisor hierarchy:
case object AccountsUpdated
This is a general message that will have to go to every CustomerActor in your system, and all of them will have to determine whether the message is relevant to them. This is how event storms occur, as all of your customers need to figure out from some service whether or not they care about this event.
Instead, we want to send messages that are very explicit, so that only the CustomerActor for which the account changed is notified and performs work based upon the message. Here is an example of an explicit message:
case class AccountRemoved(customerId: Long, accountId: Long)
When creating supervisor hierarchies, it is tempting to try to generalize exception handling so that you can catch virtually every possible failure at the lowest possible level. However, this defeats the purpose of escalating failure to the appropriate level. What do I mean by generalizing exceptions?
class MySupervisor extends Actor {
  override val supervisorStrategy = OneForOneStrategy() {
    case _: Exception => Restart
  }
  ...
}
In this case, our supervisor is going to restart every individual actor it is supervising when any exception is thrown from one of them. But this is the default supervision behavior provided by Akka if you do not override it, so it's actually redundant! That does not mean we should omit it, though.
If you do not override the default supervisorStrategy in your supervisor actor, the default behavior for a failure it does not handle will be to escalate it upwards. If there is no handler all the way to the root of your supervision hierarchy, the "root guardian" actor in Akka, which is supervising your hierarchy within its ActorSystem context, will catch it and automatically restart everything below! While this can be useful for restarting a system that has suffered a catastrophic failure, it is hardly the default behavior you likely want for your application. Akka will handle a few exceptions it knows about explicitly, but everything else will simply result in the restarting of all of your actors.
By default, Akka will stop a child actor on ActorInitializationException and ActorKilledException, restart a child actor on Exception, and escalate all other types to the parent actor.
Think about what that would mean. You have a reasonably benign failure somewhere deep in your supervision hierarchy. That failure bubbles up to the ActorSystem, resulting in all of the actors you created in that ActorSystem being restarted when they finish handling their current messages. Whatever transient state they may have had would be lost, all because of something that could have been handled more locally.
The better route is to create very specific exception types at the leaves of your supervision tree. As you flow upward, the exception types defined can be more general, and escalation can be used to make sure the appropriate supervisor ends up handling the message:
class MySupervisor extends Actor {
  override val supervisorStrategy = OneForOneStrategy() {
    case _: SQLException => Resume
    case _: MyDbConnectionException => Escalate
  }
  ...
}
In this case, the SQLException was probably related to bad data in the message being processed to update the database. How you handle such problems is domain-specific to your application: maybe you need to tell the user, maybe you retry the message that was handled, etc. But a connection problem may be indicative of an issue for all actors trying to access that data store, such as a network partition. Then again, it might not be, if it is related to an authentication issue. In this case, you want to escalate the failure to the parent actor responsible for managing all actors with such database connections, which can decide whether this was an isolated failure or one of many occurring simultaneously. If the latter, it may decide to stop all actors relying on those connections until a new connection can be established.
One of the drawbacks of using general messages and exceptions is that they can lead to the unintended consequence of too much activity taking place in your actor application, too many messages being sent around as a result, or too many actors being affected by something that could have been handled locally. When this happens, you see “event storms” that can be difficult to diagnose. Tons of log output to pore over, lots of messages in the event trace logs of Akka, etc. It is entirely plausible that, despite your best intentions, such storms could happen anyway, as it is highly unlikely you’ll think of every possible event that could lead to such a happening from the outset. But there are a couple of ways to handle them.
If you know that your system is capable of sending the same message repeatedly in an effort to provide resiliency, you have to be able to ignore a message that arrives again after you've already handled it. One such pattern I've seen, based on basic control theory, is to dampen messages by a unique identifier: if you have received a message with the same ID within the past x milliseconds, simply ignore it.
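A minimal sketch of that dampening idea, with a hypothetical Event message carrying a unique identifier:

import akka.actor.Actor

case class Event(id: Long) // hypothetical message with a unique identifier

class DampeningActor(windowMillis: Long) extends Actor {
  var lastSeen: Map[Long, Long] = Map.empty // event ID -> time last handled

  def receive = {
    case Event(id) =>
      val now = System.currentTimeMillis
      val recentlyHandled = lastSeen.get(id).exists(now - _ < windowMillis)
      if (!recentlyHandled) {
        lastSeen = lastSeen + (id -> now)
        // ... handle the event ...
      } // otherwise ignore the duplicate
  }
}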
Akka provides a CircuitBreaker feature for handling cascading failures in remote endpoints of your distributed application. You merely declare that you want this behavior, providing inputs for how many failures can occur in how much time, and how long to wait before the circuit breaker attempts to reset:
import akka.actor.{ Actor, ActorLogging }
import akka.pattern.CircuitBreaker
import scala.concurrent.duration._

class CircuitBreakingActor extends Actor with ActorLogging {
  import context.dispatcher

  val circuitBreaker = new CircuitBreaker(
    context.system.scheduler,
    maxFailures = 10,
    callTimeout = 100.milliseconds,
    resetTimeout = 1.second).onOpen(logCircuitBreakerOpen())

  def logCircuitBreakerOpen() =
    log.info("CircuitBreaker is open")

  def receive = Actor.emptyBehavior
}
Using circuit breakers allows you to provide fast failure semantics to services and clients. For example, if they were sending a RESTful request to your system, they wouldn’t have to wait for their request to time out to know that they’ve failed, since the circuit breaker will report failure immediately.
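To actually guard calls with the breaker, wrap them with withCircuitBreaker. Here is a sketch of a receive block that could replace the empty one above; callRemoteService is a hypothetical stand-in for the risky operation:

import scala.concurrent.Future

def callRemoteService(request: String): String = ??? // hypothetical risky call

def receive = {
  case request: String =>
    val originalSender = sender
    // Failures and timeouts count against the breaker; while it is open,
    // calls fail fast with CircuitBreakerOpenException instead of waiting
    circuitBreaker
      .withCircuitBreaker(Future { callRemoteService(request) })
      .map(originalSender ! _)
}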
Actors are intended to be self-contained components that do not have any interactions with the outside world except via their mailboxes. As such, they should never, ever be treated in the same fashion that you would an ordinary class.
Prior to the creation of the ActorRef proxy for Akka actors, it was entirely possible to create an actor, send it a message, and also directly call a method on it. This had terrible consequences: the actor would have one thread performing behavior based on handling messages from its mailbox, while another thread could call into its methods, introducing the exact concurrency issues we were trying to avoid in the first place by using actors! Those weren't happy times for me.
If there is one thing you should take away from this book, this is it. Never refer to any actor class using the open recursive this that is so prevalent in object-oriented programming with Java, Scala, and C++. Nothing good can ever come from it.
Imagine you wanted to register an actor via JMX so that you could keep an eye on its internal state in production. It sounds like a great idea, because no tool is going to be able to tell you that information unless you expose it yourself. However, the API for registering an MBean in the JDK involves passing an ObjectName to uniquely identify the instance, plus a reference to the MBean from which you want to get data:
import java.lang.management.ManagementFactory
import javax.management._
import scala.util.Try

val mbeanServer = ManagementFactory.getPlatformMBeanServer

def register(actor: InstrumentedActor): Unit = {
  Try {
    mbeanServer.registerMBean(actor, actor.objectName)
  } recover {
    case iaee: InstanceAlreadyExistsException => ???
    case mbre: MBeanRegistrationException => ???
    case ncme: NotCompliantMBeanException => ???
    case roe: RuntimeOperationsException => ???
  }
}
See how we need to register the actor parameter with the mbeanServer? When you merely view the internal data attributes, that's not a big deal, because it's a read-only operation coming from the mbeanServer's thread. In fairness, that means you aren't concerned with absolute consistency, or with the possibility of seeing partially constructed objects in JConsole or whatever other mechanism you're using to consume the JMX MBean. But if you define any operations in your MBean, you could very easily introduce concurrency issues, and then you're toast.
JMX is a simple example, but it’s representative of the whole gamut of Observer pattern implementations you might try to use, especially when interacting with legacy Java code and libraries. They’ll want you to register the instance of the class to notify when an event occurs and the method to call inside of them, when you only ever want actor messages to be sent.
Instead, if you catch yourself using this in an actor, always change it to self. Every Akka actor has a self value, an ActorRef pointing to itself. If you want to perform looping within an actor, send a message to that self reference. This has the additional benefit of allowing your system to inject other messages into the looping, so they aren't starved for attention while the actor does its work.
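Here is a small sketch of that looping style, with a hypothetical ProcessBatch message:

import akka.actor.Actor

case class ProcessBatch(remaining: List[String]) // hypothetical work items

class BatchProcessor extends Actor {
  def receive = {
    case ProcessBatch(Nil) =>
      // done; nothing left to process
    case ProcessBatch(item :: rest) =>
      // handle one item per message...
      println(s"processing $item")
      // ...then loop by messaging self, letting other mailbox messages interleave
      self ! ProcessBatch(rest)
  }
}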
In The Cameo Pattern, I switched how I created the Props for my AccountBalanceResponseHandler actor instance to a factory props() method in its companion object. This may seem like an unnecessary implementation detail or a matter of preference, but it is actually a very big deal. When you create a new Akka actor within the body of another Akka actor (as of Akka 2.2 and Scala 2.10.x), a reference to this is captured from the actor doing the creating. In the Cameo Pattern example, the AccountBalanceResponseHandler would have a direct reference to the AccountBalanceRetriever actor. This isn't something you will typically notice, but it is something you never want to happen: exposing a this reference to another actor opens the door to having multiple threads running inside the actor, which you should never allow.

There is a proposal to make the Props API based upon the concept of Spores, which are part of SIP-21 and may be included in an upcoming version of the Scala language. By forcing users of a library to pass information in a Spore, they would have to explicitly capture the state they want to pass to the new Akka actor reference, and a this reference to the creating actor could not leak over.
Roland Kuhn, currently the head of the Akka team, is the person who defined this best practice with some help from Heiko Seeberger, Typesafe’s Director of Education. But I also see another benefit to this approach. You are putting the information about how to create the Props reference in one place—otherwise, those details are spread around to every place in your code that is creating instances of this actor.
Note that you could use an apply() method instead of a method named props(). I'm not terribly keen on that idea, however: apply() is a method that should return an instance of the type for which the companion object was defined, and in this case the actual return type is Props. As a result, I don't think it meets the basic contract of what an apply() should do, and I think a method name that describes what you're actually creating is more appropriate.
Here is an example. Historically, we have grown very comfortable with creating actors like this:
case object IncrementCount

class CounterActor(initialCounterValue: Int) extends Actor {
  var counter = initialCounterValue
  def receive = {
    case IncrementCount => counter += 1
  }
}

class ParentOfCounterActor extends Actor {
  val counter = context.actorOf(Props(new CounterActor(0)), "counter-actor")
  def receive = Actor.emptyBehavior
}
To avoid this potential issue of closing over this, I recommend you instead instantiate the Props for your new actor like this:
object CounterActor {
  case object IncrementCount

  def props(counter: Int): Props = Props(new CounterActor(counter))
}

class CounterActor(initialCounterValue: Int) extends Actor {
  var counter = initialCounterValue
  def receive = {
    case CounterActor.IncrementCount => counter += 1
  }
}

class ParentOfCounterActor extends Actor {
  val counter = context.actorOf(CounterActor.props(0), "counter-actor")
  def receive = Actor.emptyBehavior
}
Use a companion object props() factory method to create the instance of Props for an Akka actor, so you don't have to worry about closing over a reference to this from the actor that is creating the new actor instance.
With the exception of using TestActorRef (in Akka's TestKit implementation) and getting the underlyingActor for unit testing purposes, you should never know the type of an actor. You should only ever refer to it as an ActorRef, which exposes only an API of message sending. If you find code that has a reference to an actor by its actual type, you're making it very easy for someone to introduce the exact concurrency issues we talked about in the previous section.
This is really a good rule for lambdas in general. Any time you have a lambda, it becomes a closure when it references state external to its own scope. And that is okay, so long as the external state you are referencing is immutable. With Java 8’s upcoming lambdas, the designers were smart enough to enforce that all closed-over external state must be final, much as it had to be when creating a nested or anonymous inner class implementation. However, with Java, merely making a field final doesn’t mean the object it points to is immutable, as we’ve all come to know (much to our chagrin).
We often take it for granted in actor development that we can use mutable state within an actor without worrying about concurrency, because only one thread will ever be operating inside of it. But if you close over that mutable state, especially in a deferred operation like a future, you have no idea what the value of that state will be when the deferred operation is actually executed. This was painfully apparent in the sender issue displayed in The Extra Pattern:
// BAD!
class MyActor extends Actor {
  var counter = 0

  def receive = {
    case DoSomethingAsynchronous =>
      counter += 1
      import context.dispatcher
      Future {
        if (counter < 10) println("Single digits!")
        else println("Larger than single digits!")
      }
  }
}
This is scary code. We have a counter value that can change with every message that is received. We then defer printing out some information based on the value of that counter. We can’t say with any degree of certainty when that future will be executed, as it depends not only on whether there are threads available in the actor’s own dispatcher, but also on when the kernel will schedule the thread to a physical core! You will get nondeterministic results if you execute code like this.
If, for whatever reason, you must close over mutable state in a future’s lambda, immediately copy it into an immutable local value. This stabilizes the value within the lambda’s context and ensures that nothing unexpected can happen to you:
// GOOD!
class MyActor extends Actor {
  var counter = 0

  def receive = {
    case DoSomethingAsynchronous =>
      counter += 1
      import context.dispatcher
      val localCounter = counter
      Future {
        if (localCounter < 10) println("Single digits!")
        else println("Larger than single digits!")
      }
  }
}
In this second example, we capture the counter value at the time the message was handled and can be assured that we will have that exact value when the future’s deferred operation is executed.
Another possible issue is sending data between actors. We all know that defining a value as immutable does not mean that the attributes of that object are immutable as well. Java collections have long been the prime example of this: merely declaring their variables final means you can’t reassign the variable name to a new instance of the collection, not that the contents of the collection can’t be changed.
If you find yourself in a position where you need to send a mutable variable in a message to another actor, first copy it into an immutable field. The reasons for this are the same as stabilizing a variable before using it in a future—you don’t know when the other actor will actually handle the message or what the value of the variable will be at that time. The same goes for when you want to send an immutable value that contains mutable attributes.
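As a minimal sketch of the idea (the names pending and worker are hypothetical), convert the mutable structure to an immutable one at the send site:

import scala.collection.mutable
import akka.actor.ActorRef

// A mutable buffer we maintain locally within an actor.
val pending = mutable.ListBuffer("txn-1", "txn-2")

def sendTo(worker: ActorRef): Unit =
  worker ! pending.toList // toList copies into an immutable List, so the
                          // receiver never sees later mutations of pending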
Erlang does this for you with copy on write (COW) semantics, but we don’t get that for free in the JVM. Assuming your application has the heap space, take advantage of it. Hopefully, these copies will be short-lived allocations that never leave the Eden space of your garbage collection generations, and the penalty for having duplicated the value will be minimal.
Asynchronous programming, regardless of the paradigm, is very difficult to debug. Anyone who tells you otherwise is pulling your leg, being sarcastic, or trying to sell you something. If you’ve merely used a ThreadPoolExecutor in Java and spawned runnables, or more recently tried the ForkJoinPool, what happens if something goes awry on the thread that was spawned? Was the thread that set it running notified? No. Unless you made some effort to log errors occurring on that other thread, or created the ForkJoinPool with the constructor that lets you register a generic Thread.UncaughtExceptionHandler callback, you may never know that the failure even occurred.
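For reference, here is a sketch of that ForkJoinPool constructor in use, so that deaths on pool threads are at least reported; the handler body is illustrative only:

import java.util.concurrent.ForkJoinPool

// Register an UncaughtExceptionHandler so failures on pool threads are
// at least reported rather than silently swallowed.
val handler = new Thread.UncaughtExceptionHandler {
  def uncaughtException(t: Thread, e: Throwable): Unit =
    System.err.println(s"Thread ${t.getName} failed: ${e.getMessage}")
}

val pool = new ForkJoinPool(
  Runtime.getRuntime.availableProcessors, // parallelism
  ForkJoinPool.defaultForkJoinWorkerThreadFactory,
  handler,
  false) // asyncMode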
This is one of the primary reasons that supervisor hierarchies and actors are such powerful tools. Now, we can not only know when failure happens in asynchronous tasks, but we can also define behaviors appropriate to those failures. The same goes for Scala’s future implementation, where you can define behavior that is executed asynchronously depending on whether that deferred operation succeeded or failed.
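For example, a Scala future lets you register behavior for either outcome; in this sketch, riskyComputation is a hypothetical stand-in for real work:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

val result: Future[Int] = Future { riskyComputation() }

// Define behavior for both success and failure of the deferred operation.
result.onComplete {
  case Success(value) => println(s"Computed: $value")
  case Failure(cause) => println(s"Failed asynchronously: ${cause.getMessage}")
}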
That said, it is up to you, the developer, to come up with ways to give yourself clues about what went wrong in production. We can’t merely attach a remote debugging session to the JVM running the bad code, because we would have had to start the JVM with the -Xdebug flag set, which prevents many very important runtime optimizations in the Just-In-Time (JIT) compiler from being performed. That would be terrible for our application’s performance. So what can we do? Monitor everything!
First of all, you need to give yourself as much visibility as possible. You want to be able to use tools that will show you what is happening live in production at any time. That means you need to instrument your application with JMX or do something so that you can see the state inside of your actors at runtime. The Typesafe Console is a wonderful tool that will show you all kinds of information based on nonfunctional requirements about your application, but it will not show you internal state. And no tool that I know of will. Whatever you must do, you must make state accessible in production.
One of the best things you can do is use metrics inside of your application to provide insight into how well it is performing. I highly recommend you consider using Coda Hale’s Metrics library. You do have to think about what you want to capture before you add instrumentation, such as possibly writing your own Akka actor mailbox to capture information about how quickly messages are handled. Nonetheless, using a library like Metrics is extremely helpful, especially when you want to make internal state externally visible, which cannot be provided by profiling tools such as the Typesafe Console.
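As a sketch of what that instrumentation can look like (assuming the Metrics 3.x API and hypothetical names), a meter can expose an actor’s message throughput:

import akka.actor.Actor
import com.codahale.metrics.MetricRegistry

class InstrumentedActor(registry: MetricRegistry) extends Actor {
  // The metric name is arbitrary; a reporter (JMX, console, Graphite)
  // makes it visible outside the process.
  private val handled = registry.meter("instrumented-actor.messages-handled")

  def receive = {
    case msg => handled.mark() // record one handled message
  }
}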
One thing that we’ve learned in object-oriented programming is that encapsulation is key. And generally speaking, I agree. However, before Akka’s TestKit came along, the only way to write functional logic that could be tested in isolation (without firing up actors and sending messages that could result in side effects) was to write all business logic in external function libraries.
This has a few added benefits. First of all, not only can we write useful unit tests, but we can also get meaningful stack traces that name the function in which the failure occurred (but only if we define them as def methods, not as val functions, due to the way Scala compiles anonymous functions). It also prevents us from closing over external state, since everything must be passed as an operand. Plus, we can build libraries of reusable functions that reduce code duplication.
Since the introduction of Akka TestKit, this is no longer a rule to me. However, I still find it prudent and useful to follow this best practice.
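A minimal sketch of the practice, with hypothetical names: keep the pure logic in an object of def methods, and have the actor delegate to it:

import akka.actor.Actor

// Pure business logic: no actor state, trivially unit-testable.
object BalanceLogic {
  def total(checking: BigDecimal, savings: BigDecimal): BigDecimal =
    checking + savings
}

case class ComputeTotal(checking: BigDecimal, savings: BigDecimal)

class BalanceActor extends Actor {
  def receive = {
    case ComputeTotal(c, s) => sender ! BalanceLogic.total(c, s)
  }
}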
Let’s be honest, it’s a pain to write log output. Sometimes we aren’t consistent with the logging levels across all developers on a team. And each call to the logger, regardless of whether the logging actually occurs, can hurt performance in the aggregate. However, with asynchronous execution of your logic, it is your best tool for figuring out what is happening inside of your application. I’ve yet to see a tool that replaces it for me.
That said, merely logging isn’t enough. We take it for granted that Scala’s case classes will provide us with a useful toString method, and it certainly beats having to write our own. However, how many times have you looked through such output, with your head going side to side like you’re watching a tennis match, looking for just one value inside of some long output string, like that of a collection?
Pretty printing will help you immensely. Be profligate in your logging, but note that Akka’s own logging does not have a trace level. At the debug level, include output formatted with tabs and carriage returns so that you can quickly scan it and discern the important fields and values. For example:
// Using the new Scala 2.10 String Interpolation feature here
if (log.isDebugEnabled)
  log.debug("Account values: " +
    s"Checking: $checkingBalances " +
    s"Savings: $savingsBalances MoneyMarket: $mmBalances")
The reason we check to see if the debug log level is enabled first is so that we don’t go through the expense of assembling the output string if we’re not actually going to write the statement.
So how do you enable logging in Akka? First of all, set up your configuration file (application.conf if this is for your application; reference.conf if this is for a library JAR) with the following:
# I'm using flat config for space considerations, but anyone familiar
# with the Typesafe Config library should understand what I'm doing here
akka.loglevel = "DEBUG"
akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
akka.actor.debug.autoreceive = on
akka.actor.debug.lifecycle = on
akka.actor.debug.receive = on
akka.actor.debug.event-stream = on
An example logback.xml file might look like this:
<?xml version="1.0" encoding="UTF-8" ?>
<configuration scan="true" scanPeriod="5 seconds">
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%date{ISO8601} %-5level %logger{36} %X{sourceThread} - %msg%n</pattern>
    </encoder>
  </appender>
  <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <file>effective_akka.log</file>
    <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
      <fileNamePattern>/tmp/tests.%i.log</fileNamePattern>
      <minIndex>1</minIndex>
      <maxIndex>10</maxIndex>
    </rollingPolicy>
    <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
      <maxFileSize>500MB</maxFileSize>
    </triggeringPolicy>
    <encoder>
      <pattern>%date{ISO8601} %-5level %logger{36} %X{sourceThread} - %msg%n</pattern>
    </encoder>
  </appender>
  <logger name="akka.actor" level="DEBUG">
    <appender-ref ref="FILE" />
    <appender-ref ref="STDOUT" />
  </logger>
  <root>
    <level value="INFO" />
    <appender-ref ref="STDOUT" />
  </root>
</configuration>
Just put these two files on your classpath (i.e., in the src/main/resources folder). Now you have access to all of the output possible from Akka itself, without having to resort to logging each message receive and actor lifecycle event yourself.
If you have a distributed actor application across multiple nodes, you want to use a tool like Flume to aggregate all of your actor logs together. Akka logging is asynchronous and therefore nondeterministic: it’s entirely possible that the ordering of the aggregated log output will not be exactly right, but that’s okay. Having one rolling log file as opposed to having to look at them across multiple machines is a much simpler task. Just imagine the timestamp variance possibilities if you don’t aggregate.
This is a critical tool for debugging. Every one of your messages should be a case class instance with an ID associated with it. As a general rule, do not pass literals and do not pass plain objects (though I’ve always felt it’s okay to pass a case object like Start).
Why do I want to do this? Because it makes debugging via log files that much easier. Now, if I know the specific message ID that led to a problem, I can grep/ack/ag (ag is the command of The Silver Searcher) the output logs for all messages containing that ID and view the flow of that message through my system. That is, assuming I logged the output when the message was received and handled.
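A sketch of the idea, with hypothetical message and actor names; the point is that the ID travels with the message and appears in every log statement:

import akka.actor.{Actor, ActorLogging}

case class GetAccountBalances(id: Long) // the correlation ID rides along

class AccountBalanceActor extends Actor with ActorLogging {
  def receive = {
    case GetAccountBalances(id) =>
      log.debug(s"Received GetAccountBalances for ID $id")
      // ... do the work, passing id along in any downstream messages
  }
}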
I’ve seen implementations where every actor message was passed with a UUID to uniquely identify it. That is great, since the odds of two UUIDs ever being exactly the same are infinitesimal. However, java.util.UUID instances are expensive to create, so unless you’re generating billions of messages daily, this may be more uniqueness than you actually need.
For example, would it suffice to use some value where the likelihood of a collision over a day or a few hours was low? We generally know when an error occurred, so we know the timestamp we should expect to see associated with that ID. If we grep the logs and they return a bunch of output for when the error occurred, as well as some from the day before or several hours after, we’ve at least whittled the output down to something manageable, and the ID has been useful, and hopefully cheaper to create. There are GUID generation libraries available with a simple search of the Internet, if you are so inclined.
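As one possible sketch of a cheaper, “unique enough” identifier (not a library recommendation), a timestamp plus a process-local counter keeps collisions unlikely within the window you care about:

import java.util.concurrent.atomic.AtomicLong

object MessageId {
  private val seq = new AtomicLong(0L)

  // Wall-clock millis plus a monotonically increasing suffix: far cheaper
  // than java.util.UUID.randomUUID and easy to grep by time range.
  def next(): String = s"${System.currentTimeMillis}-${seq.incrementAndGet}"
}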
One of the biggest questions I encounter among users of Akka is how to use dispatchers to create failure zones and prevent failure in one part of the application from affecting another. This is sometimes called the Bulkhead Pattern. And once I create the failure zones, how do I size the thread pools so that we get the best performance from the least amount of system resources used? The truth is, every application is different, and you must build your system and measure its performance under load to tune it effectively. But here are some tips and tricks.
Note that the Typesafe Console is available for free to all developers and will be integrated into the Typesafe Activator to make it easier to set up and use quickly.
Most people building an Akka application start out with a single ActorSystem, using the default dispatcher settings with a minimum of 8 threads and a maximum of 64. As the application grows, they notice that futures time out more frequently, since futures in Akka actors often implicitly use the actor’s dispatcher as their ExecutionContext. Eventually, as more functionality is assigned to run on those threads, the default dispatcher becomes overloaded, trying to service too many simultaneous tasks.
Because one thread pool is serving all actors and futures in the application, resource starvation occurs. When that happens, I recommend that you identify actors using futures and consider where you can use a separate dispatcher or ExecutionContext for those futures so that they do not impact actors with their thread usage. We want to limit the impact of the work of those futures on the actors handling messages in their mailboxes. If you have the Typesafe Console, you can see the starvation occurring as the maximum latency in handling messages at the dispatcher level increases.
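A sketch of that isolation, assuming a dispatcher configured at the hypothetical path my-app.future-dispatcher in application.conf:

import akka.actor.Actor
import scala.concurrent.{ExecutionContext, Future}

class BalanceRetriever extends Actor {
  // Futures run on their own ExecutionContext, not the actor's dispatcher.
  implicit val futuresEc: ExecutionContext =
    context.system.dispatchers.lookup("my-app.future-dispatcher")

  def receive = {
    case "retrieve" =>
      Future {
        // long-running or blocking work happens off the actor's dispatcher
      }
  }
}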
As a temporary workaround, I have noticed some people try to use a PinnedDispatcher for each actor so that starvation is less likely. Actors created with a PinnedDispatcher receive their own dedicated thread, which lives until the keep-alive-time configuration parameter of the ThreadPoolExecutor (default: 60 seconds) is exceeded. However, this is really not a viable solution for production except for very specific use cases, such as service-oriented actors handling a lot of load. For most other tasks, you want to share resources among actors with similar roles and risk profiles so that you aren’t dedicating large amounts of resources to each actor. In addition, starting and restarting threads takes time, and each thread has a default stack size of 512 KB. You will use up your memory very quickly in a system that relies primarily on actors created with a PinnedDispatcher.
The key to separating actors into failure zones is to identify their risk profile. Is a task particularly dangerous, such as network IO? Is it a task that requires blocking, such as database access? In those cases, you want to isolate those actors and their threads from those doing work that is less dangerous. If something happens to a thread that results in it completely dying and not being available from the pool, isolation is your only protection so that unrelated actors aren’t affected by the diminishment of resources. With the Typesafe Console, you can visualize the performance of your dispatchers so that you can be certain that you have properly provided “bulkheads” between actors doing blocking work and those that should not be affected.
You also may want to identify areas of heavy computation through profiling, and break those tasks out using tools such as routers. For those tasks that you assign to routers, you might also want them to operate on their own dispatcher so that the intense computation tasks do not starve other actors waiting for a thread to perform their work. With the Typesafe Console, you can visualize the isolation of work via actors and their dispatcher to be certain that the routers are effectively handling the workload.
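A sketch using the Akka 2.2 routing API, with a hypothetical ComputeActor and dispatcher name:

import akka.actor.Props
import akka.routing.RoundRobinRouter

// Eight workers share the computation, on a dispatcher isolated from
// the rest of the system.
val computeRouter = context.actorOf(
  Props[ComputeActor]
    .withRouter(RoundRobinRouter(nrOfInstances = 8))
    .withDispatcher("my-app.compute-dispatcher"),
  "compute-router")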
Now the question becomes how to size your dispatchers, and this is where the Typesafe Console can be very handy. In systems where you have several or many dispatchers, keep in mind that the number of threads that can be run at any time on a box is a function of how many cores it has available. In the case of Intel boxes, where hyperthreading is available, you could think in terms of double the number of cores if you know that your application is less CPU-bound. I recommend sizing your thread pools close to the number of cores on the box where you plan to deploy your system and then running your system under a reasonable load and profile with the Typesafe Console. You can then externally configure the thread pool sizes and check the impact at runtime.
When using the Typesafe Console, watch the dispatcher’s view to see whether the latency of message handling is within acceptable tolerances of your nonfunctional requirements; if not, try adjusting the number of threads upward. Remember, you’re setting the minimum number of threads, the maximum number of threads, and the parallelism-factor: the pool size is calculated as the ceiling of the number of cores on your box multiplied by that factor, bounded by the min and max settings you give.
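As a configuration sketch (the dispatcher name is hypothetical), those three settings live under fork-join-executor:

# Pool size = ceil(cores * parallelism-factor),
# clamped to [parallelism-min, parallelism-max]
my-app.compute-dispatcher {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 8
    parallelism-factor = 3.0
    parallelism-max = 64
  }
  throughput = 5
}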
The Typesafe Console also shows you something else that is very important to watch: the size of each actor’s mailbox. If you see an actor whose mailbox is perpetually increasing in size, you need to retune the threads for its dispatcher or parallelize its task by making it a router, so that it has the resources it needs to keep up with the demands placed on it by the system. The receipt of messages into an actor’s mailbox can be bursty in nature, but you shouldn’t have actors whose mailboxes grow because they can’t handle their incoming traffic fast enough to keep up with the load.
Once you have an idea of the number of threads you need to handle burstiness in your application (if any), sit down with your team and determine the minimum and maximum bounds of each thread pool. Don’t be afraid to add a few extra threads to the max to account for possible thread death in production, but don’t go overboard.
Also, pay close attention to the throughput setting on your dispatcher. This defines thread distribution “fairness”: it tells actors how many messages to handle from their mailboxes before relinquishing the thread so that other actors do not starve. However, a context switch in CPU caches is likely each time an actor is assigned a thread, and warmed caches are one of your biggest friends for high performance. It may behoove you to be less fair and handle quite a few messages consecutively before releasing the thread.
There are a few edge cases. If the number of threads is equal to the number of actors using the dispatcher, set the throughput extremely high, like 1,000. If your actors perform tasks that will take some time to complete and you need fairness to avoid starving other actors sharing the pool, set the throughput to 1. For general usage, start with the default value of 5 and tune this value for each dispatcher until you get reasonable performance characteristics without making actors wait too long to handle the messages in their mailboxes.