Defining actors

An actor in Akka must extend the trait of the same name, Actor, and implement its receive method:

import akka.actor.Actor

class Cook extends Actor {
  override def receive = {
    case _ => // matches every message and does nothing
  }
}

The receive method returns a value of type Receive, which is defined as type Receive = PartialFunction[Any, Unit] and closely resembles the abstract definition of an actor in the actor model. In Akka, actors can receive any message, and everything an actor does manifests itself either as a change in its state or as a side effect.
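To see what the PartialFunction[Any, Unit] shape means in practice, here is a minimal sketch in plain Scala with no Akka dependency. The Receive alias is redefined locally to mirror Akka's, and Greeting and seen are hypothetical names used only for this illustration:

```scala
object ReceiveSketch {
  // Same shape as Akka's Receive alias, redefined here so the sketch runs without Akka.
  type Receive = PartialFunction[Any, Unit]

  // Hypothetical message type, used only for this illustration.
  final case class Greeting(text: String)

  // Stands in for a side effect: records every greeting the handler processed.
  val seen = scala.collection.mutable.ListBuffer.empty[String]

  // Accepts Any, but is defined only for Greeting; the result is discarded (Unit).
  val receive: Receive = {
    case Greeting(text) => seen += text
  }
}
```

Because the function is partial, ReceiveSketch.receive.isDefinedAt(ReceiveSketch.Greeting("hi")) is true, while isDefinedAt(42) is false. That difference is what lets a runtime tell handled messages from unhandled ones.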

Our defined actor accepts any message and does nothing. This is probably the simplest and laziest actor possible.

In order to make it useful, let's define its behavior and vocabulary.

As we're building an enterprise bakery, our actors will have a single responsibility, which is also a good idea in any kind of system, not just one that's actor-based. Our cook actor will take a ready-made dough and make raw cookies out of it. First, we must define the messages:

object Cook {
  final case class Dough(weight: Int)
  final case class RawCookies(number: Int)
}

And then the behavior of the actor:

class Cook extends Actor {
  import Cook._ // brings Dough and RawCookies into scope

  override def receive = {
    case Dough(weight) =>
      val numberOfCookies = makeCookies(weight)
      sender() ! RawCookies(numberOfCookies)
  }

  private val cookieWeight = 30
  private def makeCookies(weight: Int): Int = weight / cookieWeight
}

There are a couple of things going on in this definition:

  1. Our actor understands only one message type, Dough
  2. It makes raw cookies out of dough by calculating their number
  3. We're using sender() to obtain a reference to the actor from which the received message is originating and send the response to this reference

Also, some more subtle details are worth mentioning:

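  • If our Cook happens to receive any message other than Dough, that message won't be handled by the actor and will be lost. Akka reports such messages by publishing an UnhandledMessage event to the actor system's event stream, much like the dead letters mechanism it uses for messages that cannot be delivered.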
  • We defined case classes for each message type to make understanding the code less complicated.
  • The actor's logic is decoupled from the protocol and can be extracted into the companion object. This is done to make testing easier.
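As a rough illustration of the last point, the cookie calculation can live in a plain object and be checked without starting an actor system. This is only a sketch: CookLogic is a hypothetical name, and the constant and method simply mirror the private members of Cook shown above.

```scala
object CookLogic {
  // Weight of a single raw cookie, same value as in the Cook actor.
  val cookieWeight = 30

  // Integer division: any dough left over after the last whole cookie is dropped.
  def makeCookies(weight: Int): Int = weight / cookieWeight
}
```

With these values, CookLogic.makeCookies(1000) yields 33 cookies, and the remaining 10 units of dough are discarded by the integer division.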

Now that we have a definition of the actor, it is time to instantiate it and send some messages to it. In Akka, we have to use a special Props constructor:

val cookProps: Props = Props[Cook]

We don't need to pass any constructor parameters to the actor in this case, so we can use the form of Props that takes only the type of the actor as its type parameter.

Don't construct actors directly using a class constructor. Code such as new Cook compiles, and the instance even exposes an ActorRef through self, but creating an actor outside of actorOf results in an error at runtime.

Now, we can bring everything together and send out our first message:

import akka.actor.{ActorRef, ActorSystem, Props}
import Cook.Dough

object Bakery extends App {
  val bakery = ActorSystem("Bakery")
  val cook: ActorRef = bakery.actorOf(Props[Cook], "Cook")
  cook ! Dough(1000)
}

Here, we have created a named actor using an actor system and sent one message to it.

Let's define a couple of other actors to make our bakery lively. We will separate the responsibilities, as follows:

  • A boy will visit the grocery store and get the necessary ingredients.
  • He will hand them over to the bakery manager so that they can check the amount and quality.
  • The ingredients are then given to the chef, who prepares a dough out of them.
  • The chef uses one or more mixers depending on the volume of the dough.
  • The ready dough is given to the cook, who prepares the raw cookies.
  • The raw cookies are baked by the baker.
  • The baker uses the oven for baking. It might take a couple of rounds to bake all of the cookies because of the oven's limited size.
  • The cookies are returned to the manager as soon as they are ready.

Then, we need to figure out the relationships between actors, like so:

And then we need to build and show the message flow among them:

We'll build our actor hierarchy from the bottom up, starting from the oven:

import akka.actor.{Actor, Props}
import akka.event.LoggingReceive
import Cook.RawCookies
import Oven._

object Oven {
  final case object Extract
  final case class Cookies(count: Int)
  def props(size: Int) = Props(classOf[Oven], size)
}

class Oven(size: Int) extends Actor {
  private var cookiesInside = 0

  override def receive = LoggingReceive {
    // Anything that does not fit is sent straight back to the sender.
    case RawCookies(count) => insert(count).foreach(sender().!)
    case Extract => sender() ! Cookies(extract())
  }

  def insert(count: Int): Option[RawCookies] =
    if (cookiesInside > 0) {
      // The oven is in use: reject the whole batch.
      Some(RawCookies(count))
    } else {
      val tooMany = math.max(0, count - size)
      cookiesInside = math.min(size, count)
      Some(tooMany).filter(_ > 0).map(RawCookies)
    }

  def extract(): Int = {
    val cookies = cookiesInside
    cookiesInside = 0
    cookies
  }
}

We introduced a number of new features here.

First, we defined two new message types: Extract, which commands the oven to return its cookies, and Cookies, which carries the baked cookies. The actor itself takes a constructor parameter that specifies how many cookies fit inside it. We're also using Akka's LoggingReceive, which writes incoming messages to the log (at debug level, when akka.actor.debug.receive is enabled). In the receive method itself, we stick to the principle of separating Akka semantics from the business logic.

The insert method checks whether the oven is empty. If it is, it places as many raw cookies as fit into the oven and optionally returns those that don't fit, so that we can send them back to the sender. If the oven is already in use, the whole batch is returned.

The extract method resets the number of cookies inside the oven to zero and returns the previous count, which the receive block then sends to the sender.
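To make the two methods concrete, the following sketch replays the same logic in a plain class without Akka. OvenLogic is a hypothetical stand-in, and it uses Int instead of the RawCookies message so that it is self-contained:

```scala
// Hypothetical stand-in for the Oven's business logic, with Int in place of RawCookies.
final class OvenLogic(size: Int) {
  private var cookiesInside = 0

  // Returns the number of rejected cookies, or None if everything fit.
  def insert(count: Int): Option[Int] =
    if (cookiesInside > 0) Some(count) // oven in use: reject the whole batch
    else {
      val tooMany = math.max(0, count - size)
      cookiesInside = math.min(size, count)
      Some(tooMany).filter(_ > 0)
    }

  // Empties the oven and returns how many cookies were inside.
  def extract(): Int = {
    val cookies = cookiesInside
    cookiesInside = 0
    cookies
  }
}
```

For an oven of size 12, inserting 30 cookies returns Some(18), a second insert of 5 while the oven is full returns Some(5), and extract() then returns 12 and leaves the oven empty.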

Having a var inside an actor is absolutely safe and illustrates one of the core features of Akka: an actor processes its messages one by one, in the order in which they arrive in its mailbox. Even in a highly concurrent environment, Akka shields actor code from any concurrency-related matters.
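The idea can be imitated with the JDK alone. The sketch below is not how Akka is implemented; it only assumes that a single-threaded executor is a fair stand-in for a mailbox. MailboxSketch, CounterActor, tell, and countWith are hypothetical names:

```scala
import java.util.concurrent.{Callable, Executors}

object MailboxSketch {
  // Stand-in for an actor: its state is touched only by the single mailbox thread.
  final class CounterActor {
    private val mailbox = Executors.newSingleThreadExecutor()
    private var count = 0 // plain var, no locks or atomics

    // "Sends a message": the update is queued and applied later, one at a time.
    def tell(n: Int): Unit =
      mailbox.execute(new Runnable { def run(): Unit = count += n })

    // Reads the state through the mailbox as well, after all queued updates.
    def stopAndGet(): Int = {
      val result = mailbox.submit(new Callable[Int] { def call(): Int = count }).get()
      mailbox.shutdown()
      result
    }
  }

  // Starts several sender threads that all message the same actor concurrently.
  def countWith(senders: Int, messagesEach: Int): Int = {
    val actor = new CounterActor
    val threads = (1 to senders).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = (1 to messagesEach).foreach(_ => actor.tell(1))
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    actor.stopAndGet()
  }
}
```

Although many threads send messages at once, every update to count runs on the same thread, so no increment is lost.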

Always use deeply immutable messages. Using mutable structures will allow two different actors to access the same data from different threads, which can lead to concurrent modifications and corrupt data.

To instantiate an oven, we'll use another flavor of the Props constructor, which allows us to define constructor parameters:

val prop = Props(classOf[Oven], size)

By convention, it is placed into the companion object of an actor. The size of the oven is defined here as well.

Next, we'll describe the user of the oven, the Baker actor:

import akka.actor.{Actor, ActorRef, Cancellable, Props}
import scala.concurrent.duration._
import Cook.RawCookies
import Oven.{Cookies, Extract}

object Baker {
  private val defaultBakingTime = 2.seconds
  def props(oven: ActorRef) = Props(new Baker(oven))
}

class Baker(oven: ActorRef,
            bakingTime: FiniteDuration = Baker.defaultBakingTime)
    extends Actor {
  private var queue = 0
  private var timer: Option[Cancellable] = None

  override def receive: Receive = {
    case RawCookies(count) =>
      queue += count
      // Cookies rejected by the oven are only queued; a new batch starts when the oven is free.
      if (sender() != oven && timer.isEmpty) timer = sendToOven()
    case c: Cookies =>
      context.actorSelection("../Manager") ! c
      // The timer has already fired (it triggered Extract), so it can be replaced safely.
      if (queue > 0) timer = sendToOven() else timer = None
  }

  private def sendToOven() = {
    oven ! RawCookies(queue)
    queue = 0
    import context.dispatcher
    Option(context.system.scheduler.scheduleOnce(bakingTime, oven, Extract))
  }

  override def postStop(): Unit = {
    timer.foreach(_.cancel())
    super.postStop()
  }
}

Let's take a closer look at what is going on here. First, we need to use yet another kind of Props constructor, because the class-based form we used for the Oven does not support constructors with default parameters.

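The Props form that takes an actor instance is a very powerful construct: it allows you to create anonymous actors, which in turn can close over the internal state of another actor. Try to avoid using it inside an actor, if possible. Defining it in the companion object, as we did here, keeps the state of any enclosing actor out of reach.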

The Baker actor receives an ActorRef of the Oven as a parameter. This reference is used by the baker to send cookies to the Oven and extract them.

After receiving the baked cookies from the Oven, the Baker looks up the Manager actor and sends the Cookies to it. After that, it puts another batch of raw cookies into the Oven, if needed. We will discuss the details of context.actorSelection later in this chapter.

The Baker maintains an internal queue of raw cookies and periodically puts them into the oven. This is an old oven, and to use it, we need to set up a kitchen timer in order to extract the baked cookies at the proper time. Finally, we include a postStop life cycle hook that cancels the timer if our actor stops. We have done this because, if the actor is no longer there, there will be no one around to listen for the timer signal.
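The interplay between the Baker's queue and the limited oven size can be approximated with a small pure function. This is a simplified sketch that ignores timing and message passing and assumes that no new raw cookies arrive while baking is in progress. BakingRounds and batches are hypothetical names:

```scala
object BakingRounds {
  // Returns the size of each batch that ends up in the oven, in baking order.
  def batches(rawCookies: Int, ovenSize: Int): List[Int] = {
    require(ovenSize > 0, "ovenSize must be positive")

    @annotation.tailrec
    def loop(queue: Int, acc: List[Int]): List[Int] =
      if (queue <= 0) acc.reverse
      else {
        // The oven keeps what fits; the rest goes back into the baker's queue.
        val inOven = math.min(ovenSize, queue)
        loop(queue - inOven, inOven :: acc)
      }

    loop(rawCookies, Nil)
  }
}
```

For 30 raw cookies and an oven of size 12, BakingRounds.batches(30, 12) gives List(12, 12, 6), that is, three rounds of baking.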
