Chapter 16. Futures

Topics in This Chapter A2

Writing concurrent applications that work correctly and with high performance is very challenging. The traditional approach, in which concurrent tasks have side effects that mutate shared data, is tedious and error-prone. Scala encourages you to think of a computation in a functional way. A computation yields a value, sometime in the future. As long as the computations don’t have side effects, you can let them run concurrently and combine the results when they become available. In this chapter, you will see how to use the Future and Promise traits to organize such computations.

The key points of this chapter are:

  • A block of code wrapped in a Future { ... } executes concurrently.

  • A future succeeds with a result or fails with an exception.

  • You can wait for a future to complete, but you don’t usually want to.

  • You can use callbacks to get notified when a future completes, but that gets tedious when chaining callbacks.

  • Use methods such as map/flatMap, or the equivalent for expressions, to compose futures.

  • A promise has a future whose value can be set once.

  • Pick an execution context that is suitable for the concurrent workload of your computation.

16.1 Running Tasks in the Future

The scala.concurrent.Future object can execute a block of code “in the future.”

import java.time.*
import scala.concurrent.*
given ExecutionContext = ExecutionContext.global

Future {
  Thread.sleep(10000)
  println(s"This is the future at ${LocalTime.now}")
}
println(s"This is the present at ${LocalTime.now}")

When running this code in the REPL, a line similar to the following is printed:

This is the present at 13:01:19.400

About ten seconds later, a second line appears:

This is the future at 13:01:29.140

When you create a Future, its code is run on some thread. One could of course create a new thread for each task, but thread creation is not free. It is better to keep some pre-created threads around and use them to execute tasks as needed. A data structure that assigns tasks to threads is usually called a thread pool. In Java, the Executor interface describes such a data structure. Scala uses the ExecutionContext trait instead.

Each Future must be constructed with a reference to an ExecutionContext. The simplest way is to use this statement, which uses syntax from Chapter 19:

given ExecutionContext = ExecutionContext.global

Then the tasks execute on a global thread pool. This is fine for demos, but in a real program, you should make another choice if your tasks block. See Section 16.9, “Execution Contexts,” on page 258 for more information.

CAUTION

The global execution context runs tasks on daemon threads. The Java virtual machine terminates when only daemon threads are running. In demo code, simply add a call to Thread.sleep at the end of the main method to keep the program running until all tasks are finished.

When you construct multiple futures, they can execute concurrently. For example, try running

Future { for i <- 1 to 100 do { print("A"); Thread.sleep(10) } }
Future { for i <- 1 to 100 do { print("B"); Thread.sleep(10) } }

You will get an output that looks somewhat like

ABABABABABABABABABABABABABABA...AABABBBABABABABABABABABBBBBBBBBBBBBBBBB

A future can—and normally will—have a result:

val f = Future {
  Thread.sleep(10000)
  42
}

When you evaluate f in the REPL immediately after the definition, you will get this output:

res12: scala.concurrent.Future[Int] = Future(<not completed>)

Wait ten seconds and evaluate f again:

res13: scala.concurrent.Future[Int] = Future(Success(42))

Alternatively, something bad may happen in the future:

val f2 = Future {
  if LocalTime.now.getMinute != 42 then
    throw Exception("not a good time")
  42
}

Unless the minute happens to be 42, the task terminates with an exception. In the REPL, you will see

res14: scala.concurrent.Future[Int] =
  Future(Failure(java.lang.Exception: not a good time))

Now you know what a Future is. It is an object that will give you a result (or failure) at some point in the future. In the next section, you will see one way of harvesting the result of a Future.

CAUTION

Do not take the Future(Success(42)) and Future(Failure(...)) outputs literally. These are just how the toString method formats a future that has completed. If you type Future(Success(42)) into the REPL, you get an instance of Future[Success[Int]], which, when completed, is printed as Future(Success(Success(42))).

Note

The java.util.concurrent package has a Future interface that is much more limited than the Scala Future trait. A Scala future is equivalent to the CompletionStage interface in Java.

Tip

The Scala language imposes no restrictions on what you can do in concurrent tasks. However, you should stay away from computations with side effects. It is best if you don’t increment shared counters—even atomic ones. Don’t populate shared maps—even threadsafe ones. Instead, have each future compute a value. Then you can combine the computed values after all contributing futures have completed. That way, each value is only owned by one task at a time, and it is easy to reason about the correctness of the computation.

16.2 Waiting for Results

When you have a Future, you can use the isCompleted method to check whether it is completed. But of course you don’t want to wait for completion in a loop.

You can make a blocking call that waits for the result.

import scala.concurrent.duration.*
val f = Future { Thread.sleep(10000); 42 }
val result = Await.result(f, 15.seconds) // Allow some slack beyond the ten-second task

The call to Await.result blocks for ten seconds and then yields the result of the future.

The second parameter of the Await.result method has type Duration. Importing scala.concurrent.duration.* enables conversion methods from integers to Duration objects, called seconds, millis, and so on.
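As a small sketch of these conversions (the value names are made up for illustration), the following lines build a few durations and convert between units:

```scala
import scala.concurrent.duration.*

// Conversion methods on integers yield FiniteDuration objects
val short = 500.millis
val timeout = 10.seconds
val longer = 1.minute + 30.seconds // Durations can be added

// Conversions between units
val inMillis = timeout.toMillis   // 10000
val inSeconds = longer.toSeconds  // 90
```

Durations are also ordered, so a comparison such as short < timeout is valid.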

If the task is not ready by the allotted time, the Await.result method throws a TimeoutException.

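If the task throws an exception, it is rethrown in the call to Await.result. To avoid rethrowing the task’s exception, you can call Await.ready and then get the result. (Await.ready still throws a TimeoutException if the future does not complete in time.)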

val f2 = Future {
  Thread.sleep((10000 * scala.math.random()).toLong)
  if scala.math.random() < 0.5 then throw Exception("Not your lucky day")
  42
}
val result2 = Await.ready(f2, 5.seconds).value

The Await.ready method yields its first argument, and the value method of the Future class returns an Option[Try[T]]. It is None when the future is not completed and Some(t) when it is. Here, t is an object of the Try class, which holds either the result or the exception that caused the task to fail. You will see how to look inside it in the next section.
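The following sketch shows the three outcomes you can get when waiting with Await.ready. The helper name outcome and the sample futures are assumptions made for this illustration. The helper maps a timeout to None, so that the caller never sees an exception from the task or from the wait.

```scala
import scala.concurrent.*
import scala.concurrent.duration.*
import scala.util.Try

given ExecutionContext = ExecutionContext.global

// Hypothetical helper: waits up to `limit`, turning a timeout into None
def outcome[T](f: Future[T], limit: FiniteDuration): Option[Try[T]] =
  try Await.ready(f, limit).value
  catch case _: TimeoutException => None

val quick = Future { 42 }
val slow = Future { Thread.sleep(2000); 42 }
val broken = Future[Int] { throw Exception("Not your lucky day") }

val r1 = outcome(quick, 1.second)   // Some(Success(42))
val r2 = outcome(slow, 100.millis)  // None, since the wait timed out
val r3 = outcome(broken, 1.second)  // Some(Failure(...)), nothing is rethrown
```

Note that a None result here only means that the wait ended. The slow task keeps running in the background.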

Note

In practice, you won’t use the Await.result or Await.ready methods much. You run tasks concurrently when they are time-consuming and your program can do something more useful than waiting for the result. Section 16.4, “Callbacks,” on page 250 shows you how you can harvest the results without blocking.

CAUTION

In this section, we used the result and ready methods of the Await object. The Future trait also has result and ready methods, but you should not call them. If the execution context uses a small number of threads (which is the case for the default fork-join pool), you don’t want them all to block. Unlike the Future methods, the Await methods notify the execution context so that it can adjust the pooled threads.

Note

Not all exceptions that occur during execution of the future are stored in the result. Virtual machine errors and the InterruptedException are allowed to propagate in the usual way.

16.3 The Try Class

A Try[T] instance is either a Success(v), where v is a value of type T, or a Failure(ex), where ex is a Throwable. One way of processing it is with a match expression.

t match
  case Success(v) => println(s"The answer is $v")
  case Failure(ex) => println(ex.getMessage)

Alternatively, you can use the isSuccess or isFailure methods to find out whether the Try object represents success or failure. In the case of success, you can obtain the value with the get method:

if t.isSuccess then println(s"The answer is ${t.get}")

To get the exception in case of failure, first apply the failed method which turns the failed Try[T] object into a Try[Throwable] wrapping the exception. Then call get to get the exception object.

if t.isFailure then println(t.failed.get.getMessage)

You can also turn a Try object into an Option with the toOption method if you want to pass it on to a method that expects an option. This turns Success into Some and Failure into None.

To construct a Try object, call Try(block) with some block of code. For example,

val t = Try(str.toInt)

is either a Success object with the parsed integer, or a Failure wrapping a NumberFormatException.
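Here is a self-contained sketch that puts the pieces of this section together. The describe function and the sample strings are made up for illustration.

```scala
import scala.util.{Try, Success, Failure}

// Hypothetical helper that describes the outcome of parsing a string
def describe(str: String): String =
  Try(str.toInt) match
    case Success(v) => s"The answer is $v"
    case Failure(ex) => s"Failed: ${ex.getClass.getSimpleName}"

val good = describe("42")        // "The answer is 42"
val bad = describe("forty-two")  // "Failed: NumberFormatException"

// toOption discards the exception; getOrElse supplies a default value
val maybe = Try("forty-two".toInt).toOption
val withDefault = Try("forty-two".toInt).getOrElse(0)
```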

There are several methods for composing and transforming Try objects. However, analogous methods exist for futures, where they are more commonly used. You will see how to work with multiple futures in Section 16.5, “Composing Future Tasks,” on page 251. At the end of that section, you will see how those techniques apply to Try objects.

16.4 Callbacks

As already mentioned, one does not usually use a blocking wait to get the result of a future. For better performance, the future should report its result to a callback function upon completion.

This is easy to arrange with the onComplete method.

f.onComplete(t => ...)

When the future has completed, either successfully or with a failure, it calls the given function with a Try object.

You can then react to the success or failure, for example by passing a match function to the onComplete method.

val f = Future {
  Thread.sleep(1000)
  if scala.math.random() < 0.5 then throw Exception("Not your lucky day")
  42
}
f.onComplete {
  case Success(v) => println(s"The answer is $v")
  case Failure(ex) => println(ex.getMessage)
}

By using a callback, we avoid blocking. Unfortunately, we now have another problem. In all likelihood, the long computation in one Future task will be followed by another computation, and another. It is possible to nest callbacks within callbacks, but it is profoundly unpleasant. (This technique is sometimes called “callback hell.”)

A better approach is to think of futures as entities that can be composed, similar to functions. You compose two functions by calling the first one, then passing its result to the second one. In the next section, you will see how to do the same with futures.

16.5 Composing Future Tasks

Suppose we need to get some information from two web services and then combine the two. Each task is long-running and should be executed in a Future. It is possible to link them together with callbacks:

val future1 = Future { getData1() }
val future2 = Future { getData2() }

future1 onComplete {
  case Success(n1) =>
    future2 onComplete {
      case Success(n2) => {
        val n = n1 + n2
        println(s"Sum: $n")
      }
      case Failure(ex) => ex.printStackTrace()
    }
  case Failure(ex) => ex.printStackTrace()
}

Even though the callbacks are ordered sequentially, the tasks run concurrently. Each task starts after the Future.apply method executes or soon afterwards. We don’t know which of future1 and future2 completes first, and it doesn’t matter. We can’t process the result until both tasks complete. Once future1 completes, its completion handler registers a completion handler on future2. If future2 has already completed, the second handler is called right away. Otherwise, it is called when future2 finally completes.

Even though this nesting of the callbacks works, it looks very messy, and it will look worse with each additional level of processing.

Instead of nesting callbacks, we will use an approach that you already know from working with Scala collections. Think of a Future as a collection with (hopefully, eventually) one element. You know how to transform the values of a collection—with map:

val future1 = Future { getData1() }
val combined = future1.map(n1 => n1 + getData2())

Here future1 is a Future[Int]—a collection of (hopefully, eventually) one value. We map a function Int => Int and get another Future[Int]—a collection of (hopefully, eventually) one integer.

But wait—that’s not quite the same as in the callback code. The call to getData2 is running after getData1, not concurrently. Let’s fix that with a second map:

val future1 = Future { getData1() }
val future2 = Future { getData2() }
val combined = future1.map(n1 => future2.map(n2 => n1 + n2))

When future1 and future2 have delivered their results, the sum is computed.

Unfortunately, now combined is a Future[Future[Int]], which isn’t so good. That’s what flatMap is for:

val combined = future1.flatMap(n1 => future2.map(n2 => n1 + n2))

This looks much nicer when you use a for expression instead of chaining flatMap and map:

val combined = for n1 <- future1; n2 <- future2 yield n1 + n2

This is exactly the same code since for expressions are translated to chains of map and flatMap.

What if something goes wrong? The map and flatMap implementations take care of all that. As soon as one of the tasks fails, the entire pipeline fails, and the exception is captured. In contrast, when you manually combine callbacks, you have to deal with failure at every step.
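To see the failure propagation in action, consider this sketch. The getData1 and getData2 functions are stand-ins invented for the demonstration, and the second one always fails.

```scala
import scala.concurrent.*
import scala.concurrent.duration.*

given ExecutionContext = ExecutionContext.global

// Stand-ins for the web service calls; the second one always fails
def getData1(): Int = 40
def getData2(): Int = throw IllegalStateException("service 2 is down")

val future1 = Future { getData1() }
val future2 = Future { getData2() }
val combined = for n1 <- future1; n2 <- future2 yield n1 + n2

// Blocking is acceptable in a demo; Await.ready does not rethrow the failure
val outcome = Await.ready(combined, 5.seconds).value
```

The outcome is a Some(Failure(...)) holding the exception from getData2. No error handling code was needed in the for expression itself.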

You can also apply guards in the for expression:

val combined =
  for n1 <- future1; n2 <- future2 if n1 != n2 yield n1 + n2

If the guard fails, the computation fails with a NoSuchElementException.
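As a quick check of this behavior, the following sketch uses two futures with equal values (chosen only for the demonstration), so that the guard is false:

```scala
import scala.concurrent.*
import scala.concurrent.duration.*

given ExecutionContext = ExecutionContext.global

val future1 = Future { 21 }
val future2 = Future { 21 }
val combined =
  for n1 <- future1; n2 <- future2 if n1 != n2 yield n1 + n2

// The guard n1 != n2 is false, so the combined future fails
val failure = Await.ready(combined, 5.seconds).value.get.failed.get
```

Here, failure is an instance of NoSuchElementException.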

So far, you have seen how to run two tasks concurrently. Sometimes, you need one task to run after another. A Future starts execution immediately when it is created. To delay the creation, use functions.

val future1 = Future { getData1() }
def future2 = Future { getData2() } // def, not val
val combined = for n1 <- future1; n2 <- future2 yield n1 + n2

Now future2 is only evaluated when future1 has completed.

It doesn’t matter whether you use val or def for future1. If you use def, its creation is slightly delayed to the start of the for expression.
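The following sketch makes the ordering visible. The step function and the shared log are assumptions for this demo only. A thread-safe queue records when each task starts and ends, which is a side effect that you would normally avoid in production code.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.*
import scala.concurrent.duration.*
import scala.jdk.CollectionConverters.*

given ExecutionContext = ExecutionContext.global

// A thread-safe log, used only to observe the order of events
val log = ConcurrentLinkedQueue[String]()

def step(name: String, result: Int): Int =
  log.add(s"start $name")
  Thread.sleep(200)
  log.add(s"end $name")
  result

val future1 = Future { step("first", 2) }
def future2 = Future { step("second", 40) } // def: created only when called
val combined = for n1 <- future1; n2 <- future2 yield n1 + n2

val sum = Await.result(combined, 5.seconds)
val events = log.asScala.toList
```

The events list shows that the second task starts only after the first one has ended. If you change def future2 to val future2, the two start events can appear before either end event.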

This is particularly useful if the second step depends on the output of the first:

def future1 = Future { getData() }
def future2(arg: Int) = Future { getMoreData(arg) }
val combined = for n1 <- future1; n2 <- future2(n1) yield n1 + n2

Note

Like the Future trait, the Try class from Section 16.3, “The Try Class,” on page 249 has map and flatMap methods. A Try[T] is a collection of, hopefully, one element. It is just like a Future[T], except you don’t have to wait. You can apply map with a function that changes that one element, or flatMap if you have a Try-valued function and want to flatten the result. And you can use for expressions. For example, here is how to compute the sum of two function calls that might fail:

def readInt(prompt: String) = Try(StdIn.readLine(s"$prompt: ").toInt)
val combined =
  for n1 <- readInt("n1"); n2 <- readInt("n2") yield n1 + n2

In this way, you can compose Try-valued computations and you don’t need to deal with the boring part of error handling.

16.6 Other Future Transformations

The map and flatMap methods that you saw in the preceding section are the most fundamental transformations of Future objects.

Table 16–1 shows several ways of applying functions to the contents of a future that differ in subtle details.

The foreach method works exactly like it does for collections, applying a method for its side effect. The method is applied to the single value in the future. It is convenient for harvesting the answer when it materializes.

val combined = for n1 <- future1; n2 <- future2 yield n1 + n2
combined.foreach(n => println(s"Sum: $n"))

Table 16–1 Transformations on a Future[T] with Success Value v or Exception ex

| Method | Result type | Description |
|---|---|---|
| collect(pf: PartialFunction[T, S]) | Future[S] | Like map, but with a partial function. The result fails with a NoSuchElementException if pf(v) is not defined. |
| foreach(f: T => U) | Unit | Calls f(v) like map, but only for its side effect. |
| andThen(pf: PartialFunction[Try[T], U]) | Future[T] | Calls pf with the Try result (Success(v) or Failure(ex)) for its side effect and returns a future with the original outcome. |
| filter(p: T => Boolean) | Future[T] | Calls p(v) and returns a future with v or a NoSuchElementException. |
| recover(pf: PartialFunction[Throwable, U]), recoverWith(pf: PartialFunction[Throwable, Future[U]]) | Future[U] (where U is a supertype of T) | A future with value v or pf(ex), flattened in the asynchronous case. |
| fallbackTo(f2: Future[U]) | Future[U] (where U is a supertype of T) | A future with value v, or if this future failed, with the value of f2, or if that also failed, with exception ex. |
| failed | Future[Throwable] | A future with value ex. |
| transform(s: T => S, f: Throwable => Throwable), transform(f: Try[T] => Try[S]), transformWith(f: Try[T] => Future[S]) | Future[S] | Transforms both the success and failure. |
| zip(f2: Future[U]) | Future[(T, U)] | A future with a pair holding v and the value of f2, or ex if this future fails, or the failure of f2. |
| zipWith(f2: Future[U])(f: (T, U) => R) | Future[R] | Zips both futures and applies f. |
| flatten | Future[S] (where T is Future[S]) | Flattens a Future[Future[S]] into a Future[S]. |

The recover method accepts a partial function that can turn an exception into a successful result. Consider this call:

val f = Future { persist(data) } recover { case e: SQLException => 0 }

If a SQLException occurs, the future succeeds with result 0.

The fallbackTo method provides a different recovery mechanism. When you call f.fallbackTo(f2), the value of f2 becomes the value of the resulting future if f fails. Note that f2 is an ordinary Future argument, so its task starts when f2 is created, not when f fails. Also, f2 cannot inspect the reason for the failure.
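Here is a minimal sketch that contrasts the two recovery mechanisms. The persist function is a stand-in that always fails, invented for this demonstration, and the fallback value -1 is arbitrary.

```scala
import scala.concurrent.*
import scala.concurrent.duration.*

given ExecutionContext = ExecutionContext.global

// Hypothetical stand-in for a database call that always fails
def persist(data: String): Int = throw java.sql.SQLException("no connection")

// recover can inspect the exception and supply a replacement value
val recovered =
  Future { persist("row") }.recover { case e: java.sql.SQLException => 0 }

// fallbackTo takes another future, which cannot see why the first one failed
val withFallback = Future { persist("row") }.fallbackTo(Future { -1 })

val r1 = Await.result(recovered, 5.seconds)     // 0
val r2 = Await.result(withFallback, 5.seconds)  // -1
```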

The failed method turns a failed Future[T] into a successful Future[Throwable], just like the Try.failed method. You can retrieve the failure in a for expression like this:

val f = Future { persist(data) }
for v <- f do println(s"Succeeded with $v")
for ex <- f.failed do println(s"Failed with $ex")

Finally, you can zip two futures together. The call f1.zip(f2) yields a future whose result is a pair (v, w) if v was the result of f1 and w the result of f2, or an exception if either f1 or f2 failed. (If both fail, the exception of f1 is reported.)

The zipWith method is similar, but it takes a function to combine the two results instead of returning a pair. For example, here is another way of obtaining the sum of two computations:

val future1 = Future { getData1() }
val future2 = Future { getData2() }
val combined = future1.zipWith(future2)(_ + _)

16.7 Methods in the Future Object

The Future companion object contains useful methods for working on collections of futures.

Suppose that, as you are computing a result, you organize the work so that you can concurrently work on different parts. For example, each part might be a subsequence of the inputs. Make a future for each part:

val futures = parts.map(part => Future { process(part) })

Now you have a collection of futures. Often, you want to combine the results. By using the Future.sequence method, you can get a collection of all results for further processing:

val result = Future.sequence(futures)

Note that the call doesn’t block—it gives you a future to a collection. For example, assume futures is a Seq[Future[T]]. Then the result is a Future[Seq[T]]. When the results for all elements of futures are available, the result future will complete with a sequence of the results.

If any of the futures fail, then the resulting future fails as well with the exception of the first failed future. If multiple futures fail, you don’t get to see the remaining failures.

The traverse method combines the map and sequence steps. Instead of

val futures = parts.map(p => Future { process(p) })
val result = Future.sequence(futures)

you can call

val result = Future.traverse(parts)(p => Future { process(p) })

The function in the second curried parameter is applied to each element of parts. You get a future to a collection of all results.
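The following sketch runs both variants on a small workload. The parts and the process function are assumptions for the example: the numbers from 1 to 100 are split into four groups, and each group is summed in its own future.

```scala
import scala.concurrent.*
import scala.concurrent.duration.*

given ExecutionContext = ExecutionContext.global

// Hypothetical workload: sum each part of the input separately
val parts = (1 to 100).grouped(25).toSeq
def process(part: Seq[Int]): Int = part.sum

val viaSequence = Future.sequence(parts.map(p => Future { process(p) }))
val viaTraverse = Future.traverse(parts)(p => Future { process(p) })

// Combine the partial results once all of them are available
val total = viaTraverse.map(_.sum)
val answer = Await.result(total, 5.seconds) // 5050
```

Both viaSequence and viaTraverse are futures to a sequence of the four partial sums, in the same order as the parts.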

There are reduceLeft and foldLeft operations on iterables of futures. You supply an operation that combines the results of all futures as they become available. For example, here is how you can compute the sum of the results:

val result = Future.reduceLeft(futures)(_ + _)
  // Yields a future to the sum of the results of all futures

So far, we have collected the results from all futures. Suppose you are willing to accept a result from any of the parts. Then call

val result = Future.firstCompletedOf(futures)

You get a future that, when it completes, has the result or failure of the first completed element of futures.

The find method produces the leftmost result matching a predicate.

val result = Future.find(futures)(predicate)
  // Yields a Future[Option[T]]

You get a future that, when it completes successfully, yields Some(r), where r is the result of one of the given futures that fulfills the predicate. Failed futures are ignored. If all futures complete but none yields a result that matches the predicate, then the future yields None. Note that the predicate has type T => Boolean, and the result has type Future[Option[T]].
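As an illustration of firstCompletedOf and find, consider this sketch. The delayed helper and the delay values are arbitrary choices for the demo.

```scala
import scala.concurrent.*
import scala.concurrent.duration.*

given ExecutionContext = ExecutionContext.global

// Three tasks with different delays (arbitrary demo values)
def delayed(millis: Long, value: Int) = Future { Thread.sleep(millis); value }
val futures = List(delayed(600, 1), delayed(100, 2), delayed(300, 4))

val first = Future.firstCompletedOf(futures)  // Whichever finishes first
val evenOne = Future.find(futures)(_ % 2 == 0) // Leftmost even result
val none = Future.find(futures)(_ > 100)       // No result matches

val r1 = Await.result(first, 5.seconds)
val r2 = Await.result(evenOne, 5.seconds)  // Some(2)
val r3 = Await.result(none, 5.seconds)     // None
```

The value of r1 depends on timing. It is typically 2 when the pool has enough threads to run all three tasks at once, but that is not guaranteed.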

CAUTION

A potential problem with firstCompletedOf and find is that the other computations keep on going even when the result has been determined. Scala futures do not have a mechanism for cancellation.

The Future.delegate method runs a Future-producing function and flattens the result:

def future1 = Future { getData() } // Note def, not val
val result = Future.delegate(future1) // A Future[T], not a Future[Future[T]]

Finally, the Future object provides convenience methods for generating simple futures:

  • Future.successful(r) is an already completed future with result r.

  • Future.failed(e) is an already completed future with exception e.

  • Future.fromTry(t) is an already completed future with the result or exception given in the Try object t.

  • Future.unit is an already completed future with Unit result.

  • Future.never is a future that never completes.

16.8 Promises

A Future object is read-only. The result of the future is set implicitly when its task has completed or failed. It cannot be set explicitly.

As a consumer of a Future, you would never want to set the result. The point of a Future is to process a result once it is ready.

However, if you produce a Future for others to consume, the task mechanism only works for synchronous computations. If you use an asynchronous API, you are called back when the result is available. That’s the point where you want to set the result and complete the Future. You use a Promise to make that work.

Calling success on a promise sets the result. Alternatively, you can call failure with an exception to make the promise fail. As soon as one of these methods is called, the associated future is completed, and neither method can be called again. (An IllegalStateException is thrown otherwise.)

Here is a typical workflow:

def computeAnswer(arg: String) = {
  val p = Promise[String]()
  def onSuccess(result: String) = p.success(result)
  def onFailure(ex: Throwable) = p.failure(ex)
  startAsyncWork(arg, onSuccess, onFailure)
  p.future
}

Calling future on a promise yields the associated Future object. Note that the method returns the Future right away, immediately after starting the work that will eventually yield the result.

From the point of view of the consumer (that is, the caller of the computeAnswer method), there is no difference between a Future that was constructed with a task function and one that was produced from a Promise. Either way, the consumer gets the result when it is ready.
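To make the workflow concrete, here is a runnable sketch. The startAsyncWork function is hypothetical. It is simulated here with a plain daemon thread that invokes one of two callbacks, standing in for whatever callback-based API you need to adapt.

```scala
import scala.concurrent.*
import scala.concurrent.duration.*

// Hypothetical callback-based API, simulated with a plain thread
def startAsyncWork(
    arg: String,
    onSuccess: String => Unit,
    onFailure: Throwable => Unit): Unit =
  val task: Runnable = () =>
    try
      Thread.sleep(200)
      if arg.isEmpty then throw IllegalArgumentException("empty argument")
      onSuccess(arg.toUpperCase)
    catch case ex: Exception => onFailure(ex)
  val worker = Thread(task)
  worker.setDaemon(true)
  worker.start()

// Adapter that turns the callback API into a Future
def computeAnswer(arg: String): Future[String] =
  val p = Promise[String]()
  startAsyncWork(arg, result => p.success(result), ex => p.failure(ex))
  p.future

val answer = Await.result(computeAnswer("hello"), 5.seconds) // "HELLO"
val failure = Await.ready(computeAnswer(""), 5.seconds).value
```

The caller of computeAnswer only sees a Future[String]. In the second call, the future fails with the IllegalArgumentException that was passed to the failure callback.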

The producer, however, has more flexibility when using a Promise. For example, multiple tasks can work concurrently to fulfill a single promise. When one of the tasks has a result, it calls trySuccess on the promise. Unlike the success method, that method accepts the result and returns true if the promise has not yet completed; otherwise it returns false and ignores the result.

val p = Promise[String]()
Future {
  val result = workHard(arg)
  p.trySuccess(result)
}
Future {
  val result = workSmart(arg)
  p.trySuccess(result)
}

The promise is completed by the first task that manages to produce the result. With this approach, the tasks might want to periodically call p.isCompleted to check whether they should continue.
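The difference between success and trySuccess can be checked without any concurrency, as in this small sketch (the string values are arbitrary):

```scala
import scala.concurrent.*
import scala.util.Try

val p = Promise[String]()
val first = p.trySuccess("hard")    // true: the promise was not yet completed
val second = p.trySuccess("smart")  // false: the result is ignored
val value = p.future.value          // Some(Success("hard"))

// Calling success on a completed promise throws an IllegalStateException
val threw = Try(p.success("again")).isFailure // true
```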

Note

Scala promises are very similar to the CompletableFuture class in Java 8.

16.9 Execution Contexts

The global execution context executes futures on the global fork-join pool. That works well for computationally intensive tasks. However, the fork-join pool only manages a small number of threads (by default, equal to the number of cores of all processors). This is a problem when tasks have to wait, for example when communicating with a remote resource. A program could exhaust all available threads, waiting for results.

You can notify the execution context that you are about to block, by placing the blocking code inside blocking { ... }:

val f = Future {
  val url = "https://horstmann.com/index.html"
  blocking {
    val contents = Source.fromURL(url).mkString
    if contents.length < 300
    then contents
    else contents.substring(0, 300) + "..."
  }
}

The execution context may then increase the number of threads. The fork-join pool does exactly that, but it isn’t designed to perform well for many blocking threads. If you do input/output or connect to databases, you are better off using a different thread pool. The Executors class from the Java concurrency library gives you several choices. A cached thread pool works well for I/O intensive workloads. You can pass it explicitly to the Future.apply method, or you can set it as the given execution context:

val pool = Executors.newCachedThreadPool()
given ExecutionContext = ExecutionContext.fromExecutor(pool)

Now this pool is used by all futures where the given declaration is in scope. (See Chapter 19 for more information about given declarations.)
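Here is a sketch of a cached thread pool at work. The task count and sleep time are arbitrary demo values. Each task blocks for half a second and reports the name of the thread on which it ran.

```scala
import java.util.concurrent.Executors
import scala.concurrent.*
import scala.concurrent.duration.*

val pool = Executors.newCachedThreadPool()
given ExecutionContext = ExecutionContext.fromExecutor(pool)

// Forty blocking tasks; a cached pool creates threads as needed
val tasks = (1 to 40).map(i =>
  Future { Thread.sleep(500); Thread.currentThread().getName })
val names = Await.result(Future.sequence(tasks), 10.seconds)
val distinctThreads = names.distinct.size

// The pool's threads are not daemon threads, so shut it down when done
pool.shutdown()
```

On a typical machine, distinctThreads is much larger than the number of cores, because the pool adds a thread whenever all existing ones are busy. Without the call to shutdown, the idle pool threads can keep the program alive for a while after the main method ends.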

Exercises

1. Consider the expression

for
  n1 <- Future { Thread.sleep(1000); 2 }
  n2 <- Future { Thread.sleep(1000); 40 }
do
  println(n1 + n2)

How is the expression translated to map and flatMap calls? Are the two futures executed concurrently or one after the other? In which thread does the call to println occur?

2. Write a function doInOrder that, given two functions f: T => Future[U] and g: U => Future[V], produces a function T => Future[V] that, for a given t, eventually yields g(f(t)).

3. Repeat the preceding exercise for any sequence of functions of type T => Future[T].

4. Write a function doTogether that, given two functions f: T => Future[U] and g: T => Future[V], produces a function T => Future[(U, V)], running the two computations in parallel and, for a given t, eventually yielding (f(t), g(t)).

5. Repeat the preceding exercise for any sequence of functions of type T => Future[U].

6. Write a function

repeat(action: => T, until: T => Boolean): Future[T]

that asynchronously repeats the action until it produces a value that is accepted by the until predicate, which should also run asynchronously. Test with a function that reads a password from the console, and a function that simulates a validity check by sleeping for a second and then checking that the password is "secret". Hint: Use recursion.

7. Write a program that counts the prime numbers between 1 and n, as reported by BigInt.isProbablePrime. Divide the interval into p parts, where p is the number of available processors. Count the primes in each part in concurrent futures and combine the results.

8. Write a program that asks the user for a URL, reads the web page at that URL, and displays all the hyperlinks. Provide functions that yield futures for each of these three steps, and then invoke the functions in a for comprehension.

9. Write a program that asks the user for a URL, reads the web page at that URL, finds all the hyperlinks, visits each of them concurrently, and locates the Server HTTP header for each of them. Finally, print a table of which servers were found how often. The futures that visit each page should return the header.

10. Change the preceding exercise so that the futures that visit each page update a shared Java ConcurrentHashMap or Scala TrieMap. This isn’t as easy as it sounds. A threadsafe data structure is safe in the sense that you cannot corrupt its implementation, but you have to make sure that sequences of reads and updates are atomic.

11. In the preceding exercise, you updated a mutable Map[URL, String]. Consider what happens when two threads concurrently query for the same key whose result is not yet present. Then both threads expend effort computing the same value. Avoid this problem by using a Map[URL, Future[String]] instead.

12. Using futures, run four tasks that each sleep for ten seconds and then print the current time. If you have a reasonably modern computer, it is very likely that it reports four available processors to the JVM, and the futures should all complete at around the same time. Now repeat with forty tasks. What happens? Why? Replace the execution context with a cached thread pool. What happens now? (Be careful to define the futures after declaring the given execution context.)

13. Using Swing or JavaFX, implement a function that returns a future for a button click. Use a promise to set the value to the button label when the button is clicked. Fail the promise when a timeout has expired.

14. Write a method that, given a URL, locates all hyperlinks, makes a promise for each of them, starts a task in which it will eventually fulfill all promises, and returns a sequence of futures for the promises. Why would it not be a good idea to return a sequence of promises?

15. Use a promise for implementing cancellation. Given a range of big integers, split the range into subranges that you concurrently search for palindromic primes. When such a prime is found, set it as the value of the future. All tasks should periodically check whether the promise is completed, in which case they should terminate.
