Fetcher actors

The workhorse of our application is the fetcher, the actor responsible for fetching follower details from GitHub. In the first instance, our actor will accept a single message, Fetch(user). It will fetch the followers corresponding to user and log the response to the screen. We will use the recipes developed in Chapter 7, Web APIs, to query the GitHub API with an OAuth token. We will inject the token through the actor constructor.

Let's start with the companion object. This will contain the definition of the Fetch(user) message and two factory methods to create the Props instances. You can find the code examples for this section in the chap09/fetchers_alone directory in the sample code provided with this book (https://github.com/pbugnion/s4ds):

// Fetcher.scala
import akka.actor._
import scalaj.http._
import scala.concurrent.Future

object Fetcher {
  // message definitions
  case class Fetch(login:String)

  // Props factory definitions
  def props(token:Option[String]):Props = 
    Props(classOf[Fetcher], token)
  def props():Props = Props(classOf[Fetcher], None)
}

Let's now define the fetcher itself. We will wrap the call to the GitHub API in a future. This avoids a single slow request blocking the actor. When our actor receives a Fetch request, it wraps this request into a future, sends it off, and can then process the next message. Let's go ahead and implement our actor:

// Fetcher.scala
class Fetcher(val token:Option[String])
extends Actor with ActorLogging {
  import Fetcher._ // import message definition

  // We will need an execution context for the future.
  // Recall that the dispatcher doubles up as execution
  // context.
  import context.dispatcher

  def receive = {
    case Fetch(login) => fetchUrl(login)
  }

  private def fetchUrl(login:String): Unit = {
    val unauthorizedRequest = Http(
      s"https://api.github.com/users/$login/followers")
    val authorizedRequest = token.map { t =>
      unauthorizedRequest.header("Authorization", s"token $t")
    }

    // Prepare the request: try to use the authorized request
    // if a token was given, and fall back on an unauthorized 
    // request
    val request = authorizedRequest.getOrElse(unauthorizedRequest)

    // Fetch from github
    val response = Future { request.asString }
    response.onComplete { r =>
      log.info(s"Response from $login: $r")
    }
  }

}
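
The onComplete callback above receives a scala.util.Try, which is why the log line prints either Success(...) or Failure(...). As a minimal sketch of treating the two cases separately, the following uses only the standard library. The names OutcomeDemo and describeOutcome are hypothetical, and plain futures stand in for the HTTP request:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object OutcomeDemo {
  // Hypothetical helper: turn the Try delivered to onComplete
  // into a log line, depending on whether the request succeeded.
  def describeOutcome(login: String, outcome: Try[String]): String =
    outcome match {
      case Success(body) => s"Response from $login: $body"
      case Failure(error) =>
        s"Request for $login failed: ${error.getMessage}"
    }

  def main(args: Array[String]): Unit = {
    // Stand-ins for the HTTP call: one future succeeds, one fails.
    val ok = Future { "[]" }
    val failed = Future[String] {
      throw new RuntimeException("connection refused")
    }
    for ((login, future) <- Seq("odersky" -> ok, "rkuhn" -> failed)) {
      // Block until the future completes, then inspect its Try.
      val outcome = Await.ready(future, 5.seconds).value.get
      println(describeOutcome(login, outcome))
    }
  }
}
```

Inside the actor, the same pattern match could replace the single log.info call, for instance to log failures at a different level.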

Let's instantiate an actor system and four fetchers to check that our actor works as expected. We will read the GitHub token from the environment, as described in Chapter 7, Web APIs, then create four actors and ask each one to fetch the followers of a particular GitHub user. We wait five seconds for the requests to complete, and then shut the system down:

// FetcherDemo.scala
import akka.actor._

object FetcherDemo extends App {
  import Fetcher._ // Import the messages

  val system = ActorSystem("fetchers")

  // Read the github token if present.
  val token = sys.env.get("GHTOKEN")

  val fetchers = (0 until 4).map { i =>
    system.actorOf(Fetcher.props(token))
  }

  fetchers(0) ! Fetch("odersky")
  fetchers(1) ! Fetch("derekwyatt")
  fetchers(2) ! Fetch("rkuhn")
  fetchers(3) ! Fetch("tototoshi")

  Thread.sleep(5000) // Wait for API calls to finish
  system.shutdown // Shut system down

}

Let's run the code through SBT:

$ GHTOKEN="2502761..." sbt run
[INFO] [11/08/2015 16:28:06.500] [fetchers-akka.actor.default-dispatcher-2] [akka://fetchers/user/$d] Response from tototoshi: Success(HttpResponse([{"login":"akr4","id":10892,"avatar_url":"https://avatars.githubusercontent.com/u/10892?v=3","gravatar_id":""...

Notice how we explicitly need to shut the actor system down using system.shutdown. The program hangs until the system is shut down. However, shutting down the system will stop all the actors, so we need to make sure that they have finished working. We do this by inserting a call to Thread.sleep.

Using Thread.sleep to wait for the API calls to finish before shutting down the actor system is a little crude. A better approach is to let the actors signal back to the system that they have completed their task. We will see examples of this pattern later when we implement the fetcher manager actor.
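
As a rough preview of that idea, here is a minimal sketch in which workers report completion to a separate actor that shuts the system down once every worker has finished. The Done message and the Worker, Reaper, and CompletionDemo names are hypothetical, and the worker only pretends to fetch; this is not the fetcher manager developed later:

```scala
import akka.actor._

// Hypothetical message: a worker reports that its task is complete.
case object Done

// Hypothetical stand-in for the fetcher. It reports back to `reaper`
// once it has handled its one task.
class Worker(reaper: ActorRef) extends Actor with ActorLogging {
  def receive = {
    case login: String =>
      log.info(s"Pretending to fetch followers of $login")
      reaper ! Done
  }
}

// Counts Done messages and shuts the system down after the last one.
class Reaper(expected: Int) extends Actor {
  private var remaining = expected

  def receive = {
    case Done =>
      remaining -= 1
      // system.shutdown matches the Akka version used in this chapter;
      // newer releases use system.terminate() instead.
      if (remaining == 0) context.system.shutdown()
  }
}

object CompletionDemo {
  // Starts one worker per login and returns the system so that the
  // caller can wait for it to terminate.
  def run(logins: Seq[String]): ActorSystem = {
    val system = ActorSystem("completion")
    val reaper = system.actorOf(Props(new Reaper(logins.size)), "reaper")
    logins.foreach { login =>
      system.actorOf(Props(new Worker(reaper))) ! login
    }
    system
  }

  def main(args: Array[String]): Unit =
    run(Seq("odersky", "derekwyatt", "rkuhn", "tototoshi"))
}
```

In the real fetcher, the Done message would need to be sent from the future's completion callback. That callback runs outside the actor's message loop, so it should only touch stable references such as a constructor argument.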

Akka includes a feature-rich scheduler to schedule events. We can use the scheduler to replace the call to Thread.sleep by scheduling a system shutdown five seconds in the future. This is preferable as the scheduler does not block the calling thread, unlike Thread.sleep. To use the scheduler, we need to import a global execution context and the duration module:

// FetcherDemoWithScheduler.scala

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

We can then schedule a system shutdown by replacing our call to Thread.sleep with the following:

system.scheduler.scheduleOnce(5.seconds) { system.shutdown }

Besides scheduleOnce, the scheduler also exposes a schedule method that lets you schedule events to happen regularly (every two seconds, for instance). This is useful for heartbeat checks or monitoring systems. For more information, read the API documentation on the scheduler available at http://doc.akka.io/docs/akka/snapshot/scala/scheduler.html.
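
A periodic heartbeat might look like the following minimal sketch. The HeartbeatDemo object and the printed message are hypothetical. The schedule method shown here belongs to the Akka version used in this chapter; later Akka releases deprecate it in favor of scheduleWithFixedDelay and scheduleAtFixedRate:

```scala
import akka.actor._
import scala.concurrent.duration._

object HeartbeatDemo extends App {
  val system = ActorSystem("heartbeat")

  // The system dispatcher doubles up as the execution context
  // on which the scheduled blocks run.
  import system.dispatcher

  // Run the block every two seconds, starting immediately.
  val heartbeat: Cancellable =
    system.scheduler.schedule(0.seconds, 2.seconds) {
      println("still alive")
    }

  // Stop the heartbeat and shut the system down after seven seconds.
  system.scheduler.scheduleOnce(7.seconds) {
    heartbeat.cancel()
    system.shutdown()
  }
}
```

The Cancellable returned by schedule is the handle for stopping the recurring task; without the call to cancel, the heartbeat would keep firing until the system itself is shut down.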

Note that we are cheating a little here by not fetching every follower. The response to the followers query is paginated, so we would need to fetch several pages to retrieve all the followers. Adding logic to the actor to do this is not terribly complicated. We will ignore this for now and assume that users are capped at 100 followers each.
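
To give an idea of what the pagination logic involves: the GitHub API advertises the URL of the next page in the Link header of each response. The following is a minimal sketch of extracting that URL from the header value. The LinkHeader object and nextPage function are hypothetical names, and the sketch assumes the header value has already been read from the response:

```scala
object LinkHeader {
  // Matches one entry of the header, such as:
  //   <https://api.github.com/...?page=2>; rel="next"
  // Group 1 is the URL and group 2 is the relation name.
  private val Entry = """<([^>]+)>;\s*rel="([^"]+)""".r

  // Returns the URL marked rel="next", or None on the last page.
  def nextPage(header: String): Option[String] =
    Entry.findAllMatchIn(header).collectFirst {
      case m if m.group(2) == "next" => m.group(1)
    }
}
```

An actor could keep requesting the URL returned by nextPage until it gets None, accumulating the followers from each page.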
