Life-cycle hooks

Akka lets us specify code that runs in response to specific events in an actor's life, through life-cycle hooks. Akka defines the following hooks:

  • preStart(): This runs after the actor's constructor has finished but before it starts processing messages. This is useful to run initialization code that depends on the actor being fully constructed.
  • postStop(): This runs when the actor dies after it has stopped processing messages. This is useful to run cleanup code before terminating the actor.
  • preRestart(reason: Throwable, message: Option[Any]): This is called just after an actor receives an order to restart. The preRestart method has access to the exception that was thrown and to the offending message, allowing for corrective action. The default behavior of preRestart is to stop each child and then call postStop.
  • postRestart(reason:Throwable): This is called after an actor has restarted. The default behavior is to call preStart().

Let's use system hooks to persist the state of FetcherManager between runs of the programs. You can find the code examples for this section in the chap09/ghub_crawler_fault_tolerant directory in the sample code provided with this book (https://github.com/pbugnion/s4ds). This will make the fetcher manager fault-tolerant. We will use postStop to write the current queue and set of visited users to text files and preStart to read these text files from the disk. Let's start by importing the libraries necessary to read and write files:

// FetcherManager.scala

import scala.io.Source 
import scala.util._
import java.io._

We will store the names of the two text files in which we persist the state in the FetcherManager companion object (a better approach would be to store them in a configuration file):

// FetcherManager.scala
object FetcherManager {
  ...
  val fetchedUsersFileName = "fetched-users.txt"
  val fetchQueueFileName = "fetch-queue.txt"
}

In the preStart method, we load both the set of fetched users and the backlog of users to fetch from the text files, and in the postStop method, we overwrite these files with the new values of these data structures:

class FetcherManager(
  val token:Option[String], val nFetchers:Int
) extends Actor with ActorLogging {

  ...

  /** pre-start method: load saved state from text files */
  override def preStart {
    log.info("Running pre-start on fetcher manager")

    loadFetchedUsers
    log.info(
      s"Read ${fetchedUsers.size} visited users from source"
    )

    loadFetchQueue
    log.info(
      s"Read ${fetchQueue.size} users in queue from source"
    )

    // If the saved state contains a non-empty queue, 
    // alert the fetchers so they can start working.
    if (fetchQueue.nonEmpty) {
      context.become(receiveWhileNotEmpty)
      fetchers.foreach { _ ! Fetcher.WorkAvailable }
    }

  }

  /** Dump the current state of the manager */
  override def postStop {
    log.info("Running post-stop on fetcher manager")
    saveFetchedUsers
    saveFetchQueue
  }

     /* Helper methods to load from and write to files */
  def loadFetchedUsers {
    val fetchedUsersSource = Try { 
      Source.fromFile(fetchedUsersFileName) 
    }
    fetchedUsersSource.foreach { s =>
      try s.getLines.foreach { l => fetchedUsers += l }
      finally s.close
    }
  }

  def loadFetchQueue {
    val fetchQueueSource = Try { 
      Source.fromFile(fetchQueueFileName) 
    }
    fetchQueueSource.foreach { s =>
      try s.getLines.foreach { l => fetchQueue += l }
      finally s.close
    }
  }

  def saveFetchedUsers {
    val fetchedUsersFile = new File(fetchedUsersFileName)
    val writer = new BufferedWriter(
      new FileWriter(fetchedUsersFile))
    fetchedUsers.foreach { user => writer.write(user + "
") }
    writer.close()
  }

  def saveFetchQueue {
    val queueUsersFile = new File(fetchQueueFileName)
    val writer = new BufferedWriter(
      new FileWriter(queueUsersFile))
    fetchQueue.foreach { user => writer.write(user + "
") }
    writer.close()
  }

...
}

Now that we save the state of the crawler when it shuts down, we can put a better termination condition for the program than simply interrupting the program once we get bored. In production, we might halt the crawler when we have enough names in a database, for instance. In this example, we will simply let the crawler run for 30 seconds and then shut it down.

Let's modify the main method:

// FetchNetwork.scala
import akka.actor._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FetchNetwork extends App {

  // Get token if exists
  val token = sys.env.get("GHTOKEN")

  val system = ActorSystem("GithubFetcher")
  val manager = system.actorOf(FetcherManager.props(token, 2))

  manager ! FetcherManager.AddToQueue("odersky")

  system.scheduler.scheduleOnce(30.seconds) { system.shutdown }

}

After 30 seconds, we just call system.shutdown, which stops all the actors recursively. This will stop the fetcher manager, calling the postStop life cycle hook. After one run of the program, I have 2,164 names in the fetched-users.txt file. Running it again increases this number to 3,728 users.

We could improve fault tolerance further by making the fetcher manager dump the data structures at regular intervals while the code runs. As writing to the disk (or to a database) carries a certain element of risk (What if the database server goes down or the disk is full?) it would be better to delegate writing the data structures to a custom actor rather than endangering the manager.

Our crawler has one minor problem: when the fetcher manager stops, it stops the fetcher actors, response interpreter, and follower extractor. However, none of the users currently going through these actors are stored. This also results in a small number of undelivered messages at the end of the code: if the response interpreter stops before a fetcher, the fetcher will try to deliver to a non-existent actor. This only accounts for a small number of users. To recover these login names, we can create a reaper actor whose job is to coordinate the killing of all the worker actors in the correct order and harvest their internal state. This pattern is documented in a blog post by Derek Wyatt (http://letitcrash.com/post/30165507578/shutdown-patterns-in-akka-2).

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.118.24.30