Concurrency and exception handling with futures

While the program that we wrote in the previous section works, it is very brittle. It will crash if we enter a non-existent user name, or if the GitHub API changes or returns a badly formatted response. We need to make it fault-tolerant.

What if we also wanted to fetch multiple users? The program, as written, is entirely single-threaded. The fetchUserFromUrl method fires a call to the API and blocks until the API sends data back. A better solution would be to fetch multiple users in parallel.

As you learned in Chapter 4, Parallel Collections and Futures, there are two straightforward ways to implement both fault tolerance and parallel execution: we can either put all the user names in a parallel collection and wrap the code for fetching and extracting each user in a Try block, or we can wrap each query in a future.
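To make the two options concrete, here is a minimal sketch of each, under stated assumptions: lookup and its knownIds table are hypothetical stand-ins for fetching a user from the GitHub API, so the sketch runs offline. To stay dependency-free, withTry maps over an ordinary List; in Scala 2.13 and later, parallel collections live in the separate scala-parallel-collections module, and you would call .par on the list (after importing scala.collection.parallel.CollectionConverters._) to run the lookups in parallel.

```scala
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.{Try, Success, Failure}
import ExecutionContext.Implicits.global

object TwoApproaches {
  // Hypothetical stand-in for the GitHub API: only these names "exist".
  val knownIds = Map("odersky" -> 795990L, "derekwyatt" -> 62324L)

  // Throws for unknown names, like a failed request would.
  def lookup(name: String): Long =
    knownIds.getOrElse(name, throw new NoSuchElementException(s"no user $name"))

  // Approach 1: wrap each lookup in a Try, so an exception becomes a Failure value.
  // (The text uses a parallel collection here; a plain List keeps the sketch dependency-free.)
  def withTry(names: List[String]): List[(String, Try[Long])] =
    names.map { name => name -> Try(lookup(name)) }

  // Approach 2: wrap each lookup in a Future. An exception thrown inside
  // the Future body is captured as a failed future.
  def withFutures(names: List[String]): List[(String, Future[Long])] =
    names.map { name => name -> Future(lookup(name)) }

  def main(args: Array[String]): Unit = {
    val names = List("odersky", "derekwyatt", "not-a-user-675")
    withTry(names).foreach { case (name, t) => println(s"$name: $t") }

    val futures = withFutures(names)
    // Wait for each future individually, for at most five seconds each.
    futures.foreach { case (_, f) => Await.ready(f, 5.seconds) }
    futures.foreach { case (name, f) => println(s"$name: ${f.value.get}") }
  }
}
```

In both cases a missing user becomes a value (a Failure, or a failed future) rather than an exception that crashes the whole program.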

When querying web APIs, a request can sometimes take abnormally long. To prevent one slow request from holding up the others, it is preferable to rely on futures rather than parallel collections for concurrency, as we saw in the "Parallel collection or Future?" section at the end of Chapter 4, Parallel Collections and Futures.
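The following sketch illustrates the point with a simulated slow request; request is a hypothetical stand-in that sleeps instead of calling the API. Each future can be consumed as soon as it completes, whereas a map over a parallel collection only returns once every element has been processed. The sketch also wraps the sleep in scala.concurrent.blocking, which tells the global execution context that the thread is about to block so that it can start an extra thread if needed. This is one possible refinement, not something the original code does.

```scala
import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

object SlowRequestSketch {
  // Hypothetical stand-in for a web request: sleeps for `delayMillis`,
  // then returns `name`. The `blocking` block lets the global execution
  // context compensate for the blocked thread.
  def request(name: String, delayMillis: Long): Future[String] =
    Future {
      blocking { Thread.sleep(delayMillis) }
      name
    }

  def main(args: Array[String]): Unit = {
    val slow = request("slow-user", 3000)
    val fast = request("fast-user", 100)

    // The fast result can be used as soon as it arrives...
    println(Await.result(fast, 1.second))
    // ...even though the slow request is still in flight.
    println(s"slow request finished? ${slow.isCompleted}")
  }
}
```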

Let's rewrite the code from the previous section to handle fetching multiple users concurrently in a fault-tolerant manner. We will change the fetchUserFromUrl method to query the API asynchronously. This is not terribly different from Chapter 4, Parallel Collections and Futures, in which we queried the "Markit on demand" API:

// GitHubUserConcurrent.scala

import scala.io._
import scala.concurrent._
import scala.concurrent.duration._
import scala.language.postfixOps // enables the postfix `1 minute` syntax used in main
import ExecutionContext.Implicits.global
import scala.util._

import org.json4s._
import org.json4s.native.JsonMethods._

object GitHubUserConcurrent {

  // json4s formats, required implicitly by `extract`
  implicit val formats: Formats = DefaultFormats

  case class User(id:Long, userName:String)

  // Fetch and extract the `User` corresponding to `url`
  def fetchUserFromUrl(url:String):Future[User] = {
    val response = Future { Source.fromURL(url).mkString }
    val parsedResponse = response.map { r => parse(r) }
    parsedResponse.map { extractUser }
  }

  // Helper method for extracting a user from a JObject
  def extractUser(jsonResponse:JValue):User = {
    val o = jsonResponse.transformField {
      case ("login", name) => ("userName", name)
    }
    o.extract[User]
  }

  def main(args:Array[String]): Unit = {
    val names = args.toList

    // Loop over each username and send a request to the API 
    // for that user 
    val name2User = for {
      name <- names
      url = s"https://api.github.com/users/$name"
      user = fetchUserFromUrl(url)
    } yield name -> user

    // Register a callback on each future that prints the outcome
    // once the corresponding request completes
    name2User.foreach { case (name, user) =>
      user.onComplete {
        case Success(u) => println(s" ** Extracted for $name: $u")
        case Failure(e) => println(s" ** Error fetching user $name: $e")
      }
    }

    // Block, for at most one minute, until all the calls have finished
    // (or until one of them fails).
    Await.ready(Future.sequence(name2User.map { _._2 }), 1 minute)
  }
}

Let's run the code through sbt:

$ sbt
> runMain GitHubUserConcurrent odersky derekwyatt not-a-user-675
 ** Error fetching user not-a-user-675: java.io.FileNotFoundException: https://api.github.com/users/not-a-user-675
 ** Extracted for odersky: User(795990,odersky)
 ** Extracted for derekwyatt: User(62324,derekwyatt)

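Notice that the results are printed in the order in which the requests complete, not in the order of the command-line arguments. The code itself should be straightforward. All the concepts used here have been explored in this chapter or in Chapter 4, Parallel Collections and Futures, apart from the last line: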

Await.ready(Future.sequence(name2User.map { _._2 }), 1 minute)

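This statement makes the program wait for the futures in our list before main returns. Await.ready(..., 1 minute) takes a future as its first argument and blocks until that future has completed. The second argument is a time-out: if the future has not completed within one minute, Await.ready throws a TimeoutException. Unlike Await.result, Await.ready does not rethrow the exception of a failed future; it simply returns. The only catch is that Await expects a single future rather than a list of futures. We can use Future.sequence to merge a collection of futures into a single future. If all the futures succeed, the merged future completes once the last of them has completed. If any of them fails, the merged future fails as well, and it can do so before the others have finished, so in that case Await.ready may return while some requests are still in flight.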
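There are two subtleties in waiting this way. First, if one request fails, the future returned by Future.sequence fails too, possibly before the other requests have finished, so Await.ready can return early. Second, onComplete callbacks run asynchronously, so even once every future has completed there is no guarantee that every callback has printed its line when main returns; because the global execution context uses daemon threads, that output can be lost when main is the last thing keeping the JVM alive. One way around both, sketched below under the assumption that fetchUser is a hypothetical offline stand-in for fetchUserFromUrl, is to make printing part of each future's chain and to use recover so that none of the futures can fail.

```scala
import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

object AwaitAllSketch {
  // Hypothetical stand-in for fetchUserFromUrl: fails for unknown names.
  def fetchUser(name: String): Future[Long] = Future {
    name match {
      case "odersky"    => 795990L
      case "derekwyatt" => 62324L
      case _            => throw new java.io.FileNotFoundException(name)
    }
  }

  // Turn each request into a future of the line to print. `map` handles
  // success and `recover` turns a failure into a successful value,
  // so the resulting future never fails.
  def describe(name: String): Future[String] =
    fetchUser(name)
      .map { id => s" ** Extracted for $name: $id" }
      .recover { case e => s" ** Error fetching user $name: $e" }

  def main(args: Array[String]): Unit = {
    val names = List("odersky", "derekwyatt", "not-a-user-675")
    // Print inside the chain, so each future completes only after its line is printed.
    val printed = names.map { name => describe(name).map(line => println(line)) }
    Await.ready(Future.sequence(printed), 1.minute)
  }
}
```

Because the futures in printed only complete after their line has been written, and none of them can fail, Await.ready returns only once all the output is there (or after the one-minute time-out).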
