Chapter 4: Handling Faults and Actor Hierarchy

Failures happen. It's a simple fact of life. In the context of our systems, this might be anything from network outages to drive failures or even simple errors in your application logic. The key is that we have to assume these events will occur. If we don't, we are guilty of simply ignoring reality.

When we think about failures, whether they are caused by exceptions, environmental factors, or something else entirely, the way most of us have learned to think about them is in terms of containment. Take, for example, the try/catch block that, in one form or another, is ubiquitous in nearly every popular language today. It is all about containing errors and failures and making sure they don't bring the rest of the system crashing down.

But there's a big issue that you have perhaps encountered. When these failures occur, we need to understand what state the system is in at the point the error happened and what we need to do to get it back to a known-good state. That's not always an easy question to answer, particularly when we consider how exceptions affect the control flow of an application.

Typical failure handling

It's worthwhile to spend some time reviewing the mechanisms Scala has built in for handling failure cases before stepping back and looking at what Akka brings to the mix. Some of this will perhaps be familiar, given that these same techniques exist in many other languages. But Scala brings a couple of nice additions to the mix.

The first of these is exception handling using try/catch/finally. Here's a simple example to make sure the concept is clear:

import java.io.{FileWriter, IOException}

val writer = new FileWriter("test.out")
try {
  writer.write(bigBlockOfData)
} catch {
  case e: IOException =>
    println("Failed to write data.")
} finally {
  writer.close()
}

In this example, the writer is declared outside the try block so that the finally block can close it, we see a case statement for an exception type (IOException) that we know might be encountered, and a finally block that will be executed whether or not the catch block runs.

Another recent addition to the arsenal is Try[T], which allows for executing code that might be expected to result in an exception being thrown. The classes Success[T] and Failure[T], both of which extend Try, are the concrete instances which our code will receive depending on the success of the code passed to it. In the case of an exception, the Failure instance returned will contain the exception that was thrown. Similarly, if no exception was thrown, the Success instance will contain the final result of the code that was evaluated.
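To make this concrete, here is a brief REPL sketch using a deliberately failing division to show the two cases:

scala> import scala.util.Try
import scala.util.Try

scala> Try(42 / 0)
res0: scala.util.Try[Int] = Failure(java.lang.ArithmeticException: / by zero)

scala> Try(42 / 2)
res1: scala.util.Try[Int] = Success(21)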

One of the benefits of Try is that it includes the map and flatMap operations, so we can use it in for comprehensions, chaining together operations and proceeding only on a successful execution. In comparison to deeply nested try/catch blocks, it should be apparent how much simpler this can make the code. Another reason to consider Try is that it encodes a possibly failing operation in the type system rather than having to make the choice to explicitly handle exceptions wherever they might occur or to delegate them up the call stack.
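As a quick illustration of this chaining, here's a sketch using a hypothetical parse helper; the for comprehension over two Try values only yields a result if both succeed, short-circuiting on the first failure:

scala> import scala.util.Try
import scala.util.Try

scala> def parse(s: String) = Try(s.toInt)
parse: (s: String)scala.util.Try[Int]

scala> for (a <- parse("2"); b <- parse("3")) yield a + b
res0: scala.util.Try[Int] = Success(5)

scala> for (a <- parse("2"); b <- parse("oops")) yield a + b
res1: scala.util.Try[Int] = Failure(java.lang.NumberFormatException: For input string: "oops")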

As a basic example of using Try, here we are retrieving the HTML source for Google's home page (the final output is truncated, of course):

 scala> import scala.util.Try; import scala.io.Source
import scala.util.Try
import scala.io.Source

scala> for (g <- Try{Source.fromURL("http://google.com")})
| yield g.getLines.foreach(println)
<!doctype html><html...

Failures can also be handled using a simple match on the result:

 scala> import scala.util.{Failure, Success}
import scala.util.{Failure, Success}
scala> Try { Source.fromURL("http://imaginary.domain") } match {
| case Success(result) => //...
| case Failure(error) => println("Failed: " + error)
| }
Failed: java.net.UnknownHostException: imaginary.domain

There are more capabilities available for working with Try, which are covered in the standard library documentation.
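For instance, recover substitutes a fallback value for particular exceptions, and getOrElse unwraps the result with a default. A brief REPL sketch:

scala> import scala.util.Try
import scala.util.Try

scala> Try(42 / 0).recover { case _: ArithmeticException => 0 }
res0: scala.util.Try[Int] = Success(0)

scala> Try(42 / 0).getOrElse(-1)
res1: Int = -1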

Let it crash

The Akka engineering team often uses the catch-phrase let it crash (they actually borrowed it from the Erlang community, where it's sometimes phrased as let it fail). The idea is that failures should be accepted and handled appropriately. But how? Understanding the answer to this question is, in many ways, fundamental to truly understanding how to design with actors — at least, if the intent is to have the system be resilient to failures.

The primary idea to keep in mind is to isolate dangerous or risky tasks from important state and data. This is easy to do with actors.

For instance, consider an actor that needs to maintain some local representation of a value, perhaps the current trading price of some commodity, but that must periodically request updates to that value from a remote service. Calling the remote service is risky. If the service provides a RESTful mechanism for retrieving the current value, the actor might have to deal with any number of possible error conditions: connection timeouts, HTTP failure responses, improper data encodings, expired access credentials, and so on.

The right way to approach this is to separate the remote request-making portion of the task into a new actor. Depending on the overall structure of the system, this actor might be instantiated and managed by the data-caching actor, or it might be handled by some other actor that has more general responsibilities for these kinds of remote requests (for example, it may also handle encoding and decoding of the data formats involved). Either way, the request-making actor is a child of some other actor, and the parent of that child is also its supervisor.
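Here's a minimal sketch of this arrangement; the message types and the PriceFetcher's fetching logic are hypothetical stand-ins for a real remote call:

import akka.actor.{Actor, ActorRef, Props}

case object RefreshPrice
case class PriceUpdated(price: BigDecimal)

// Caches the last known price and delegates the risky remote call
// to a child actor, which it also supervises.
class PriceCache extends Actor {
  val fetcher: ActorRef = context.actorOf(Props[PriceFetcher], "fetcher")

  var lastPrice: Option[BigDecimal] = None

  def receive = {
    case RefreshPrice => fetcher ! RefreshPrice
    case PriceUpdated(price) => lastPrice = Some(price)
  }
}

class PriceFetcher extends Actor {
  def receive = {
    case RefreshPrice =>
      // any exception thrown here is handled by the parent's
      // supervisor strategy
      sender ! PriceUpdated(fetchFromRemoteService())
  }

  // placeholder for the actual remote request
  def fetchFromRemoteService(): BigDecimal = BigDecimal(42.0)
}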

Fail fast

Another important idea is that of failing fast: when a failure occurs, it's usually best to make sure it is recognized and acted on immediately, and to let the actor fail rather than trying to limp along. Following the principle of keeping actors small and single-purpose (sometimes referred to as the single-responsibility principle), it makes sense not to include a huge amount of logic around handling failures, but rather to let the actor fail and then either restart or recreate it and try again. This is where the topic of supervision comes into play.

Supervision

Supervisors are a key concept to understand and master in Akka. A poor understanding of them will almost certainly lead to unexpected behavior in the actor system. This might result in expected data never appearing, requests being made to remote systems that shouldn't have occurred, or any number of other oddities.

Any actor in the system has, at the very least, a parent actor. At the topmost level sit the special guardian actors mentioned earlier in the book (more details on these shortly). In any non-trivial system, it's best to create one or more top-level actors, which will then likely create additional actors as children. These children may, in turn, create further actors of their own as needed. This tree of actors forms the actor hierarchy of the system.

Each actor acts as a supervisor for its children. What this means is that any unhandled errors that occur within the children are handled by the parent according to a special structure called a SupervisorStrategy. The SupervisorStrategy for a given actor is one of two types: either a OneForOneStrategy or an AllForOneStrategy. The difference between the two is what happens to the other child actors of the supervisor. With a OneForOneStrategy, only the failing actor has the response of the strategy applied to it. With an AllForOneStrategy, all of the failing actor's sibling actors are also affected. OneForOneStrategy is very often the best choice, unless the collection of children is so closely interdependent that a failure in one requires action on all of them.

The supervisor strategy's main purpose is to take the error that occurred and translate it into an appropriate course of action, which is one of the following:

  • Resume — the actor should simply resume its operations, keeping all internal state
  • Restart — the actor should be restarted, resetting any internal state
  • Stop — the actor should simply be stopped
  • Escalate — the error should be escalated to the parent of the supervisor

No single one of these can be applied across the board to every case, so we need to determine which applies at any given time. Note that Resume should only be used when it is certain that the code can continue without issue in its current internal state. Restarts are a common scenario, but they need to be handled carefully to avoid cascading failures, so these strategies also take two initial parameters: the number of times an actor is allowed to restart and the window of time in which that count applies. To be precise, if the maximum number of restarts is set to 5 within a 60 second window and the actor has already restarted 5 times in that window, the actor will simply be terminated when another failure occurs.

Let's look at an example of defining a simple strategy:

import akka.actor.Actor
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

case class ExpectedHiccup(m: String) extends Exception(m)
case class RemoteSystemFault(m: String) extends Exception(m)

class SimpleSupervisor extends Actor {
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 60 seconds) {
      case _: ExpectedHiccup => Resume
      case _: RemoteSystemFault => Restart
    }

  def receive = {
    case _ => // do nothing
  }
}

It's important to point out what's not in this strategy: quite a bit. There are certainly many other possible exceptions that might occur — it's impossible to say which they might be. In these unhandled cases, those errors are escalated to this actor's supervisor.
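If the children were closely interdependent, we could instead use an AllForOneStrategy, so that the chosen directive is applied to all of the failing actor's siblings as well. A minimal sketch, reusing the hypothetical RemoteSystemFault exception from the previous listing:

import akka.actor.{Actor, AllForOneStrategy}
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

class InterdependentSupervisor extends Actor {
  // when any child throws RemoteSystemFault, restart all of the children
  override val supervisorStrategy =
    AllForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 60 seconds) {
      case _: RemoteSystemFault => Restart
    }

  def receive = {
    case _ => // do nothing
  }
}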

Another important point is that, in the case where no strategy is defined, Akka will use the default strategy. This is actually defined as one of two system strategies: SupervisorStrategy.defaultStrategy and SupervisorStrategy.stoppingStrategy.

  • In the default strategy, the following cases are handled: an ActorInitializationException, which is thrown when an actor's initialization logic fails, or an ActorKilledException, which is thrown when an actor receives an akka.actor.Kill message. Both result in the actor being stopped; any other Exception instance will cause the actor to be restarted, and any other instance of Throwable will be escalated.
  • The SupervisorStrategy.stoppingStrategy will simply stop any failing child actor. Note that any grandchild actors below it will also be stopped, since stopping an actor stops its children recursively.

Both of these pre-defined strategies are of type OneForOneStrategy, with no limit specified for the maximum number of restarts and no time window defined. Given that, it's good to think about how failures will be handled when there is no explicitly defined supervisor strategy. This can easily result in a system spiraling out of control, given that a generic exception will cause the actor to be restarted. If the exception occurs every time the actor runs, it will spin in place with potentially disastrous results.
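If the stopping behavior is what we want for a particular parent, we can opt into it explicitly rather than writing a custom strategy. A minimal sketch, with a hypothetical parent actor:

import akka.actor.{Actor, SupervisorStrategy}

class StoppingParent extends Actor {
  // any failing child is simply stopped rather than restarted
  override val supervisorStrategy = SupervisorStrategy.stoppingStrategy

  def receive = {
    case _ => // create and use child actors here
  }
}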

The actor lifecycle

While this applies to more than just failure handling, it's worth briefly discussing the lifecycle of an actor in Akka. As we've already seen, actors typically begin life with a call to actorOf(Props[SomeActor]). Akka starts the actor when it is first created, and, as with any other object in Scala, initialization code can be placed within the constructor (the body of the class definition). We can also insert code to be run immediately before the actor is started, but after the object has been created, using the preStart hook. A common pattern is to have the actor send itself a message from preStart to kick off some process (for instance, scheduling work using the scheduler, described in Appendix B). Akka also provides the ability to perform cleanup, as necessary, using the postStop hook. It's important to note that by the time postStop runs, the actor's mailbox is no longer available.

import akka.actor.Actor

case object Initialize

class SelfInitializingActor extends Actor {
  override def preStart() {
    self ! Initialize
  }

  override def postStop() {
    // perform some cleanup or just log the state of things
  }

  def receive = {
    case Initialize =>
      // perform necessary initialization
  }
}

The most important hooks when it comes to handling failures within an actor, though, are those provided for handling additional tasks when restarts occur. The preRestart and postRestart methods are both passed the Throwable instance that caused the restart. In the preRestart case, Akka also passes the message (if any) from the actor's mailbox that caused the exception to occur. Note that the default postRestart implementation calls preStart, so if we override postRestart, we will need to call preStart directly (or call super.postRestart with the same parameters) if our code depends on that hook, particularly if we are relying on preStart for initialization.

You will typically use these restart hooks to handle cleanup chores in failure scenarios. A good example would be when there is some interdependent resource that needs to know when the actor is available:

import akka.actor.{ Actor, ActorRef }

case class Available(ref: ActorRef)
case class Unavailable(ref: ActorRef)

class CodependentActor(dep: ActorRef) extends Actor {
  override def preStart() {
    dep ! Available(self)
  }

  override def preRestart(reason: Throwable, message: Option[Any]) {
    dep ! Unavailable(self)
  }

  // this overridden implementation is not really needed, since the
  // default postRestart already calls preStart, but it shows the form
  override def postRestart(reason: Throwable) {
    preStart()
  }

  def receive = {
    case _ => // normal message handling
  }
}

The other key mechanism available as part of the whole actor lifecycle system is the so-called DeathWatch, which provides a means to be notified when a particular actor has been stopped permanently (that is, restarts don't count). In order to make use of this, an actor registers its interest in being notified by calling context.watch on a reference to the target actor. When that target actor is shut down, the DeathWatch sends a Terminated message, which includes the ActorRef for the deceased actor. It's also possible to receive multiple Terminated messages for a single actor. This mechanism is very useful when you need to have the failure of one actor trigger some other action, but be sure to use it carefully.

import akka.actor.{Actor, ActorRef, Terminated}

case class Register(ref: ActorRef)

class MorbidActor extends Actor {
  def receive = {
    case Register(ref) => context.watch(ref)
    case Terminated(ref) =>
      // the watched actor has been stopped; react accordingly
  }
}

Understanding the actor lifecycle is an important factor for designing robust actor systems. The dependencies between the components of an actor system should be built in such a way that the lifecycle of each individual component is considered as part of the overall picture.

A bit more about the hierarchy

It's worth talking a bit more about the actor hierarchy and what, in particular, sits above the top-most actors. There are three special actors known as guardians, which are internal to Akka. The one most often referenced is the user guardian. The user guardian is responsible for handling any errors that percolate up through the actor hierarchy and are not handled by any explicit supervisor lower in the tree. It normally applies the default strategy described above, but as of Akka 2.1 that can be changed by overriding the setting akka.actor.guardian-supervisor-strategy. To specify a different strategy, set this to the fully-qualified name of a class that implements akka.actor.SupervisorStrategyConfigurator. Since this is rarely needed, we will leave it as an exercise for the reader.

The other guardians to be aware of are the system guardian and the root guardian. The system guardian is responsible for certain system-level services that need to be available before any user-level actors are started and that need to still be running after all user-level actors have shut down. One instance of this is the logging actors, which reside under the system guardian. The startup and shutdown order of the guardians provides this guarantee: the startup order is root, followed by system, followed by user, and shutdown happens in the reverse order.

The root guardian resides at the very top of the actor hierarchy. Its primary purpose is to handle faults that escalate from the user or system guardians and a few other special root-level actors, with the default action being to stop those children. This guardian, being an actor, still needs a supervisor and it has one: a special pseudo-actor that will stop the child (root guardian) on any failure that reaches this level. Before the root guardian is stopped, the hierarchy is walked, recursively stopping all children, after which the isTerminated flag of the ActorSystem is set to true. The Akka team refers to this supervisor as "the one who walks the bubbles of space-time" — this is a reference to the recursive nature of the supervisor hierarchy and the fact that this special supervisor needs to exist outside of the "bubble of space-time" of the ActorSystem in order to avoid the issue of infinite recursion.

Guidelines for handling failures

Now that we've seen how Akka addresses exceptions and errors, we can cover some general principles that are good to follow. These guidelines won't fit every scenario, but they are a sensible default, and careful thought should be given before straying from them.

We've already discussed the let it crash philosophy. But this idea is important enough to reinforce: the system should be designed to allow for failure by isolating failure-prone operations in distinct actors. This allows for a lot of flexibility when dealing with and accommodating these failures. As we'll see when we cover routers, this approach also pairs well with pooled actors. When a single actor in the pool fails, we can still quickly retry the operation, using either another actor from the pool or possibly even the same actor after a restart.

One technique for isolating failures is to use what's known as the error kernel pattern. In this pattern, you typically create a new actor to handle some unit of work that has a potential failure case that should be guarded against, while allowing other work to continue. An example of this in action follows:

import akka.actor.{Actor, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy.{Escalate, Restart}
import scala.concurrent.duration._

case class Payload()
case class CompletedWork()
case class UnableToPerformWorkException() extends Exception()

class ErrorKernelExample extends Actor {

  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 15 seconds) {
      case _: UnableToPerformWorkException => Restart
      case _: Exception => Escalate
    }

  def receive = {
    case work: Payload =>
      // a fresh worker handles each unit of work; forward preserves
      // the original sender so the worker can reply directly
      context.actorOf(Props[Worker]) forward work
  }
}

class Worker extends Actor {
  def receive = {
    case work: Payload =>
      sender ! doSomeWork(work)
      context.stop(self)
  }

  def doSomeWork(work: Payload) = {
    // process the work and then return a success message
    CompletedWork()
  }
}

In this example, the ErrorKernelExample actor uses a custom SupervisorStrategy that will restart the child actor when it throws an UnableToPerformWorkException, for up to five occurrences within a 15 second interval. The actor itself waits for the Payload message, which indicates that some work needs to be performed, creates an instance of the Worker actor, and forwards the Payload message to it. By forwarding the message, the child actor is able to reply directly to the original sender who requested the work once it has completed successfully. Structuring the work this way allows the ErrorKernelExample to remain available for further work even when an error occurs while the work is being performed. However, in this case, we also want to give up eventually if the failure rate is too high, hence the limits given in the strategy.

The key idea with the error kernel pattern is that we localize the specific failure handling where it makes sense, near the failure site, while allowing critical failures that might require more broadly scoped action to escalate as necessary. A typical example is interaction with some external system, such as a database or web service. In normal operation you should expect some failures when interacting with these systems, and those failures should be isolated using the error kernel pattern or similar localized supervisors.

We should also consider dedicated supervisors in some cases, as an additional means of isolation. A typical approach is to have a single supervisor for each actor role in the system. This is really just another form of the error kernel pattern shown above, using a slightly different approach: instead of creating anonymous actors to isolate dangerous work, we create normal, non-anonymous actors and give them special parent actors whose sole purpose is to supervise their children. We'll see an example of this in the context of our earlier example from the second chapter.

First, we'll create a new actor to supervise our BookmarkStore actors. This is a very simplified example, but it should provide an idea of how we can approach this:

import java.util.UUID
import akka.actor.{Actor, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy.Restart
import akka.routing.RoundRobinRouter
import scala.concurrent.duration._

class BookmarkStoreGuardian(database: Database[Bookmark, UUID]) extends Actor {

  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 30 seconds) {
      case _: Exception => Restart
    }

  val bookmarkStore =
    context.actorOf(Props(classOf[BookmarkStore], database).
      withRouter(RoundRobinRouter(nrOfInstances = 10)))

  def receive = {
    case msg => bookmarkStore forward msg
  }
}

This actor simply forwards messages on to the BookmarkStore actors, which it places behind a RoundRobinRouter (we'll see more of routers in the next chapter). The overridden supervisorStrategy is very simple here. As with the example used in Chapter 2, it simply restarts the actors on any failure until it has exceeded 5 failures within 30 seconds. If those limits are exceeded, the failure is escalated (in this case, as we'll see below, up to the top of our ActorSystem, resulting in the system shutting down).

Here's the revised form of our Bookmarker application that initializes all of this.

import org.eclipse.jetty.server.Server
import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
import java.util.UUID
import akka.actor.{ActorSystem, Props}

object Bookmarker extends App {

  val system = ActorSystem("bookmarker")

  val database = Database.connect[Bookmark, UUID]("bookmarkDatabase")

  val bookmarkStoreGuardian =
    system.actorOf(Props(classOf[BookmarkStoreGuardian], database))

  val server = new Server(8080)
  val root = new ServletContextHandler(ServletContextHandler.SESSIONS)
  root.addServlet(new ServletHolder(new BookmarkServlet(system, bookmarkStoreGuardian)), "/")

  server.setHandler(root)
  server.start
  server.join
}

The only significant change here is the addition of the code that starts our BookmarkStoreGuardian. Take note of how we pass this guardian actor to our servlet instead of the previous routed pool of BookmarkStore actors. Since the guardian is simply forwarding messages down to the underlying actors, this works without having to change our servlet code at all.

Wrap-up

Akka gives us powerful techniques for fault handling, but it also requires designing with failure in mind. But that's something we should be doing anyway. In the next chapter, we'll begin looking at additional structures for handling the flow of messages and allocation of work in our system using routers and dispatchers.
