Follower network crawler

We are now ready to code up the remaining pieces of our network crawler. The largest missing piece is the fetcher manager. Let's start with the companion object. As with the worker actors, this just contains the definitions of the messages that the actor can receive and a factory to create the Props instance:

// FetcherManager.scala
import scala.collection.mutable
import akka.actor._

object FetcherManager {
  case class AddToQueue(login:String)
  case object GiveMeWork

  def props(token:Option[String], nFetchers:Int) = 
    Props(classOf[FetcherManager], token, nFetchers)
}

The manager can receive two messages: AddToQueue, which tells it to add a username to the queue of users whose followers need to be fetched, and GiveMeWork, emitted by the fetchers when they are unemployed.

The manager will be responsible for launching the fetchers, response interpreter, and follower extractor, as well as maintaining an internal queue of usernames and a set of usernames that we have seen:

// FetcherManager.scala

class FetcherManager(val token:Option[String], val nFetchers:Int) 
extends Actor with ActorLogging {

  import FetcherManager._

  // queue of usernames whose followers we need to fetch
  val fetchQueue = mutable.Queue.empty[String]

  // set of users we have already seen (queued or fetched).
  val fetchedUsers = mutable.Set.empty[String]

  // Instantiate worker actors
  val followerExtractor = context.actorOf(
    FollowerExtractor.props(self))
  val responseInterpreter = context.actorOf(
    ResponseInterpreter.props(followerExtractor))
  val fetchers = (0 until nFetchers).map { i =>
    context.actorOf(
      Fetcher.props(token, self, responseInterpreter))
  }

  // receive method when the actor has work:
  // If we receive additional work, we just push it onto the
  // queue.
  // If we receive a request for work from a Fetcher,
  // we pop an item off the queue. If that leaves the 
  // queue empty, we transition to the 'receiveWhileEmpty'
  // method.
  def receiveWhileNotEmpty:Receive = {
    case AddToQueue(login) => queueIfNotFetched(login)
    case GiveMeWork =>
      val login = fetchQueue.dequeue
      // send a Fetch message back to the sender.
      // we can use the `sender` method to reply to a message
      sender ! Fetcher.Fetch(login)
      if (fetchQueue.isEmpty) { 
        context.become(receiveWhileEmpty) 
      }
  }

  // receive method when the actor has no work:
  // if we receive work, we add it onto the queue, transition
  // to a state where we have work, and notify the fetchers
  // that work is available.
  def receiveWhileEmpty:Receive = {
    case AddToQueue(login) =>
      queueIfNotFetched(login)
      context.become(receiveWhileNotEmpty)
      fetchers.foreach { _ ! Fetcher.WorkAvailable }
    case GiveMeWork => // do nothing
  }

  // Start with an empty queue.
  def receive = receiveWhileEmpty

  def queueIfNotFetched(login:String):Unit = {
    if (! fetchedUsers(login)) {
      log.info(s"Pushing $login onto queue") 
      // or do something useful...
      fetchQueue += login
      fetchedUsers += login
    }
  }
}
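
To look at the manager's bookkeeping in isolation, we can strip away the actor machinery. The following is only a sketch, not part of the crawler: WorkQueueModel and its method names are hypothetical, and it assumes that the queue and the set of seen usernames are the only state that matters. Adding a login mirrors AddToQueue, and asking for work mirrors GiveMeWork:

```scala
import scala.collection.mutable

// Hypothetical, Akka-free model of the manager's internal state.
// It only illustrates the queue and de-duplication logic.
class WorkQueueModel {

  // usernames whose followers still need to be fetched
  private val fetchQueue = mutable.Queue.empty[String]

  // usernames that have already been queued at some point
  private val seenUsers = mutable.Set.empty[String]

  // Mirrors AddToQueue: returns true if the login was new
  // and has been pushed onto the queue, false otherwise.
  def addToQueue(login: String): Boolean = {
    // Set.add returns true only if the element was not yet present
    val isNew = seenUsers.add(login)
    if (isNew) { fetchQueue += login }
    isNew
  }

  // Mirrors GiveMeWork: Some(login) if there is work, None if
  // the queue is empty (the actor simply ignores the request).
  def giveMeWork(): Option[String] =
    if (fetchQueue.isEmpty) None else Some(fetchQueue.dequeue())

  // True when the actor would be in its 'receiveWhileEmpty' state.
  def isEmpty: Boolean = fetchQueue.isEmpty
}
```

Queueing the same login twice only enqueues it once, which is what stops the crawler from fetching a user again when they appear in several follower lists.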

We now have a fetcher manager. The rest of the code can remain the same, apart from the follower extractor. Instead of logging follower names, it must send AddToQueue messages to the manager. We will pass a reference to the manager at construction time:

// FollowerExtractor.scala
import akka.actor._
import org.json4s._
import org.json4s.native.JsonMethods._

object FollowerExtractor {
  
  // messages
  case class Extract(login:String, jsonResponse:JArray)

  // props factory method
  def props(manager:ActorRef) = 
    Props(classOf[FollowerExtractor], manager)
}

class FollowerExtractor(manager:ActorRef)
extends Actor with ActorLogging {
  import FollowerExtractor._
  
  def receive = {
    case Extract(login, followerArray) =>
      val followers = extractFollowers(followerArray)
      followers foreach { f => 
        manager ! FetcherManager.AddToQueue(f) 
      }
  }

  def extractFollowers(followerArray:JArray) = for {
    JObject(follower) <- followerArray
    JField("login", JString(login)) <- follower
  } yield login

}
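
To check what extractFollowers returns, we can run the same for-comprehension outside the actor. This is a sketch under assumptions: the ExtractFollowersDemo object and the sample JSON are made up for illustration (a real GitHub response contains many more fields per user), and it needs json4s-native on the classpath:

```scala
import org.json4s._
import org.json4s.native.JsonMethods._

// Hypothetical stand-alone demo; not part of the crawler.
object ExtractFollowersDemo {

  // Same logic as FollowerExtractor.extractFollowers:
  // collect the "login" field of every object in the array.
  def extractFollowers(followerArray: JArray): List[String] = for {
    JObject(follower) <- followerArray
    JField("login", JString(login)) <- follower
  } yield login

  // Made-up sample, trimmed down to two fields per user.
  val sample =
    """[{"login": "misto", "id": 1}, {"login": "gkossakowski", "id": 2}]"""

  // parse returns a generic JValue, so we check that it is an array
  // before handing it to extractFollowers.
  val logins: List[String] = parse(sample) match {
    case array: JArray => extractFollowers(array)
    case _ => List.empty
  }
}
```

Fields other than login are skipped because the JField pattern does not match them.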

The main method running all this is remarkably simple as all the code to instantiate actors has been moved to the FetcherManager. We just need to instantiate the manager and give it the first node in the network, and it will do the rest:

// FetchNetwork.scala
import akka.actor._

object FetchNetwork extends App {

  // Get token if exists
  val token = sys.env.get("GHTOKEN")

  val system = ActorSystem("GithubFetcher")
  val manager = system.actorOf(FetcherManager.props(token, 2))
  manager ! FetcherManager.AddToQueue("odersky")

}

Notice how we do not attempt to shut down the actor system anymore. We will just let it run, crawling the network, until we stop it or hit the API rate limit. Let's run this through SBT:

$ GHTOKEN="2502761d..." sbt "runMain FetchNetwork"
[INFO] [11/06/2015 06:31:04.614] [GithubFetcher-akka.actor.default-dispatcher-2] [akka://GithubFetcher/user/$a] Pushing odersky onto queue
[INFO] [11/06/2015 06:31:05.563] [GithubFetcher-akka.actor.default-dispatcher-4] [akka://GithubFetcher/user/$a] Pushing misto onto queue
[INFO] [11/06/2015 06:31:05.563] [GithubFetcher-akka.actor.default-dispatcher-4] [akka://GithubFetcher/user/$a] Pushing gkossakowski onto queue
^C

Our program does not actually do anything useful with the followers that it retrieves besides logging them. We could replace the log.info call with code that, for instance, stores the nodes in a database or draws the graph on screen.
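
As one possible starting point, the following sketch keeps the graph in memory instead of a database. FollowerGraph and its methods are hypothetical names, not part of the crawler. Note that AddToQueue only carries a single login, so the user being followed would have to come from elsewhere, for instance from the login in the Extract message:

```scala
import scala.collection.mutable

// Hypothetical in-memory store for the follower graph.
class FollowerGraph {

  // maps each user to the set of logins that follow them
  private val followersOf =
    mutable.Map.empty[String, mutable.Set[String]]

  // Record that every login in `followers` follows `user`.
  // Adding the same edge twice has no effect.
  def addFollowers(user: String, followers: Seq[String]): Unit = {
    val existing =
      followersOf.getOrElseUpdate(user, mutable.Set.empty[String])
    existing ++= followers
  }

  // Immutable snapshot of the followers recorded for `user`.
  def followers(user: String): Set[String] =
    followersOf.get(user).map(_.toSet).getOrElse(Set.empty[String])

  // Total number of (follower, user) edges stored so far.
  def nEdges: Int = followersOf.values.map(_.size).sum
}
```

Because the store is mutable, it should live inside a single actor rather than be shared between several of them.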
