The ask pattern

With the Chef, we will introduce another popular pattern in Akka—the ask pattern:

class Chef extends Actor with ActorLogging with Stash {
import scala.concurrent.duration._
private implicit val timeout = Timeout(5 seconds)

override def receive = {
case Groceries(eggs, flour, sugar, chocolate) =>
for (i <- 1 to eggs) {
val mixer = context.watch(context.actorOf(Mixer.props, s"Mixer_$i"))
val message = Groceries(1, flour / eggs, sugar / eggs, chocolate / eggs)
import akka.pattern.ask
val job = (mixer ? message).mapTo[Dough]
import context.dispatcher
import akka.pattern.pipe
job.pipeTo(sender())
}
log.info("Sent jobs to {} mixers", eggs)
context.become(waitingForResults, discardOld = false)
}
def waitingForResults: Receive = {
case g: Groceries => stash()
case Terminated(child) =>
if (context.children.isEmpty) {
unstashAll()
context.unbecome()
log.info("Ready to accept new mixing jobs")
}
}
}

There are lots of things happening here, so let's go over the code line by line and describe what's going on.

сlass Chef extends Actor with ActorLogging with Stash

Our Chef actor is not only an Actor—but it also extends ActorLogging and Stash. The ActorLogging trait gives an actor a predefined logger.  It is also possible to define the Logger directly, for example, as shown in the following code: 

val log = akka.event.Logging(this)

Akka uses a special message-based logging facility internally to minimize blocking inside of an actor. 

Akka logging supports SLF4J as a backend. The official documentation (https://doc.akka.io/docs/akka/2.5/logging.html) has a detailed explanation on how to extend the configuration to enable SLF4J logging into an Akka application:

  import scala.concurrent.duration._
private implicit val timeout = Timeout(5 seconds)

Here, we have defined a timeout of 5 seconds, which will be necessary the moment we start working with mixers:

override def receive = {
case Groceries(eggs, flour, sugar, chocolate) =>

In the receive method, our actor only accepts Groceries messages and uses pattern matching to extract field values:

for (i <- 1 to eggs) {
val message = Groceries(1, flour / eggs, sugar / eggs, chocolate / eggs)

Our mixers are small, so we need to split the groceries at hand into portions of one egg so that the portion fits into the mixer:

val mixer = context.watch(context.actorOf(Mixer.props, s"Mixer_$i"))

Here, we created a Mixer actor using the props defined earlier (which in turn assigns the proper dispatcher to it) and named it appropriately.

In the following two lines of code, we can see implicit ask magic at work:

import akka.pattern.ask
val job: Future[Dough] = mixer ? message

Having to ask in scope allows us implicitly to convert an ActorRef into AskableActorRef, which is then used as a target for the message. The actor ? message syntax represents the ask pattern. Akka sends a message to the target actor and creates an expectation of the response as a Future[Any]. This Future can be worked with like any other Future. For convenience, Akka provides a mapTo[T] method, which allows you to convert it into the Future[T].

The final piece of code in the for comprehension uses another implicit conversion provided by Akka, this time acting on the Future:

import akka.pattern.pipe
import context.dispatcher
job.pipeTo(sender())

Here, we're bringing in scope a pipe which transforms the normal Future into the PipeableFuture. The latter can be piped into one or multiple actors, as shown in the third line of the preceding code, by using an implicit execution context that was imported in the second line.

The third line of code pipes the result of the Future execution to the sender in the case of it being a success.

We could use job.recoverWith to resend the job to the mixer if the first attempt fails. This is a simple way to implement "at least once" semantics using the ask pattern.

Having created all of the mixers and sent them work packages, the Chef actor writes a log entry and starts to wait for the results:

log.info("Sent jobs to {} mixers", eggs)
context.become(waitingForResults, discardOld = false)

There is a special syntax in Akka logging. The first argument is a String that incorporates {} placeholders to denote other arguments. The substitution is done in a separate thread, but only if the respective log level is enabled. This is done to minimize the logging work done by the actor's thread.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.145.12.156