Getting started with Akka

Akka is a toolkit that simplifies writing concurrent and distributed applications, tasks that can be complex to achieve, as we described at the beginning of this chapter. As Akka is extensively documented, both in a number of books and in its online documentation, our goal here is mostly to experiment with the technology. We will see how to elegantly write Scala code to solve problems that might otherwise be error-prone and hard to understand if written in more conventional ways, such as explicit thread synchronization, or in other languages, such as Java. Akka is written in Scala but provides both Java and Scala APIs.

Understanding the Actor model

Akka relies on the Actor paradigm to create concurrent applications. The Actor model was introduced decades ago in the original paper by Carl Hewitt, Peter Bishop, and Richard Steiger entitled A Universal Modular Actor Formalism for Artificial Intelligence, 1973, IJCAI. Erlang is an example of a language that became famous for using this model of computation and has achieved very good scalability and reliability figures (the well-known nine nines of availability).

Without going into too much detail, we can say that the Actor model is based on message passing rather than method calls. Each unit of computation, called an actor, encapsulates its own behavior and communicates with other actors through asynchronous immutable messages. It is quite straightforward to reason about actor systems since they mimic the way humans communicate, exchanging immutable messages with each other. Since the footprint of actors is minimal compared to threads, and state is not shared, they are well suited to writing concurrent and distributed applications.

In the gold mine of Typesafe activator templates, a number of projects concerning Akka are available. Let's dig into a couple of them to better understand how to write programs using Akka actors. First, we can take a look at the hello-akka project to get an idea of how to run a simple actor.

If you haven't got the Typesafe activator in place, follow the instructions from Chapter 3, Understanding the Scala Ecosystem, to create the sample project associated with the hello-akka template. Once imported into Eclipse, we may start looking at the main class HelloAkkaScala.scala in the default package of the Scala src directory.

It starts with the following lines (skipping the imports):

case object Greet
case class WhoToGreet(who: String)
case class Greeting(message: String)

class Greeter extends Actor {
  var greeting = ""

  def receive = {
    case WhoToGreet(who) => greeting = s"hello, $who"
    case Greet           => sender ! Greeting(greeting) 
    // Send the current greeting back to the sender
  }
}

As you can see, defining an actor consists of extending the Actor trait and requires only implementing the abstract receive method. This method represents the actor's behavior when it receives a message. It does not need to handle all types of messages, which is why it is a partial function.
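To make the idea of a partial function concrete, here is a minimal sketch in plain Scala, with no Akka dependency. The message types mirror the ones above, while the names greeterLike, fallback, and total are made up for this illustration. In Akka, the Receive type is an alias for PartialFunction[Any, Unit]; the handler below returns a String instead, only so that the result is easy to inspect.

```scala
// Plain Scala, no Akka required: a receive-like handler is a PartialFunction.
case object Greet
case class WhoToGreet(who: String)

object PartialFunctionDemo {
  // Same shape as Akka's Receive, except that it returns a String
  // (instead of Unit) so that the outcome can be printed.
  val greeterLike: PartialFunction[Any, String] = {
    case WhoToGreet(who) => s"hello, $who"
    case Greet           => "greet requested"
  }

  // A hypothetical catch-all, chained after the main handler with orElse.
  val fallback: PartialFunction[Any, String] = {
    case other => s"unhandled: $other"
  }

  val total: PartialFunction[Any, String] = greeterLike orElse fallback

  def main(args: Array[String]): Unit = {
    println(greeterLike.isDefinedAt(WhoToGreet("akka"))) // true
    println(greeterLike.isDefinedAt(42))                 // false
    println(total(42))                                   // unhandled: 42
  }
}
```

An Akka actor does not need such a catch-all: a message that its receive method does not cover is passed to the actor's unhandled method rather than causing a failure.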

The declared mutable variable, greeting, shows that you can safely add some mutable state to your actor since the processing of the receive method is single threaded.

It is convenient to model the immutable messages sent between actors as case classes (or case objects when they carry no data), and the Greeter actor uses the two messages Greet and WhoToGreet(who: String). Whenever the Greeter actor receives a WhoToGreet(who) message, it simply updates its state but does not reply. In contrast, when this same actor receives a Greet message, it uses the saved state to answer the actor that sent the message. The ! method is also called tell (which, by the way, is the name used in the Akka Java API) and represents the sending of a message to an actor, written as actor ! message.

Also, note the presence of sender, which the Actor trait makes available, since it is a common pattern that an actor replies to the sender of a message. However, we could instead have added an ActorRef argument to the Greet message containing the address to reply to, that is, declared a case class Greet(someone: ActorRef) and implemented the processing of Greet as follows:

def receive = {
  ...
  case Greet(someone) => someone ! Greeting(greeting)
}
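The abbreviated variant above can be completed into a full actor. The following is only a sketch: it assumes the same Akka actor API as the template, and the class name AddressedGreeter is hypothetical, chosen to keep it apart from the template's Greeter.

```scala
import akka.actor.{Actor, ActorRef}

case class WhoToGreet(who: String)
case class Greeting(message: String)
// Variant of Greet that carries the address to reply to
case class Greet(someone: ActorRef)

// Hypothetical name, to distinguish it from the template's Greeter
class AddressedGreeter extends Actor {
  var greeting = ""

  def receive = {
    case WhoToGreet(who) => greeting = s"hello, $who"
    // Reply to the reference carried by the message, not to sender
    case Greet(someone)  => someone ! Greeting(greeting)
  }
}
```

A caller would then send Greet(replyTo) to the actor, where replyTo is any ActorRef that should receive the Greeting. The trade-off is a slightly larger message in exchange for a reply address that does not depend on who happened to send it.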

The HelloAkkaScala object defines the main routine, as shown in the following code snippet:

object HelloAkkaScala extends App {

  // Create the 'helloakka' actor system
  val system = ActorSystem("helloakka")

  // Create the 'greeter' actor
  val greeter = system.actorOf(Props[Greeter], "greeter")

  // Create an "actor-in-a-box"
  val inbox = Inbox.create(system)

  // Tell the 'greeter' to change its 'greeting' message
  greeter.tell(WhoToGreet("akka"), ActorRef.noSender)

  // Ask the 'greeter' for the latest 'greeting'
  // Reply should go to the "actor-in-a-box"
  inbox.send(greeter, Greet)

  // Wait 5 seconds for the reply with the 'greeting' message
  val Greeting(message1) = inbox.receive(5.seconds)
  println(s"Greeting: $message1")

  // Change the greeting and ask for it again
  greeter.tell(WhoToGreet("typesafe"), ActorRef.noSender)
  inbox.send(greeter, Greet)
  val Greeting(message2) = inbox.receive(5.seconds)
  println(s"Greeting: $message2")
  // Create the 'greetPrinter' actor
  val greetPrinter = system.actorOf(Props[GreetPrinter])
  // After zero seconds, send a Greet message every second to the greeter,
  // with greetPrinter as the sender
  system.scheduler.schedule(0.seconds, 1.second, greeter, Greet)(system.dispatcher, greetPrinter)
}

A system running actors needs a runtime environment; this is what the system variable declares. Creating an actor consists of invoking the system.actorOf method with a configuration argument as well as an optional name. This method gives you back an ActorRef (actor reference) object, which is the actor address, that is, where messages can be sent. An ActorRef object is an immutable and serializable handle to an actor, which may or may not reside on the local host or within the same ActorSystem object. As actors only communicate through messages in an asynchronous fashion, each actor has a mailbox where messages can be enqueued if the actor cannot handle them as quickly as they arrive.

The remaining part of the main routine essentially sends orders in the form of Greet or WhoToGreet messages to the Greeter actor. The Greet messages are sent from an Inbox object that also receives the answers. This Inbox object, also referred to as an "actor-in-a-box", is a convenient way to write code outside actors that communicates with actors. Finally, the scheduler of the actor system sends a Greet message to the Greeter actor every second, with the last actor, greetPrinter, set as the sender, so that greetPrinter is the one receiving each Greeting reply.
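The GreetPrinter class itself is not listed in this section. A minimal sketch that is consistent with the program output is given here; it is an assumption about its shape, and the template's actual source may differ.

```scala
import akka.actor.Actor

case class Greeting(message: String)

// Assumed shape of the GreetPrinter actor used by the main routine:
// it prints the text of every Greeting it receives.
class GreetPrinter extends Actor {
  def receive = {
    case Greeting(message) => println(message)
  }
}
```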

You can execute the example code within the project by running the ./activator run command and choosing the [2] HelloAkkaScala program. You should see output similar to the following:

Multiple main classes detected, select one to run:

 [1] HelloAkkaJava
 [2] HelloAkkaScala

Enter number: 2

[info] Running HelloAkkaScala 
Greeting: hello, akka
Greeting: hello, typesafe
hello, typesafe
hello, typesafe
hello, typesafe
… [press CTRL-C to interrupt]

Switching behavior

Actors have the ability to switch their behavior before handling the next message. To illustrate this, let's consider an example of a travel agent actor that needs to reserve both a seat in a flight and a hotel room for its customer. The travel agent is responsible for making sure the booking is transactional, that is, it is only successful if both transport and accommodation are booked, which is illustrated in the following figure:

[Figure: the travel agent booking a flight seat and a hotel room for its customer]

As it is a recognized best practice to declare the messages concerning an actor in its companion object, we will express a Flight actor in the following way:

package se.sfjd.ch8

import akka.actor.Actor
import akka.event.LoggingReceive

object Flight {
  case class BookSeat(number:Int) {
    require(number > 0)
  }
  case object Done
  case object Failed
}
class Flight extends Actor {
  import Flight._
  var seatsLeft = 50
  def receive = LoggingReceive {
    case BookSeat(nb) if nb <= seatsLeft =>
      seatsLeft -= nb
      sender ! Done
    case _ => sender ! Failed
  }
}

Notice the require assertion found in the BookSeat message declaration. This method is part of Predef, a global object that includes many useful functionalities imported by default. It supports design-by-contract style specifications: require checks a precondition and throws an IllegalArgumentException when the condition does not hold (Predef also offers ensuring for postconditions). The receive method of the Flight actor handles one type of message only, BookSeat(n: Int), which means reserving n seats as long as there are enough seats left for the flight. The Flight actor updates its state and replies with a Done message to the sender if there are enough seats left; it replies Failed otherwise.
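The effect of require can be tried out in plain Scala, without Akka. In the following sketch, the error text passed to require and the tryBooking helper are additions made for illustration only.

```scala
// Plain Scala, no Akka required.
case class BookSeat(number: Int) {
  // The second argument (an addition for this sketch) becomes part of the error message
  require(number > 0, "number of seats must be positive")
}

object RequireDemo {
  // Hypothetical helper: describes what happened when building the message.
  def tryBooking(n: Int): String =
    try {
      BookSeat(n)
      s"BookSeat($n) created"
    } catch {
      case e: IllegalArgumentException => s"rejected: ${e.getMessage}"
    }

  def main(args: Array[String]): Unit = {
    println(tryBooking(2)) // BookSeat(2) created
    println(tryBooking(0)) // rejected: requirement failed: number of seats must be positive
  }
}
```

Because the check runs in the constructor of the message, an invalid BookSeat can never reach the Flight actor; the exception is raised in the code that tries to build the message.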

Notice the LoggingReceive object that wraps the block handling the actor messages. It is part of the akka.event package and is a convenient way of logging the messages that reach this block. We will see later on, while executing the sample code, what these log entries look like.

In a similar manner, a Hotel actor that takes care of reserving a room for n persons can be written as follows:

object Hotel {
  case class BookRoom(number:Int) {
    require(number > 0)
  }
  case object Done
  case object Failed
}

class Hotel extends Actor {
  import Hotel._
  var roomsLeft = 15
  def receive = LoggingReceive {
    case BookRoom(nb) if nb <= roomsLeft =>
      roomsLeft -= nb
      sender ! Done
    case _ => sender ! Failed
  }
}

The travel agent actor is the one that is going to switch its behavior. Once it has sent orders to book plane seats and hotel rooms for a number of people, it will successively change state while expecting answers. Since the messages sent to both Flight and Hotel are asynchronous, that is, nonblocking, we do not know which answer will come back first. Furthermore, answers might not come back at all as there is no guarantee at this point that the messages have been delivered or correctly handled. The code for the TravelAgent actor is given as follows:

object TravelAgent {
  case class BookTrip(transport: ActorRef, accomodation: ActorRef, nbOfPersons: Int)
  case object Done
  case object Failed
}
class TravelAgent extends Actor {
  import TravelAgent._
  
  def receive = LoggingReceive {
    case BookTrip(flightAgent, hotelAgent, persons) =>
      flightAgent ! Flight.BookSeat(persons)
      hotelAgent ! Hotel.BookRoom(persons)
      context.become(awaitTransportOrAccomodation(flightAgent, hotelAgent,sender))
  }
  
  def awaitTransportOrAccomodation(transport: ActorRef, accomodation: ActorRef, customer:ActorRef): Receive = LoggingReceive {
    case Flight.Done =>
      context.become(awaitAccomodation(customer))
    case Hotel.Done =>
      context.become(awaitTransport(customer))
    case Flight.Failed | Hotel.Failed =>
      customer ! Failed
      context.stop(self) 
  }
  
  def awaitTransport(customer: ActorRef): Receive = LoggingReceive {
    case Flight.Done =>
      customer ! Done
      context.stop(self)
    case Flight.Failed => 
      customer ! Failed
      context.stop(self)
  }
  
  def awaitAccomodation(customer: ActorRef): Receive = LoggingReceive {
    case Hotel.Done =>
      customer ! Done
      context.stop(self)
    case Hotel.Failed =>
      customer ! Failed
      context.stop(self)
  }
}

The invocation context.become(<new behavior method>) switches the behavior of the actor. In the case of this simple travel agent, the behavior is first switched to one that expects the answers from the Flight and Hotel actors in either order. Once a successful answer is received from either the Flight or the Hotel actor, the TravelAgent actor switches its behavior again to expect only the remaining answer.
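The TravelAgent actor above waits indefinitely if an answer never arrives. One possible guard, which is not part of the chapter's code, is Akka's receive timeout: after context.setReceiveTimeout has been called, the actor is sent a ReceiveTimeout message when no message arrives within the given duration. The sketch below combines this with context.become; the Request and TimedOut messages and the ImpatientClient actor are hypothetical names used only here.

```scala
import akka.actor.{Actor, ActorRef, ReceiveTimeout}
import scala.concurrent.duration._

// Hypothetical messages for this sketch only
case class Request(target: ActorRef, payload: String)
case object Done
case object TimedOut

// Hypothetical actor: passes a payload on and waits a bounded time for Done
class ImpatientClient extends Actor {
  def receive = {
    case Request(target, payload) =>
      target ! payload
      // If no message arrives within 2 seconds, this actor receives ReceiveTimeout
      context.setReceiveTimeout(2.seconds)
      context.become(awaitReply(sender))
  }

  def awaitReply(customer: ActorRef): Receive = {
    case Done =>
      customer ! Done
      context.stop(self)
    case ReceiveTimeout =>
      customer ! TimedOut
      context.stop(self)
  }
}
```

The same pattern could be applied to each awaiting behavior of the travel agent, replying Failed to the customer on ReceiveTimeout.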

Now, we only need a main routine to create our initial actors and initiate communication with the TravelAgent actor, as exhibited in the following code:

package se.sfjd.ch8

import akka.actor.Actor
import akka.actor.Props
import akka.event.LoggingReceive

class BookingMain extends Actor {
  val flight = context.actorOf(Props[Flight], "Stockholm-Nassau")
  val hotel = context.actorOf(Props[Hotel], "Atlantis")
  val travelAgent = context.actorOf(Props[TravelAgent], "ClubMed")
  travelAgent ! TravelAgent.BookTrip(flight,hotel,10)
    
  def receive = LoggingReceive {
      case TravelAgent.Done =>
        println("Booking Successful")
        context.stop(self)
      case TravelAgent.Failed =>
        println("Booking Failed")
        context.stop(self)
  }
}

Once the four actor classes involved in the use case have been written in Eclipse, the program can be run through an Eclipse run configuration. Navigate to Run | Run Configurations… and create a new Java Application configuration, with the akka.Main class of the Akka runtime as the main class to run, as specified in the following screenshot:

[Screenshot: Eclipse run configuration with akka.Main as the main class]

The actual main routine we want to run is passed as an argument. To do that, edit the Arguments tab of the same window, as shown in the following screenshot:

[Screenshot: Arguments tab of the run configuration]

For the debug messages produced by the LoggingReceive object to be active, you need to add the VM arguments as specified in the previous screenshot. Clicking on the Run button will execute the BookingMain class within the Akka runtime environment and display the following flow of messages:

[Screenshot: console output showing the flow of messages]

If you want to test an alternative scenario, for example, to see the booking fail while reserving the hotel, just use a higher number of persons, such as 20 instead of 10, that is, travelAgent ! TravelAgent.BookTrip(flight,hotel,20). This exceeds the 15 rooms that the Hotel actor starts with.
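LoggingReceive only produces output when the log level is DEBUG and the akka.actor.debug.receive setting is switched on. When the program is not started through akka.Main with VM arguments, the same two settings can be supplied programmatically. The following is a sketch under that assumption; the object name DebugLoggingSetup and the createSystem helper are hypothetical.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}

object DebugLoggingSetup {
  // The two settings that LoggingReceive depends on
  val debugConfig: Config = ConfigFactory.parseString(
    """
      |akka.loglevel = "DEBUG"
      |akka.actor.debug.receive = on
    """.stripMargin)

  // Layer the overrides on top of the default configuration
  val merged: Config = debugConfig.withFallback(ConfigFactory.load())

  // Hypothetical helper: an actor system that logs received messages
  def createSystem(name: String): ActorSystem = ActorSystem(name, merged)

  def main(args: Array[String]): Unit = {
    println(merged.getString("akka.loglevel"))
    println(merged.getBoolean("akka.actor.debug.receive"))
  }
}
```

As system properties, the equivalent would be -Dakka.loglevel=DEBUG -Dakka.actor.debug.receive=on.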

Supervising actors to handle failure

In applications that run actors concurrently, exceptions may sometimes be thrown and cause an actor to die. As other actors are still running, such partial failures can be difficult to notice. In traditional architectures, where an object calls methods on other objects, the caller is the one receiving the exception. Since it usually blocks waiting for a response, it is also the one responsible for handling the failure. With actors, as all messages are handled asynchronously, without knowing how long it will take before an answer (if any) is received, the context of the sent message is usually no longer around to handle the failure, so it can be more difficult to react to an exception. In any case, something must be done about the failing actor for the application as a whole to function properly.

This is why Akka embraces the "let it crash" philosophy by providing support to monitor and, when needed, restart an actor or a group of dependent actors. As actors are normally created by other actors, they can be organized as hierarchies where an actor's parent is also its supervisor. Handling partial failure, therefore, consists of defining strategies that restart part of the actor hierarchy, depending on the situation.

If we go back to our small travel booking application, we can refactor the TravelAgent actor to be the supervisor of the Flight and Hotel booking actors. Therefore, we can declare the following supervisor strategy within the TravelAgent class:

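// Requires the following imports, and the ActorLogging trait mixed
// into TravelAgent to provide the log member:
//   import akka.actor.OneForOneStrategy
//   import akka.actor.SupervisorStrategy.{Restart, Stop}
override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) {
  case _: Flight.FlightBookingException =>
    log.warning("Flight Service Failed. Restarting")
    Restart
  case _: Hotel.HotelBookingException =>
    log.warning("Hotel Service Failed. Restarting")
    Restart
  case e =>
    // The {} placeholder is needed for the argument to appear in the log
    log.error("Unexpected failure: {}", e.getMessage)
    Stop
}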

The two possible strategies are OneForOneStrategy and AllForOneStrategy. In the first case, each child of the supervisor will be handled separately, whereas in the second case, all children of the given supervisor will be handled simultaneously.
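For comparison, here is a sketch of the second strategy. It is not part of the booking application: Worker, WorkerFailure, and PairSupervisor are hypothetical names, and the retry limits are arbitrary values chosen for the example.

```scala
import akka.actor.{Actor, AllForOneStrategy, Props}
import akka.actor.SupervisorStrategy.{Restart, Stop}
import scala.concurrent.duration._

// Hypothetical exception and worker, used only in this sketch
class WorkerFailure extends Exception("worker failed")

class Worker extends Actor {
  def receive = {
    case "fail" => throw new WorkerFailure
    case msg    => sender ! msg
  }
}

// Hypothetical supervisor: if one worker throws WorkerFailure,
// both workers are restarted together.
class PairSupervisor extends Actor {
  override val supervisorStrategy =
    AllForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute) {
      case _: WorkerFailure => Restart
      case _                => Stop
    }

  val left  = context.actorOf(Props[Worker], "left")
  val right = context.actorOf(Props[Worker], "right")

  def receive = {
    // forward keeps the original sender, so replies bypass the supervisor
    case msg => left forward msg
  }
}
```

AllForOneStrategy tends to fit children whose states depend on each other, whereas OneForOneStrategy, as used for the travel agent, fits independent services such as the flight and hotel bookings.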

The Flight companion object now contains an additional exception class that reflects the failure, as shown in the following code:

object Flight {
  case class BookSeat(number:Int) {
    require(number > 0)
  }
  case object Done
  case object Failed
  class FlightBookingException extends Exception("Unavailable Flight Booking Service")
}

To simulate the fact that booking the seats on a flight might fail at times, we can introduce a call to an unreliable method in the receive method of the Flight actor, as shown in the following code snippet:

import java.util.concurrent.ThreadLocalRandom

class Flight extends Actor {
  import Flight._
  var seatsLeft = 50
  def receive = LoggingReceive {
    case BookSeat(nb) if nb <= seatsLeft =>
      unreliable()
      seatsLeft -= nb
      sender ! Done
    case _ => sender ! Failed
  }

  private def unreliable(): Unit =
    // the service is only working 75 percent of the time
    if (ThreadLocalRandom.current().nextDouble() < 0.25)
      throw new FlightBookingException
}

Relaunching the booking scenario with the Run configuration will, at times, display the failure message (the failure happens only about 25 percent of the time), as shown in the following line:


[WARN] [01/24/2014 00:23:50.098] [Main-akka.actor.default-dispatcher-3] [akka://Main/user/app/ClubMed] Flight Service Failed. Restarting

For interested readers who want to elaborate more on the topic of supervision, there is a complete and consistent sample called akka-supervision that is part of the activator templates. It demonstrates the computation of arithmetic expressions, so that nodes that represent subparts of the total computation may fail and be restarted.

Testing actor systems

Because of their nondeterministic nature, concurrent systems require special care when testing, in contrast to traditional single-threaded architectures. Actor systems are no exception: as messages are sent and received asynchronously, there are multiple paths a program flow can follow. Fortunately, Akka provides a lot of support for testing in the akka-testkit module.

In Chapter 4, Testing Tools, we have already covered a number of examples involving the scalatest framework by looking at the test-patterns-scala activator template project. It contains a basic use case regarding the testing of Akka actors through the testkit module. You can reimport this template project into Eclipse or just open it if it is still in the IDE. The Test09.scala file exhibits the usage of a testing actor: mixing in the ImplicitSender trait makes the test kit's own actor the sender of the messages sent to the two actors under test, so that their replies come back to the test. The first actor under test is a simple echo actor, whereas the second calls a location service asynchronously that calculates the latitude and longitude of a given address. The syntax of the GeoActor class, given in the following test, should look familiar since it uses the dispatch library in the same way as we saw in Chapter 3, Understanding the Scala Ecosystem:

package scalatest

import akka.actor.ActorSystem
import akka.actor.Actor
import akka.actor.Props
import akka.testkit.TestKit
import org.scalatest.WordSpecLike
import org.scalatest.matchers.MustMatchers
import org.scalatest.BeforeAndAfterAll
import akka.testkit.ImplicitSender

//http://doc.akka.io/docs/akka/snapshot/scala/testing.html
object Setup {
  class EchoActor extends Actor {
    def receive = {
      case x => sender ! x
    }
  }

  case class Address(street: String,
                      city: String,
                      state: String,
                      zip: String)
//see https://developers.google.com/maps/documentation/geocoding/#Limits
class GeoActor extends Actor {
  def receive = {
    case Address(street,city,state,zip) => {
      import dispatch._, Defaults._
      val svc = url(s"http://maps.googleapis.com/maps/api/geocode/xml?address=${street},${city},${state},${zip}&sensor=true".replace(" ","+"))
      val response = Http(svc OK as.xml.Elem)
      val lat = (for {
        elem <- response() \\ "geometry" \ "location" \ "lat"
      } yield elem.text).head
      val lng = (for {
        elem <- response() \\ "geometry" \ "location" \ "lng"
      } yield elem.text).head
      sender ! s"${lat},${lng}"
    }
    case _ => sender ! "none"
  }
 }
}

In the main routine of the test case, we mix in the ImplicitSender trait and then invoke the expectMsg method:

class Test09(asys: ActorSystem) extends TestKit(asys) with ImplicitSender with WordSpecLike with MustMatchers with BeforeAndAfterAll {
  import Setup._
  def this() = this(ActorSystem("Setup"))

  override def afterAll {
    TestKit.shutdownActorSystem(system)
  }

  "An Echo actor" must {
    "return messages" in {
      val echo = system.actorOf(Props[EchoActor])
      echo ! "hello world"
      expectMsg("hello world")
    }
  }
  
  "Geo actor" must {
    "send back lat,lon" in {
      val geo = system.actorOf(Props[GeoActor])
      geo ! Address("27 South Park Avenue","San Francisco","CA","94107")
      expectMsg("37.7822991,-122.3930776")
    }
  }
}

The expectMsg() method plays the role of an assertion with a timeout, so that it does not wait forever for the reply to come back. The timeout can be passed as a duration parameter; when it is omitted, as in the preceding test, a default applies. If the specified time has passed without the expected answer being received, the assertion fails and so does the test.
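The variant of expectMsg that takes an explicit duration can be sketched outside of a ScalaTest class with a TestProbe from the akka-testkit module. The object name ExpectMsgDemo and the echoRoundTrip helper are hypothetical, and the timeout passed by the caller is an arbitrary choice.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.TestProbe
import scala.concurrent.duration._

class EchoActor extends Actor {
  def receive = {
    case x => sender ! x
  }
}

object ExpectMsgDemo {
  // Hypothetical helper: sends text to a fresh echo actor and
  // waits at most max for the echo to come back.
  def echoRoundTrip(system: ActorSystem, text: String, max: FiniteDuration): String = {
    val probe = TestProbe()(system)
    val echo  = system.actorOf(Props[EchoActor])
    // Use the probe as the sender so that the reply lands in the probe
    echo.tell(text, probe.ref)
    // Fails with an assertion error if no matching message arrives within max
    probe.expectMsg(max, text)
  }
}
```

A call such as ExpectMsgDemo.echoRoundTrip(system, "hello world", 2.seconds) returns the echoed message on success. A probe plays the same role as the implicit sender of the test above, but several probes can be used side by side in one test.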

Exploring further with Akka

In addition to the useful functionalities of actor messaging and supervision, Akka includes support for many other, more advanced features. Among them are the following:

  • Monitoring the lifecycle of actors through the DeathWatch API
  • Persisting actor state for recovery after failure
  • Remoting, that is, communicating with actors in a distributed environment in a transparent way
  • Clustering to handle failure in a distributed environment; a sample of the clustering features is also available as the akka-clustering activator template

These features are out of the scope of this book, but they are extensively documented on the Akka site and available at http://akka.io/docs/.
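To give a flavor of the first item in the list, here is a minimal DeathWatch sketch. The Child and Watcher actors and the string messages are hypothetical; only context.watch, PoisonPill, and the Terminated message come from Akka.

```scala
import akka.actor.{Actor, ActorRef, PoisonPill, Props, Terminated}

// Hypothetical actor that ignores every message but can be stopped
class Child extends Actor {
  def receive = {
    case _ => // nothing to do
  }
}

// Hypothetical watcher: tells listener when its child has stopped
class Watcher(listener: ActorRef) extends Actor {
  val child = context.actorOf(Props[Child], "child")
  // Register interest in the termination of the child
  context.watch(child)

  def receive = {
    case "stop-child" => child ! PoisonPill
    case Terminated(ref) if ref == child =>
      listener ! "child terminated"
  }
}
```

Unlike supervision, which only concerns a parent and its children, any actor can watch any other actor in this way.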
