Chapter 8: Diving Deeper into Futures

In the second chapter I walked you through some code that used futures, but I didn't spend much time on them. I'd like to develop the idea a bit further here and show you how they can be used effectively to interact with actors and to sequence operations (or computations, to speak more formally).

Clarifying our definition

Earlier I defined futures very generally, but let's be a bit more specific. A future represents the value of some expression or computation that may not have yet completed or which is not yet known. This mechanism was first proposed as a model for obtaining the result of parallel evaluation of expressions in a programming language.

As you saw earlier, futures are useful when you need to get a response back from an actor. This is precisely the scenario described in the definition of futures: we want to send a message off to an actor and get a response, but we don't know when or whether that response will come back. Our future gives us a handle by which to get that response.

While the most obvious use of futures in the context of Akka is for receiving responses from actors when outside of an actor context, you can also simply create futures directly. This is useful if you have a block of code you'd like to be run asynchronously from the rest of your code. Yes, you could create an actor to do this, but that requires a lot more work when you have access to futures essentially for free. Here, in the simplest form, is how you can execute a block of code as a future:

    import scala.concurrent.future
    import scala.concurrent.ExecutionContext.Implicits.global

    future { 1 + 2 }

Of course, this example is not really complete — we're not yet doing anything with the result.

Here's a more complete example, again using the global ExecutionContext (more on this topic momentarily). I also added a type annotation to demonstrate the type of the future being created:

    import scala.concurrent._
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.{Failure, Success}

    val calculatingFuture: Future[Int] = future { 5 * 7 }

    calculatingFuture.onComplete {
      case Success(r) => println(r)
      case Failure(t) =>
        println("Oops! Something went wrong. An error occurred: " + t.getMessage)
    }

Execution Context

As I hinted at earlier, an ExecutionContext is an abstraction for something that provides a way to execute computations. For example, it might represent a variable-sized thread pool that scales appropriately depending on the number of CPU cores available.

There are a variety of ways to provide or get hold of an ExecutionContext. If you're familiar with the java.util.concurrent API, you know that ExecutorService is very similar to what I described. Conveniently, you can use one via the ExecutionContext.fromExecutorService method, passing in the ExecutorService you want to use. Keep in mind that if you use an ExecutorService instance, you will still need to shut it down yourself. This can be done in the same place you shut down your ActorSystem (for example, as I showed in the servlet example earlier, you might do this by overriding the servlet's destroy method).
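
As a minimal sketch of that approach, the following wraps a fixed-size thread pool in an ExecutionContext and shuts it down when the work is done. The object name, the method name, and the pool size of four are assumptions made for illustration, and the sketch uses the `Future { ... }` form of creating a future, which is equivalent to the lowercase `future` used elsewhere in this chapter.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object CustomPoolExample {
  // Runs one computation on a dedicated pool, then shuts the pool down.
  def computeOnOwnPool(): Int = {
    // A fixed pool of four threads; the size is an arbitrary choice.
    val pool = Executors.newFixedThreadPool(4)
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(pool)
    try {
      val result: Future[Int] = Future { 5 * 7 }
      // Blocking here only keeps the example self-contained.
      Await.result(result, 5.seconds)
    } finally {
      // Nothing else manages the pool's lifecycle, so we shut it down ourselves.
      ec.shutdown()
    }
  }
}
```

In a real application the shutdown call would more likely live wherever the rest of your cleanup happens, rather than right next to the computation.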

As I showed you earlier, there's also a default, global ExecutionContext available by importing scala.concurrent.ExecutionContext.Implicits.global. And if you're running within an environment using Akka, as you likely will be at some point, you can use the dispatcher of the ActorSystem or of the actor's context (more about this later in the book), by using one of the following forms:

    import akka.actor.ActorSystem

    class MyApp {
      val system = ActorSystem("MySystem")
      import system.dispatcher
    }

This form should only be used when you are not within an actual actor, but where you do have access to an ActorSystem instance. The next form would be used within an actor:

    class MyActor extends Actor {
      import context.dispatcher
      // ... the actor's receive method and other members go here
    }

Choosing the appropriate ExecutionContext is important. If there is one already available because you are running within an existing ActorSystem, it's usually reasonable to go ahead and use that. The global ExecutionContext is really just a convenience and should not be overused given its global nature, which means it could be shared by a large number of different tasks without you necessarily realizing it. Using your own ExecutorService is a good way to get fine-grained control of the execution environment, but you should be sure you have a good understanding of the java.util.concurrent facilities if you're going to go down this route.

For comprehensions

The futures defined within Scala's standard library define a set of very useful methods that you may have already seen on a number of other classes in Scala. These methods are map, flatMap, filter, and foreach.

These methods collectively allow the Scala compiler to provide a bit of syntactic sugar in the form of for comprehensions. Syntactic sugar, if you're not familiar with the term, is a mechanism of the compiler that transforms one form of syntax into another, equivalent form. Generally, this is used to turn something that's somewhat cumbersome into a nicer form.

First, let's just look at a very simple example of using map.

    import scala.concurrent._
    import scala.concurrent.ExecutionContext.Implicits.global

    def asyncAction(): Future[String] = {
      // ...
    }
    val delayedResponse: Future[String] = asyncAction()
    // only called if the future returns a successful result
    val transformedResponse: Future[Int] = delayedResponse.map {
      response => response.length
    }

I included a few unnecessary type annotations here to help clarify what's happening. Of particular note is the return type of map. It will always return another future. But the type parameter of that future may be different from the initial future if you perform some kind of transformation within the map call.

Another important thing to realize is that the map block will not be called until the initial future completes successfully. If the initial future fails, or the function passed to map throws an exception, the future returned from map will complete with a Failure containing whatever exception caused it to fail.

flatMap is very similar to map. What differentiates the two is that flatMap actually expects the code called within its block to return a Future, directly, as opposed to returning a value which is then wrapped in a new Future. Otherwise, the semantics are essentially the same, including the handling of exceptions, of course.
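
To make the difference concrete, here is a small sketch that contrasts the two. The methods findUserId and findBalance are hypothetical stand-ins for asynchronous lookups, stubbed with simple arithmetic so the example runs on its own.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FlatMapExample {
  // Hypothetical asynchronous lookups, stubbed so the sketch is self-contained.
  def findUserId(name: String): Future[Int] = Future { name.length }
  def findBalance(userId: Int): Future[Double] = Future { userId * 10.0 }

  // Using map with a function that returns a future nests the result.
  def nested(name: String): Future[Future[Double]] =
    findUserId(name).map(id => findBalance(id))

  // flatMap expects the function to return a future and flattens the result.
  def flat(name: String): Future[Double] =
    findUserId(name).flatMap(id => findBalance(id))
}
```

The type annotations carry the point here: the same function produces a Future[Future[Double]] with map and a plain Future[Double] with flatMap.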

filter takes the successful result of a future and, based on a predicate you pass to it, returns either the original value in another future or a Failure with a NoSuchElementException. If an exception occurs either in the original future or in the evaluation of the predicate, a Failure is also returned with the exception that caused the failure.
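
A short sketch of that behavior follows. The names evenOnly and describe are made up for this illustration, and the blocking call is there only so the outcome can be turned into a plain string.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object FilterExample {
  // Keeps the value only if it is even; otherwise the resulting future fails.
  def evenOnly(n: Int): Future[Int] = {
    val original: Future[Int] = Future { n }
    original.filter(_ % 2 == 0)
  }

  // Blocks for the outcome and describes which branch was taken.
  def describe(n: Int): String =
    Try(Await.result(evenOnly(n), 5.seconds)) match {
      case Success(v) => "kept " + v
      case Failure(_: NoSuchElementException) => "rejected by the predicate"
      case Failure(t) => "failed: " + t.getMessage
    }
}
```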

Let's look at how we can take advantage of the awesome sugared syntax we've been offered. Let's say you have requests to make to two different actors and you want to calculate some final value based on the results of calling both those actors. You can do this using a for comprehension, essentially composing the final result from the individual results:

    val itemId = 998
    val buyersCurrency = "GBP"
    // ask (?) returns a Future[Any], so mapTo narrows it to the expected type
    val currentPrice: Future[Double] =
      (pricingActor ? GetPrice(itemId)).mapTo[Double]
    val conversionRate: Future[Double] =
      (conversionRateActor ? GetConversion(buyersCurrency)).mapTo[Double]
    val convertedPrice: Future[Double] =
      for {
        price <- currentPrice
        rate <- conversionRate
      } yield {
        price * rate
      }

You'll note that, again, I added some type annotations to make it clear what types are received by the calls. The final result of the for comprehension is still a future, which is important to recognize.

Also, notice that we're making the requests to the actors before the for comprehension. If the calls were made directly inside, we'd be effectively sequencing the actual calls. That is, one call would be made to pricingActor and then it would sit waiting for the result. Only then would the second call to conversionRateActor be made.

To understand that, it helps to see how this gets transformed by the Scala compiler. I won't explain all the rules here (feel free to go read the Scala language specification, if you're that curious), but essentially, the for comprehension is turned into the following code.

    val convertedPrice: Future[Double] = currentPrice.flatMap {
      case price => {
        conversionRate.map {
          case rate => {
            price * rate
          }
        }
      }
    }

Looking at the example in this form hopefully makes it more clear why you want to make your calls to get the futures prior to entering the for comprehension. To be clear, here's the equivalent code without the futures defined outside the for comprehension after it has been de-sugared by the compiler.

    val convertedPrice: Future[Double] =
      (pricingActor ? GetPrice(itemId)).mapTo[Double].flatMap {
        case price => {
          (conversionRateActor ? GetConversion(buyersCurrency)).mapTo[Double].map {
            case rate => {
              price * rate
            }
          }
        }
      }

If you look closely at this form, you'll see how the GetConversion message is not sent to the conversionRateActor until after the price has already been returned by the pricingActor.

Sequencing futures

Sometimes you really do need to run a series of futures as a sequence of operations. There are a couple of approaches to this. One is to simply invoke the next future in the sequence in the callback of the current future your code is waiting on. I won't even bother showing an example of this, since it's cumbersome and quickly gets ugly. Another approach is to use the for comprehensions I just demonstrated. This works well, but in some cases it results in code that's not exactly fluid. Thankfully, the Scala 2.10 API has added a few new features to futures that make this a lot nicer.

We were just talking about sequencing futures and an extra mechanism is provided in the form of andThen. This method is useful when you want to do something as a side-effect on the completion of another future. The method takes a partial function where the incoming type is Try[_], so you can restrict the match on Success or Failure, or, if you don't care, ignore it entirely. The result of the original future is still returned as the final value. And, finally, you can chain together as many calls to andThen as you like.

    val f = future { 1 + 2 }
    f andThen {
      case Success(i) => println("The result is: " + i)
    } andThen {
      case _ => doSomethingElse() // don't care what we've got
    }

There are cases where you want to make a series of calls and use the result of whichever one completes first. In this situation, you can use Future.firstCompletedOf, which will handle as many futures as you want to give it and give you a future containing the first result (whether a success or failure). This can be very helpful in situations where you're calling some service that might time out frequently.

    // the right-hand side is evaluated once per name, giving three separate futures
    val tryOne, tryTwo, tryThree = future { makeSomeRequest() }
    val first = Future.firstCompletedOf(Seq(tryOne, tryTwo, tryThree))

Finally, you can transform a sequence of futures into a future of a sequence. Let's say you have a sequence of URLs that you want to retrieve and then process when they're all done, but not before the entire sequence has completed. You could do something like this:

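    import scala.io.Source

    val urls = Seq("http://google.com", "http://twitter.com", "http://typesafe.com")
    // start one future per URL
    val pages = urls.map { url => future { Source.fromURL(url) } }
    Future.sequence(pages).onComplete {
      case Success(sources) => // process the Seq
      case Failure(t) => // at least one of the requests failed
    }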
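
If you want to try Future.sequence without touching the network, a sketch along these lines may help. Here fetchLength is a hypothetical stand-in for a page fetch that simply returns the length of the URL string, and totalLength is a made-up name for the combining step.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequenceExample {
  // Hypothetical stand-in for a page fetch; it performs no network I/O.
  def fetchLength(url: String): Future[Int] = Future { url.length }

  // Turns a Seq[Future[Int]] into a Future[Seq[Int]], then sums the results.
  def totalLength(urls: Seq[String]): Future[Int] = {
    val lengths: Seq[Future[Int]] = urls.map(fetchLength)
    Future.sequence(lengths).map(_.sum)
  }
}
```

The combined future completes only after every element has completed, and it fails if any one of them fails.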

Error handling and exceptions

The last, but most definitely not least, thing we need to cover is error handling with futures. I've already shown you how you can use onFailure as part of this, but that's just one tool available. We need to cover an additional observation about what we already know and then look at a couple of helpers that are available to you.

The first thing to discuss is how onFailure can be used in cases where it might not be so obvious. Although I've already shown how callbacks can be used with futures even when using other mechanisms for handling the results, such as for comprehensions, I haven't really commented on an important idea here. Notice how for comprehensions and many of the other structures the library provides still give you a final future to deal with. This outermost future can still have callbacks assigned to it:

    val firstFuture = someCallReturningAFuture()
    val secondFuture = anotherCallReturningAFuture()
    val thirdFuture = aFinalCallReturningAFuture()
    val sequencedResult = for {
      first <- firstFuture
      second <- secondFuture
      third <- thirdFuture
    } yield {
      first + second + third // we'll assume the results can all be added together
    }

    sequencedResult.onFailure {
      case e: DomainException => {
        // do some cleanup here
      }
    }

This is not revolutionary, but again, it's important to recognize that after the for comprehension returns, we're still dealing with a future and any errors happening anywhere within the sequence of calls to each of those methods will bubble up to that final future. You can handle each of those individually and not use the for comprehension, but if you choose to go that route, it makes sense to handle the errors at the top.

Another common scenario you might run into occurs when you want to attempt to get some value as a future and, in the case of a failure, fall back to another future. This is provided by the appropriately named fallbackTo method, which simply takes a future as its argument:

    val firstTry = future { doSomethingQuestionable() }
    val backup = future { doSomethingLessQuestionable() }
    val result = firstTry fallbackTo backup

The other mechanisms you'll likely want to take a look at are recover and recoverWith. In the case of recover, the outgoing return type needs to be the same type as the original parameter type of the future you started with. In the case of recoverWith, it should be another future, but parameterized with the same type as the original future.

Here's a quick example of each:

    import scala.io.Source
    import java.net.{MalformedURLException, UnknownHostException}

    val candidateURLOne = "htt://google.xyz"
    val candidateURLTwo = "http://google.com"
    val getOne = future { Source.fromURL(candidateURLOne) }
    val getTwo = future { Source.fromURL(candidateURLTwo) }

    getOne.recover {
      // match on the exception's type, not on a value
      case _: MalformedURLException =>
        Source.fromURL(candidateURLOne.replace("htt", "http"))
    }.recoverWith {
      case _: UnknownHostException =>
        getTwo
    }

As you can see, recover allows us to try a new operation, without it needing to be itself a future. The second approach, using recoverWith, is useful when you have an operation that could take time to perform and an alternate fallback approach that should be applied when the first doesn't work.

Note the difference between fallbackTo, which I showed you a little while ago, and recoverWith, which I just demonstrated. fallbackTo takes a future and returns its result on any failure of the original future, while recoverWith allows you to match on the exception the original future failed with and return a suitable response based on it.
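
The contrast can be sketched with futures that are already completed, so no real work is involved. The names primary, backup, and the three methods built on them are hypothetical, and the exception types were chosen only to show a matching and a non-matching case.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RecoveryExample {
  // A future that has already failed and one that has already succeeded.
  def primary(): Future[String] =
    Future.failed(new IllegalStateException("primary down"))
  def backup(): Future[String] = Future.successful("backup value")

  // fallbackTo ignores which exception occurred and uses the other future.
  def withFallback(): Future[String] = primary().fallbackTo(backup())

  // recoverWith only steps in for the exceptions its partial function matches.
  def withRecoverWith(): Future[String] = primary().recoverWith {
    case _: IllegalStateException => backup()
  }

  // No case matches here, so the original failure passes through unchanged.
  def unmatched(): Future[String] = primary().recoverWith {
    case _: java.io.IOException => backup()
  }
}
```

One practical consequence of the two signatures: the argument to fallbackTo is an ordinary value, so the backup future has already been created whether or not it ends up being needed, while the body of a recoverWith case runs only when the original future fails with a matching exception.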

Handling actor responses

I titled this section in a way that implies it's all about the response we get back from sending messages to actors. But the fact is that this applies to any futures you might be using. This might feel like we're rehashing old ground, but there's actually a lot more to handling results from futures than you might think. We've already seen a bit of this previously, but let's go over some of the subtleties here to make sure we understand what's going on.

An additional method, mentioned briefly in the second chapter, for getting the result from a Future is to use one of the callbacks provided. Of these, onComplete is basically the catch-all. It lets you get the result whether the call resulted in a success or a failure. As you can probably deduce, onSuccess and onFailure handle the individual cases of success and failure.

There are a couple of interesting behavioral details to know about before you start using these:

  • You can create an arbitrary number of callbacks on any future.
  • These callbacks will not be called in a specific order. To put it another way, do not assume the callbacks will be called in the order you have defined them.
  • If any given callback throws an exception, the other callbacks will still be called.

Now that you understand the ground-rules, let's look at an example:

    import akka.actor._
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.duration._
    import scala.util.{Failure, Success}

    case class Message(msg: String)
    case class Fail(msg: String)

    val system = ActorSystem("callbacks")
    import system.dispatcher // ExecutionContext for the callbacks
    implicit val timeout: Timeout = Timeout(5.seconds) // required by ask (?)

    val responder = system.actorOf(Props(new Actor {
      def receive = {
        case Message(msg) => sender ! msg
        // note: we need to respond with a Status.Failure here!
        case Fail(msg) => sender ! Status.Failure(new Exception(msg))
      }
    }))

    val responseOne = responder ? Message("will succeed")
    val responseTwo = responder ? Fail("will fail")
    responseOne.onComplete {
      case Success(result) => println(result)
      case Failure(failure) => println("Oops! Something went wrong: " + failure)
    }
    responseTwo.onSuccess {
      case msg => println("This will never get called.")
    }
    responseTwo.onFailure {
      case e: Exception => println("This will get called.")
    }

In this example, we are creating a very simple actor just to give us a response. In this case, we're dictating what type of response we get based on the type of message we send. We're calling the actor twice and registering a single onComplete callback on the first response and both an onSuccess and onFailure callback on the second response.

As you can see again, onSuccess and onFailure each expect a PartialFunction to be passed to them, while onComplete takes a function over Try that is usually written as a pattern-matching block in the same way. The onComplete callback will always receive one of Success or Failure. In a similarly rigid manner, onFailure will always receive some type of Throwable, typically some Exception object. onSuccess can receive pretty much anything, since that's defined by your code or the code of the actor you're interacting with.

All these callbacks have a return type of Unit. This is important since it means you can't chain the callbacks together. This isn't really a limitation, and it should help you to remember that they are not going to be called in any predetermined order.

I should also mention that, in the rare case that you need to wait for a response, you can also perform a blocking operation to get the result from a future. Really, you should likely only encounter this when you're using futures alongside synchronous code. Often there will be a better solution, though, so keep an eye out for alternative options. Having warned you about this, here's how you can wait on a response from a future. You still must provide a timeout so it won't block forever:

    import scala.concurrent._
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global

    val responseFuture = future { longRunningOperation() }
    val response = Await.result(responseFuture, 5.seconds)

If the future fails through its own logic, Await.result rethrows that exception to the calling code; if the timeout expires first, it throws a TimeoutException instead. Be prepared for both possibilities if you need to handle those errors.
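
Here is one way you might guard a blocking call, as a sketch rather than a recommendation. The name waitFor and the 200 millisecond timeout are assumptions chosen for the example.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.control.NonFatal

object AwaitExample {
  // Blocks for at most 200 milliseconds and reports what happened.
  def waitFor(f: Future[Int]): String =
    try {
      "got " + Await.result(f, 200.millis)
    } catch {
      // The timeout case comes first because NonFatal would match it too.
      case _: TimeoutException => "timed out"
      case NonFatal(e) => "failed: " + e.getMessage
    }
}
```

Passing in an already successful future, an already failed one, or the future of a Promise that is never completed exercises each of the three branches in turn.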

Wrap-up

This has been a bit of a whirlwind tour of the use of futures in both Scala and Akka, but it's important foundation material for you to understand as you start making use of these libraries. The fundamental nature of asynchronous computation is that some results will not be known until some future time, so having a general but usable mechanism for dealing with this is important.

 
