Futures

Parallel collections offer a simple, yet powerful, framework for parallel operations. However, they are limited in one respect: the total amount of work must be known in advance, and each thread must perform the same function (possibly on different inputs).

Imagine that we want to write a program that fetches a web page (or queries a web API) every few seconds and extracts data for further processing from this web page. A typical example might involve querying a web API to maintain an up-to-date value of a particular stock price. Fetching data from an external web page typically takes a few hundred milliseconds. If we perform this operation on the main thread, the main thread will sit idle, unable to do anything else, while it waits for the web server to reply.

The solution is to wrap the code for fetching the web page in a future. A future is a one-element container containing the future result of a computation. When you create a future, the computation in it gets off-loaded to a different thread in order to avoid blocking the main thread. When the computation finishes, the result is written to the future and thus made accessible to the main thread.
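To see this off-loading in isolation, without any network access, here is a minimal sketch that records the name of the thread on which a future's body runs and compares it with the name of the calling thread. It assumes Scala 2.13 and the default global execution context (whose import is explained later in this section); the object FutureThreadDemo is purely illustrative.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureThreadDemo {
  // Returns (name of the calling thread, name of the thread that ran the future)
  def threadNames(): (String, String) = {
    val caller = Thread.currentThread.getName
    val f = Future { Thread.currentThread.getName } // body runs on a pool thread
    // Blocking here only so the demo can return a plain value
    (caller, Await.result(f, 5.seconds))
  }
}
```

On a typical run the second name belongs to a worker thread of the global pool rather than to the caller.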

As an example, we will write a program that queries the "Markit on demand" API to fetch the price of a given stock. For instance, the URL for the current price of a Google share is http://dev.markitondemand.com/MODApis/Api/v2/Quote?symbol=GOOG. Go ahead and paste this in the address box of your web browser. You will see an XML string appear with, among other things, the current stock price. Let's fetch this programmatically without resorting to a future first:

scala> import scala.io._
import scala.io._

scala> val url = "http://dev.markitondemand.com/MODApis/Api/v2/Quote?symbol=GOOG"
url: String = http://dev.markitondemand.com/MODApis/Api/v2/Quote?symbol=GOOG

scala> val response = Source.fromURL(url).mkString
response: String = <StockQuote><Status>SUCCESS</Status>
...

Notice how it takes a little bit of time to query the API. Let's now do the same, but using a future (don't worry about the imports for now, we will discuss what they mean in more detail further on):

scala> import scala.concurrent._
import scala.concurrent._

scala> import scala.util._
import scala.util._

scala> import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext.Implicits.global

scala> val response = Future { Source.fromURL(url).mkString }
response: Future[String] = Promise$DefaultPromise@3301801b

If you run this, you will notice that control returns to the shell instantly, before the API has had a chance to respond. To make this evident, let's simulate a slow connection by adding a call to Thread.sleep:

scala> val response = Future { 
  Thread.sleep(10000) // sleep for 10s
  Source.fromURL(url).mkString
}
response: Future[String] = Promise$DefaultPromise@231f98ef

When you run this, you do not have to wait for ten seconds for the next prompt to appear: you regain control of the shell straightaway. The bit of code in the future is executed asynchronously: its execution is independent of the main program flow.
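The following sketch makes the same point with a stopwatch instead of the shell prompt. It assumes Scala 2.13; AsyncTimingDemo and its timings method are illustrative names, and Thread.sleep stands in for a slow web request. Constructing the future should take a tiny fraction of the sleep time, while the result only becomes available once the sleep is over.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AsyncTimingDemo {
  // Returns (ms taken to construct the future, ms until its result is available)
  def timings(sleepMillis: Long): (Long, Long) = {
    val start = System.nanoTime()
    val f = Future {
      Thread.sleep(sleepMillis) // stand-in for a slow web request
      "done"
    }
    val constructedAfter = (System.nanoTime() - start) / 1000000
    Await.result(f, 10.seconds) // block only to measure the total time
    val completedAfter = (System.nanoTime() - start) / 1000000
    (constructedAfter, completedAfter)
  }
}
```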

How do we retrieve the result of the computation? We note that response has type Future[String]. We can check whether the computation wrapped in the future has finished by querying the future's isCompleted attribute:

scala> response.isCompleted
Boolean = true

The future exposes a value attribute that contains the computation result:

scala> response.value
Option[Try[String]] = Some(Success(<StockQuote><Status>SUCCESS</Status>
...

The value attribute of a future has type Option[Try[T]]. We have already seen how to use the Try type to handle exceptions gracefully in the context of parallel collections. It is used in the same way here. A future's value attribute is None until the future is complete, then it is set to Some(Success(value)) if the future ran successfully, or Some(Failure(error)) if an exception was thrown.
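The three possible states of value can be reproduced without a network call. In this sketch (assuming Scala 2.13; FutureValueDemo is an illustrative name), Future.successful and Future.failed build already-completed futures, and a future obtained from a Promise that is never fulfilled stands in for a computation that is still running.

```scala
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}

object FutureValueDemo {
  // Never completed: nothing ever fulfils this Promise
  val pending: Future[Int] = Promise[Int]().future
  // Already completed, without running any code
  val succeeded: Future[Int] = Future.successful(42)
  val failed: Future[Int] = Future.failed(new IllegalStateException("boom"))

  // Describe a future's value (an Option[Try[Int]]) in words
  def describe(f: Future[Int]): String = f.value match {
    case None             => "not completed"
    case Some(Success(v)) => s"succeeded with $v"
    case Some(Failure(e)) => s"failed with ${e.getMessage}"
  }
}
```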

Repeatedly calling response.value until the future completes works well in the shell, but it does not generalize to more complex programs. Instead, we want to tell the computer to do something once the future is complete: we want to bind a callback function to the future. We can do this by calling the future's onComplete method. Let's tell the future to print the API response when it completes:

scala> response.onComplete {
  case Success(s) => println(s)
  case Failure(e) => println(s"Error fetching page: $e")
}

scala> 
// Wait for response to complete, then prints:
<StockQuote><Status>SUCCESS</Status><Name>Alphabet Inc</Name><Symbol>GOOGL</Symbol><LastPrice>695.22</LastPrice><Chan...

The function passed to onComplete runs when the future is finished. It takes a single argument of type Try[T] containing the result of the future.
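Here is a small, self-contained sketch of the same callback pattern, assuming Scala 2.13. The hypothetical helper messageFor runs an arithmetic expression in a future; the callback pattern-matches on the Try and writes a message into a Promise, which the helper awaits only so that it can return the message to its caller.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object OnCompleteDemo {
  // Run `body` in a future and return the message built by its onComplete callback
  def messageFor(body: => Int): String = {
    val f = Future(body)
    val message = Promise[String]()
    f.onComplete {
      case Success(n) => message.success(s"Got $n")
      case Failure(e) => message.success(s"Error: ${e.getClass.getSimpleName}")
    }
    // Blocking only to hand the callback's message back to the caller
    Await.result(message.future, 5.seconds)
  }
}
```

For example, messageFor(10 / 2) yields "Got 5", while a division by a zero computed at run time yields "Error: ArithmeticException".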

Tip

Failure is normal: how to build resilient applications

By wrapping the output of the code that it runs in a Try type, futures force the client code to consider the possibility that the code might fail. The client can isolate the effect of failure to avoid crashing the whole application. They might, for instance, log the exception. In the case of a web API query, they might add the offending URL to be queried again at a later date. In the case of a database failure, they might roll back the transaction.

By treating failure as a first-class citizen rather than through exceptional control flow bolted on at the end, we can build applications that are much more resilient.
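As one illustration of isolating failure, the sketch below (assuming Scala 2.13, where Future.transform accepts a Try => Try function) queries a batch of URLs and collects the ones that failed for a later retry, instead of letting one failure abort the batch. The fetch function is a hypothetical stand-in for a real web API call that fails for any URL containing "bad".

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object ResilientBatchDemo {
  // Hypothetical stand-in for a web API call: fails for URLs containing "bad"
  def fetch(url: String): Future[String] = Future {
    if (url.contains("bad")) throw new java.io.IOException(s"cannot reach $url")
    s"response from $url"
  }

  // Query every URL; each outcome, success or failure, becomes a value.
  // Returns (successful responses, URLs to query again later).
  def fetchAll(urls: Seq[String]): (Seq[String], Seq[String]) = {
    val attempts: Seq[Future[(String, Try[String])]] =
      urls.map(url => fetch(url).transform((t: Try[String]) => Success((url, t))))
    val results = Await.result(Future.sequence(attempts), 10.seconds)
    val responses = results.collect { case (_, Success(body)) => body }
    val retry = results.collect { case (url, Failure(_)) => url }
    (responses, retry)
  }
}
```

The retry list could then be logged or re-queued, as the tip suggests.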

Future composition – using a future's result

In the previous section, you learned about the onComplete method to bind a callback to a future. This is useful to cause a side effect to happen when the future is complete. It does not, however, let us transform the future's return value easily.

To carry on with our stocks example, let's imagine that we want to convert the query response from a string to an XML object. Let's start by including the scala-xml library as a dependency in build.sbt:

libraryDependencies += "org.scala-lang" % "scala-xml" % "2.11.0-M4"

Let's restart the console and re-import scala.concurrent._, scala.concurrent.ExecutionContext.Implicits.global, and scala.io._. We also want to import the XML library:

scala> import scala.xml.XML
import scala.xml.XML

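We will use the same URL as in the previous section, so let's define url again:

scala> val url = "http://dev.markitondemand.com/MODApis/Api/v2/Quote?symbol=GOOG"
url: String = http://dev.markitondemand.com/MODApis/Api/v2/Quote?symbol=GOOG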

It is sometimes useful to think of a future as a collection that contains one element if a calculation has been successful, or zero elements if it has failed. For instance, if the web API has been queried successfully, our future contains a string representation of the response. Like other container types in Scala, futures support a map method. It applies a function to the element contained in the future and returns a new future holding the result; if the calculation in the original future failed, the function is not applied and the new future carries the same failure. But what does this mean in the context of a computation that might not be finished yet? Like a callback registered with onComplete, the function passed to map is applied as soon as the future completes.
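The container analogy can be checked directly. In this sketch (assuming Scala 2.13; FutureMapDemo is an illustrative name), a counter records how often the mapping function runs: mapping over a successful future applies it, while mapping over a failed future skips it and yields a failed future.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object FutureMapDemo {
  // Counts how many times the mapping function actually runs
  val calls = new AtomicInteger(0)

  def double(n: Int): Int = { calls.incrementAndGet(); n * 2 }

  // Map over a future and wait for the outcome as a Try
  def mapAndWait(f: Future[Int]): Try[Int] = {
    val mapped = f.map(double)
    Await.ready(mapped, 5.seconds)
    mapped.value.get // safe: the future is complete after Await.ready
  }
}
```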

We can use the future's map method to apply a transformation to the result of the future asynchronously. Let's poll the "Markit on demand" API again. This time, instead of printing the result, we will parse it as XML.

scala> val strResponse = Future { 
  Thread.sleep(20000) // Sleep for 20s
  val res = Source.fromURL(url).mkString
  println("finished fetching url")
  res
}
strResponse: Future[String] = Promise$DefaultPromise@1dda9bc8

scala> val xmlResponse = strResponse.map { s =>
  println("applying string to xml transformation")
  XML.loadString(s) 
}
xmlResponse: Future[xml.Elem] = Promise$DefaultPromise@25d1262a

// wait while the remainder of the 20s elapses
finished fetching url
applying string to xml transformation

scala> xmlResponse.value
Option[Try[xml.Elem]] = Some(Success(<StockQuote><Status>SUCCESS</Status>...

By chaining calls to map, we give the execution context that runs the future a road map of what to do with the result once it becomes available.

If any of the steps fail, the failed Try instance containing the exception gets propagated instead:

scala> val strResponse = Future { 
  Source.fromURL("empty").mkString 
}

scala> val xmlResponse = strResponse.map { 
  s => XML.loadString(s) 
}

scala> xmlResponse.value 
Option[Try[xml.Elem]] = Some(Failure(MalformedURLException: no protocol: empty))

This behavior makes sense if you think of a failed future as an empty container. When applying a map to an empty list, it returns the same empty list. Similarly, when applying a map to an empty (failed) future, the empty future is returned.
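The same propagation applies when a step in the chain itself throws. In the sketch below (assuming Scala 2.13; MapChainDemo and its step names are illustrative), parsing a non-numeric string throws a NumberFormatException inside the first map, so the final future fails with that exception and the second step never runs.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Success, Try}

object MapChainDemo {
  // Records which mapping steps ran during the last pipeline call
  @volatile var stepsRun: List[String] = Nil

  // String -> Int -> Int pipeline; toInt throws on non-numeric input
  def pipeline(raw: String): Try[Int] = {
    stepsRun = Nil
    val parsed = Future(raw.trim).map { s => stepsRun = "parse" :: stepsRun; s.toInt }
    val scaled = parsed.map { n => stepsRun = "scale" :: stepsRun; n * 10 }
    Await.ready(scaled, 5.seconds)
    scaled.value.get
  }
}
```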

Blocking until completion

The code for fetching stock prices works fine in the shell. However, if you paste it in a standalone program, you will notice that nothing gets printed and the program finishes straightaway. Let's look at a trivial example of this:

// BlockDemo.scala
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockDemo extends App {
  val f = Future { Thread.sleep(10000) }
  f.onComplete { _ => println("future completed") }
  // "future completed" is not printed
}

The program stops running as soon as the main thread has completed its tasks, which, in this example, just involves creating the future and registering its callback. In particular, the line "future completed" is never printed. If we want the main thread to wait for a future to execute, we must explicitly tell it to block execution until the future has finished running. This is done using the Await.ready or Await.result methods. Both these methods block the execution of the main thread until the future completes. We could make the above program work as intended by adding this line:

Await.ready(f, 1.minute)
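Put together, a version of BlockDemo that waits might look like the sketch below. It assumes Scala 2.13 and is written with 1.minute rather than a postfix call, which recent Scala versions flag unless scala.language.postfixOps is imported. It also guards against a subtle race, which is our own observation: Await.ready returns as soon as f completes, but a callback registered with onComplete is then scheduled separately on the execution context, so main can, in principle, exit before the callback prints. Registering the side effect with andThen, which returns a new future that completes only after the side effect has run, and awaiting that future closes the gap. A plain main method replaces extends App so that the object can also be called directly.

```scala
// BlockDemo.scala (sketch)
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockDemo {
  // Set by the callback so that callers can check it actually ran
  @volatile var callbackRan = false

  def main(args: Array[String]): Unit = {
    val f = Future { Thread.sleep(1000) }
    // andThen returns a new future that completes only after this side effect runs
    val done = f.andThen { case _ =>
      callbackRan = true
      println("future completed")
    }
    Await.ready(done, 1.minute) // block the main thread until the callback has run
  }
}
```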

The Await methods take the future as their first argument and a Duration object as the second. If the future takes longer to complete than the specified duration, a TimeoutException is thrown. Pass Duration.Inf to set an infinite timeout.

The difference between Await.ready and Await.result is that the latter returns the value inside the future. In particular, if the future resulted in an exception, Await.result rethrows that exception. In contrast, Await.ready returns the completed future itself, whether it succeeded or failed.
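The three behaviors can be compared side by side in this sketch (assuming Scala 2.13; AwaitDemo and its members are illustrative). Await.result returns the value or rethrows the failure, Await.ready returns the completed future either way, and both throw a TimeoutException when the future does not finish in time.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object AwaitDemo {
  val ok: Future[Int] = Future(6 * 7)
  val broken: Future[Int] = Future(throw new IllegalArgumentException("bad input"))
  val slow: Future[Int] = Future { Thread.sleep(5000); 1 }

  // Await.result hands back the value itself...
  def resultOfOk: Int = Await.result(ok, 5.seconds)
  // ...and rethrows the exception of a failed future (captured here in a Try)
  def resultOfBroken: Try[Int] = Try(Await.result(broken, 5.seconds))
  // Await.ready hands back the completed future without throwing
  def readyBroken: Future[Int] = Await.ready(broken, 5.seconds)
  // Either method throws TimeoutException if the future is too slow
  def timedOut: Boolean =
    Try(Await.ready(slow, 100.millis)).failed.toOption.exists(_.isInstanceOf[TimeoutException])
}
```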

In general, one should try to avoid blocking as much as possible: the whole point of futures is to run code in background threads in order to keep the main thread of execution responsive. However, a common, legitimate use case for blocking is at the end of a program. If we are running a large-scale integration process, we might dispatch several futures to query web APIs, read from text files, or insert data into a database. Embedding the code in futures is more scalable than performing these operations sequentially. However, as the majority of the intensive work is running in background threads, we are left with many outstanding futures when the main thread completes. It makes sense, at this stage, to block until all the futures have completed.
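A minimal sketch of this pattern, assuming Scala 2.13: the hypothetical job function stands in for one unit of background work (an API call, a file import), all jobs are dispatched up front, Future.sequence combines them into a single future, and the program blocks exactly once, at the end.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object EndOfProgramDemo {
  // Hypothetical unit of background work
  def job(id: Int): Future[Int] = Future {
    Thread.sleep(200) // simulate I/O such as an API call
    id * id
  }

  def runAll(): Seq[Int] = {
    // All jobs are dispatched immediately and run concurrently
    val jobs: Seq[Future[Int]] = (1 to 5).map(job)
    // Combine them into one future holding all the results, in order
    val all: Future[Seq[Int]] = Future.sequence(jobs)
    Await.result(all, 1.minute) // the single blocking call
  }

  def main(args: Array[String]): Unit =
    println(s"All jobs finished: ${runAll()}")
}
```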

Controlling parallel execution with execution contexts

Now that we know how to define futures, let's look at controlling how they run. In particular, you might want to control the number of threads to use when running a large number of futures.

When a future is defined, it is passed an execution context, either directly or implicitly. An execution context is an object that exposes an execute method that takes a block of code and runs it, possibly asynchronously. By changing the execution context, we can change the "backend" that runs the futures. We have already seen how to use execution contexts to control the execution of parallel collections.
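To make the execute interface concrete, here is a sketch of a hypothetical execution context that counts the tasks submitted to it and forwards them to the global pool, which does the actual running. It assumes Scala 2.13; CountingExecutionContext is not a library class. Passing it explicitly to a future and to a subsequent map routes both the future's body and the mapping function through its execute method.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical execution context: counts submitted tasks, delegates the running
class CountingExecutionContext(underlying: ExecutionContext) extends ExecutionContext {
  val tasks = new AtomicInteger(0)

  override def execute(runnable: Runnable): Unit = {
    tasks.incrementAndGet()
    underlying.execute(runnable) // the underlying pool actually runs the code
  }

  override def reportFailure(cause: Throwable): Unit = underlying.reportFailure(cause)
}

object CountingDemo {
  // Returns (result of the future, number of tasks submitted to the context)
  def run(): (Int, Int) = {
    val ec = new CountingExecutionContext(ExecutionContext.global)
    // Context passed explicitly to both Future.apply and map
    val f = Future(20)(ec).map(_ + 1)(ec)
    (Await.result(f, 5.seconds), ec.tasks.get)
  }
}
```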

So far, we have just been using the default execution context by importing scala.concurrent.ExecutionContext.Implicits.global. This is a fork / join thread pool with as many threads as there are underlying CPUs.

Let's now define a new execution context that uses sixteen threads:

scala> import java.util.concurrent.Executors
import java.util.concurrent.Executors

scala> val ec = ExecutionContext.fromExecutorService(
  Executors.newFixedThreadPool(16)
)
ec: ExecutionContextExecutorService = ExecutionContextImpl$$anon$1@1351ce60

Having defined the execution context, we can pass it explicitly to futures as they are defined:

scala> val f = Future { Thread.sleep(1000) } (ec)
f: Future[Unit] = Promise$DefaultPromise@458b456

Alternatively, we can declare the execution context as an implicit value:

scala> implicit val context = ec
context: ExecutionContextExecutorService = ExecutionContextImpl$$anon$1@1351ce60

It is then passed as an implicit parameter to all new futures as they are constructed:

scala> val f = Future { Thread.sleep(1000) }
f: Future[Unit] = Promise$DefaultPromise@3c4b7755

You can shut the execution context down to destroy the thread pool:

scala> ec.shutdown()

When an execution context receives a shutdown command, it will finish executing its current tasks but will refuse any new tasks.
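The sketch below, assuming Scala 2.13, checks that a fixed pool really caps the number of futures running at once: it runs several sleeping tasks on a small pool, records the peak number running simultaneously, and shuts the pool down in a finally block once the work is done. FixedPoolDemo and maxConcurrency are illustrative names.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object FixedPoolDemo {
  // Runs `jobs` sleeping tasks on a pool of `threads` threads and returns
  // the largest number of tasks observed running at the same time
  def maxConcurrency(threads: Int, jobs: Int): Int = {
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(threads))
    val running = new AtomicInteger(0) // tasks running right now
    val peak = new AtomicInteger(0)    // highest value `running` has reached
    try {
      val fs = (1 to jobs).map { _ =>
        Future {
          val now = running.incrementAndGet()
          peak.updateAndGet((p: Int) => math.max(p, now))
          Thread.sleep(100) // simulate work so that tasks overlap
          running.decrementAndGet()
        }
      }
      Await.ready(Future.sequence(fs), 1.minute)
      peak.get
    } finally {
      ec.shutdown() // lets queued work finish, refuses new tasks, frees the threads
    }
  }
}
```

With two threads and eight jobs, the peak should never exceed two.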

Futures example – stock price fetcher

Let's bring some of the concepts that we covered in this section together to build a command-line application that prompts the user for the name of a stock and fetches the value of that stock. The catch is that, to keep the UI responsive, we will fetch the stock using a future:

// StockPriceDemo.scala

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.io._
import scala.xml.XML
import scala.util._

object StockPriceDemo extends App {

  /* Construct URL for a stock symbol */
  def urlFor(stockSymbol:String) =
    ("http://dev.markitondemand.com/MODApis/Api/v2/Quote?" + 
     s"symbol=${stockSymbol}")

  /* Build a future that fetches the stock price */
  def fetchStockPrice(stockSymbol:String):Future[BigDecimal] = {
    val url = urlFor(stockSymbol)
    val strResponse = Future { Source.fromURL(url).mkString }
    val xmlResponse = strResponse.map { s => XML.loadString(s) }
    val price = xmlResponse.map { 
      r => BigDecimal((r \ "LastPrice").text)
    }
    price
  }

  /* Command line interface */
  println("Enter symbol at prompt.")
  while (true) {
    val symbol = StdIn.readLine("> ") // Wait for user input
    // When user puts in symbol, fetch data in background
    // thread and print to screen when complete
    fetchStockPrice(symbol).onComplete { res =>
      println()
      res match {
        case Success(price) => println(s"$symbol: USD $price")
        case Failure(e) => println(s"Error fetching  $symbol: $e")
      }
      print("> ") // Simulate the appearance of a new prompt
    }
  }

}

Try running the program and entering the code for some stocks:

[info] Running StockPriceDemo
Enter symbol at prompt:
> GOOG
> MSFT
>
GOOG: USD 695.22
>
MSFT: USD 47.48
> AAPL
> 
AAPL: USD 111.01

Let's summarize how the code works. When you enter a stock symbol, the main thread constructs a future that fetches the stock information from the API, converts it to XML, and extracts the price. We use (r \ "LastPrice").text to extract the text inside the LastPrice tag from the XML node r. We then convert the value to a BigDecimal. When the transformations are complete, the result is printed to the screen by a callback bound through onComplete. Exceptions need no special treatment: because map propagates failures, an error at any stage (an unreachable URL, malformed XML, or a non-numeric price) ends up as a Failure in the final future, which the callback reports.
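Because fetchStockPrice depends on the network and on scala-xml, here is a sketch that can be run offline, assuming Scala 2.13. The HTTP call is passed in as a function, the hypothetical fakeApi returns canned responses in place of the web API, and a regular expression replaces the XML lookup purely to avoid the extra dependency; it is not a substitute for a real XML parser. As in the original, no step handles errors explicitly, yet an unreachable API and an unparseable price both surface as a Failure in the final future.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object OfflineStockPrice {
  // Captures the text between <LastPrice> and </LastPrice>
  private val LastPrice = "<LastPrice>([^<]*)</LastPrice>".r

  // Same pipeline shape as fetchStockPrice, but the HTTP call is injected
  def fetchStockPrice(symbol: String, fetch: String => String): Future[BigDecimal] = {
    val strResponse = Future { fetch(symbol) }
    val priceText = strResponse.map { s =>
      LastPrice.findFirstMatchIn(s)
        .map(_.group(1))
        .getOrElse(throw new NoSuchElementException(s"no LastPrice for $symbol"))
    }
    priceText.map(t => BigDecimal(t)) // throws NumberFormatException on bad text
  }

  // Hypothetical canned responses standing in for the web API
  def fakeApi(symbol: String): String = symbol match {
    case "GOOG" => "<StockQuote><Symbol>GOOG</Symbol><LastPrice>695.22</LastPrice></StockQuote>"
    case "BAD"  => "<StockQuote><LastPrice>n/a</LastPrice></StockQuote>"
    case other  => throw new java.io.IOException(s"unknown symbol $other")
  }

  // Blocks only so that the outcome can be inspected as a Try
  def priceOf(symbol: String): Try[BigDecimal] = {
    val f = fetchStockPrice(symbol, fakeApi)
    Await.ready(f, 5.seconds)
    f.value.get
  }
}
```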

By wrapping the code for fetching a stock price in a future, we free up the main thread to just respond to the user. This means that the user interface does not get blocked if we have, for instance, a slow internet connection.

This example is somewhat artificial, but you could easily wrap much more complicated logic: stock prices could be written to a database and we could add additional commands to plot the stock price over time, for instance.

We have only scratched the surface of what futures can offer in this section. We will revisit futures in more detail when we look at polling web APIs in Chapter 7, Web APIs, and Chapter 9, Concurrency with Akka.

Futures are a key part of the data scientist's toolkit for building scalable systems. Moving expensive computation (either in terms of CPU time or wall time) to background threads improves scalability greatly. For this reason, futures are an important part of many Scala libraries such as Akka and the Play framework.
