The Async library – SIP-22-Async

In Chapter 7, Working with Integration and Web Services, we have briefly seen how to call asynchronous web services that return a Future object. The aim of Async is to simplify asynchronous code by providing a couple of powerful constructs to deal with asynchronous code blocks and, in particular, combining several such blocks. It consists of only two constructs:

  • async { <expression> }: In this construct, <expression> is the code to be executed asynchronously.
  • await { <expression returning a Future> }: This construct is included in an async block. It suspends the execution of the enclosing async block until the argument Future is completed.

An interesting characteristic of the whole async/await mechanism is that it is totally nonblocking. Although it is not strictly necessary to understand how async/await works, the exact signatures of the two methods, async[] and await[], are given for reference, as follows:

def async[T](body: => T) : Future[T]
def await[T](future:Future[T]):T

T is a type parameter: it can stand for any type, such as Int or String, or for a container type, such as List or Map. This is how generic types are expressed in Scala. Although we will not cover programming with generic types in depth (it has already been extensively described in other books such as Programming in Scala, Artima, by Martin Odersky, Lex Spoon, and Bill Venners), it is important to understand that they exist and that they form part of the core of the Scala language.

To better understand what Async is all about, we will use examples that we can run in the REPL. Create a new Play project by running the > play new ch8samples command and choose, of course, Scala as the language for the project. Once the project is created, add the Async library as a dependency by adding one line to the build.sbt file, which now looks as follows:

name := "ch8samples"

version := "1.0-SNAPSHOT"

libraryDependencies ++= Seq(
  jdbc,
  anorm,
  cache,
  "org.scala-lang.modules" %% "scala-async" % "0.9.0"
)

play.Project.playScalaSettings

We can run the REPL console, as usual, in a terminal window by entering the following command from the root directory of the project:

> play console

First, we need to perform some imports, which are as shown in the following command lines:

scala> import scala.async.Async.{async, await}
import scala.async.Async.{async, await}
scala> import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext.Implicits.global

Much like a thread pool, an execution context determines how and when the asynchronous computation is executed, which is why one must be in scope.
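To make the thread pool analogy concrete, here is a minimal sketch that builds an execution context on top of a fixed-size Java thread pool instead of importing the global one. The object name, the pool size, and the computation are assumptions chosen for illustration; only standard-library calls are used.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object CustomContextSketch {
  // Runs one computation on a dedicated two-thread pool and returns its result.
  def compute(): Int = {
    // The implicit context below is picked up by Future { ... }.
    implicit val pool: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))
    try {
      val computation = Future { 3 * 2 }
      // Blocking with Await is only acceptable in a demo or a test.
      Await.result(computation, 5.seconds)
    } finally pool.shutdown()
  }
}
```

In the rest of this section, the global execution context imported above plays the role of this hand-made pool.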

Second, we can specify an asynchronous computation by enclosing the computation into an async block:

scala> val computation = async { 3 * 2 }
computation: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@545c484c
scala> computation.value
res0: Option[scala.util.Try[Int]] = Some(Success(6))

As you can see, the type of the result is Option[scala.util.Try[Int]]. Recall the brief discussion of the Try class in Chapter 2, Code Integration: Try is similar in spirit to the Either class and takes one of two values, Success or Failure, which play the roles of the Right and Left values of Either, respectively.

In our case, the computation was quite immediate and resulted in the success value of 6.
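As a small illustration of how such a value can be inspected, the following sketch pattern matches on the three possible shapes of an Option[Try[Int]]. The object and method names are hypothetical, and the messages are arbitrary.

```scala
import scala.util.{Failure, Success, Try}

object TrySketch {
  // Turns the Option[Try[Int]] returned by Future.value into a readable message.
  def describe(value: Option[Try[Int]]): String = value match {
    case None                 => "not completed yet"
    case Some(Success(n))     => s"completed with $n"
    case Some(Failure(error)) => s"failed: ${error.getMessage}"
  }
}
```

Calling TrySketch.describe(computation.value) in the REPL would report which of the three states the computation is in.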

Let us now define a computation that takes longer (for example, 10 seconds), as shown in the following command lines:

scala> val longComputation = async { Thread.sleep(10000); 3*2 }
longComputation: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@7b5ab834

If, during those 10 seconds, we try to access its result value:

scala> longComputation.value
res1: Option[scala.util.Try[Int]] = None

We will get the answer None, which is what we expect, as the computation is not completed yet.

If we wait for 10 seconds and perform the same query again, we'll get our result:

scala> longComputation.value
res2: Option[scala.util.Try[Int]] = Some(Success(6))

Note that once a Future is completed and given a value, it cannot be modified.
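One way to observe this write-once behavior is through a Promise, the standard-library object that sits on the writing side of a Future. Promise is not used elsewhere in this section, so treat the sketch below as an illustrative aside; the object name and the values are assumptions.

```scala
import scala.concurrent.Promise
import scala.util.Success

object CompleteOnceSketch {
  val promise = Promise[Int]()
  // The first completion succeeds and returns true.
  val first: Boolean = promise.trySuccess(6)
  // A second attempt is rejected: it returns false and the value stays 6.
  val second: Boolean = promise.trySuccess(42)
  // The Future attached to the promise still holds the first value.
  val finalValue = promise.future.value
}
```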

An alternative to polling for the result is to be informed or execute some code when the Future is completed. We can do that by invoking the onComplete method, immediately after rerunning our computation, as follows:

scala> val longComputation = async { Thread.sleep(10000); 3*2 }
longComputation: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@1c6b985a
scala> longComputation.onComplete(println)
scala>   (no immediate result)

In other words, while the computation is not finished, we can proceed to execute other statements:

scala> val hello = "Hello"

Eventually, we will see the value 6 on the screen, once the time of 10 seconds elapses:

scala> Success(6)

So far, we've seen that the async method performs the same way as the future method, which is part of the scala.concurrent package; for this reason, we could just replace async with future.
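For comparison, here is a sketch of the same kind of computation written with Future { ... } from the standard library only, together with an onComplete callback that distinguishes success from failure. The object name, the shortened delay, and the helper Promise (used solely to hand the callback's message back to the caller) are assumptions made for this illustration.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object OnCompleteSketch {
  // Starts a computation and reports its outcome through a callback.
  def run(): String = {
    val report = Promise[String]() // carries the callback's message back to the caller
    val computation = Future { Thread.sleep(100); 3 * 2 }
    computation.onComplete {
      case Success(n)     => report.success(s"got $n")
      case Failure(error) => report.success(s"failed: ${error.getMessage}")
    }
    // Blocking with Await is for demonstration only.
    Await.result(report.future, 5.seconds)
  }
}
```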

The preferred way is to use async in conjunction with await. The await method takes a Future object as its input argument. It wraps the rest of the async block in a closure and passes it as a callback to be run on completion of the Future object we are waiting on (the one we passed as an argument). Although await waits for that Future object to complete, the whole async/await execution is nonblocking, which means we can compose Future objects in a totally nonblocking way.
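The callback mechanism described above is close in spirit to chaining Future objects with flatMap and map. The sketch below, which relies only on the standard library, composes two dependent steps that way. The stub methods locate and forecast are hypothetical stand-ins for real services, and the code is an analogy for what async/await achieves, not the code that the Async macro actually generates.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ComposeSketch {
  // Hypothetical stand-ins for two asynchronous services.
  def locate(): Future[String] = Future { "Stockholm" }
  def forecast(city: String): Future[String] = Future { s"Clear sky over $city" }

  // The second step runs as a callback once the first Future completes,
  // so no thread is blocked while waiting for the city.
  def combined(): Future[String] =
    for {
      city    <- locate()
      weather <- forecast(city)
    } yield weather

  // Blocking with Await is for demonstration only.
  def result(): String = Await.result(combined(), 5.seconds)
}
```

With async/await, the body of combined would read as two sequential statements instead of a for comprehension, which tends to pay off as the number of dependent steps grows.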

Let's illustrate composing two computations where the input of one depends on the output of the other. A typical example is the invocation of two web services to obtain a weather forecast: one that returns our current geographic location and another that returns the weather for a given position (coordinates or a city name). The following command lines invoke the first of these services:

scala> import play.api.libs.json._
import play.api.libs.json._
scala> import play.api.libs.ws._
import play.api.libs.ws._
scala> import play.api.libs.functional.syntax._
import play.api.libs.functional.syntax._
scala> import scala.util.{Success, Failure}
import scala.util.{Success, Failure}
scala> val locationURL = "http://freegeoip.net/xml/www.aftonbladet.se"
locationURL: String = http://freegeoip.net/xml/www.aftonbladet.se
scala> val futureLocation = WS.url(locationURL).get().map { response => 
         (response.xml \ "City").text
       }
futureLocation: scala.concurrent.Future[String] = scala.concurrent.impl.Promise$DefaultPromise@6039c183

Wait for a couple of seconds to make sure that the web service Future gets completed, then enter the following statement to see the result:

scala> val location = futureLocation.value
location: Option[scala.util.Try[String]] = Some(Success(Stockholm))

The first service returns an XML document, from which we extracted only the text of the City element.

Now, let's try a second service from the http://openweathermap.org website, a useful resource for testing web service code in general. The following web service call returns the weather as a JSON message, given a particular location (we will use a hardcoded Paris city here to first experiment with this service alone without composing the two services):

scala> val weatherURL = "http://api.openweathermap.org/data/2.5/weather?q="
weatherURL: String = http://api.openweathermap.org/data/2.5/weather?q=
scala> val futureWeather = WS.url(weatherURL+"Paris").get().map{ response => 
         response.json
       }
futureWeather: scala.concurrent.Future[play.api.libs.json.JsValue] = scala.concurrent.impl.Promise$DefaultPromise@4dd5dc9f

Wait for a couple of seconds to make sure that the web service Future gets completed, then enter the following statement:

scala> val weather = futureWeather.value
weather: Option[scala.util.Try[play.api.libs.json.JsValue]] = Some(Success({"coord":{"lon":2.35,"lat":48.85},"sys":{"message":0.0052,"country":"FR","sunrise":1389166933,"sunset":1389197566},"weather":[{"id":803,"main":"Clouds","description":"broken clouds","icon":"04n"}],"base":"cmc stations","main":{"temp":284.36,"pressure":1013,"temp_min":284.15,"temp_max":284.82,"humidity":86},"wind":{"speed":5.37,"deg":193},"clouds":{"all":80},"dt":1389221871,"id":2988507,"name":"Paris","cod":200}))
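To give an idea of how individual fields could be pulled out of such a message, here is a small sketch using the play.api.libs.json package imported earlier. The sample document is a trimmed-down copy of the response above, the object and value names are assumptions, and the Celsius conversion assumes the service reports temperatures in Kelvin, which the value 284.36 suggests.

```scala
import play.api.libs.json._

object WeatherJsonSketch {
  // A trimmed-down copy of the response shown above.
  val sample: JsValue = Json.parse(
    """{"main":{"temp":284.36,"humidity":86},"name":"Paris","cod":200}""")

  // Navigate with \ and convert each node to a Scala type with as[T].
  val city: String    = (sample \ "name").as[String]
  val kelvin: Double  = (sample \ "main" \ "temp").as[Double]
  val humidity: Int   = (sample \ "main" \ "humidity").as[Int]

  // Assumes the temperature is expressed in Kelvin.
  val celsius: Double = kelvin - 273.15
}
```

Note that as[T] throws an exception when a field is missing or has an unexpected type, so it suits quick experiments like this one better than production code.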

Combining web services

We are now ready to combine two services using async/await.

Let's copy and paste the following lines at once in the REPL. To do this, we can use the convenient :paste command of the REPL, as shown in the following command lines:

scala> :paste
// Entering paste mode (ctrl-D to finish)

val futureLocation = 
  WS.url(locationURL).get().map(resp => (resp.xml \ "City").text)
val futureWeather2 = async {
  await(WS.url(weatherURL+await(futureLocation)).get()).body 
}
futureWeather2.onComplete(println)

// once the block is copied and pasted from somewhere using ctrl-C/ctrl-V, press ctrl-D

// Exiting paste mode, now interpreting.
futureLocation: scala.concurrent.Future[String] = scala.concurrent.impl.Promise$DefaultPromise@1e111066
futureWeather2: scala.concurrent.Future[String] = scala.concurrent.impl.Promise$DefaultPromise@724ba7f5
scala> Success({"coord":{"lon":18.06,"lat":59.33},"sys":{"message":0.0251,"country":"SE","sunrise":1395808197,"sunset":1395854197},"weather":[{"id":800,"main":"Clear","description":"Sky is Clear","icon":"01d"}],"base":"cmc stations","main":{"temp":277.29,"pressure":1028,"humidity":69,"temp_min":276.15,"temp_max":278.15},"wind":{"speed":5.1,"deg":60},"rain":{"3h":0},"clouds":{"all":0},"dt":1395852600,"id":2673730,"name":"Stockholm","cod":200})

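In this code, the inner await suspends the async block until futureLocation completes and yields the city name, so the weather service is queried only once the location is available. The outer await then suspends the block until the weather response arrives, at which point its body is returned as a string.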

Combining services without await

If we do not put an await call around futureLocation while defining the futureWeather2 variable, we get a different answer. This is because, in such a case, the weather service is queried without waiting for the answer of the location service: the expression weatherURL + futureLocation concatenates the textual representation of the Future object itself, not the city name it will eventually contain. You can verify this behavior by copying and pasting the following three statements at once into the REPL (assuming the locationURL variable, which was created earlier while introducing the location service, is still valid):

scala> :paste
// Entering paste mode (ctrl-D to finish)

val futureLocation = 
  WS.url(locationURL).get().map(resp => (resp.xml \ "City").text)
val futureWeather2 = async {
  await(WS.url(weatherURL + futureLocation).get()).body 
}
futureWeather2.onComplete(println)

// once the block is copied and pasted from somewhere using ctrl-C/ctrl-V, press ctrl-D
// Exiting paste mode, now interpreting.
futureLocation: scala.concurrent.Future[String] = scala.concurrent.impl.Promise$DefaultPromise@705a7c28
futureWeather2: scala.concurrent.Future[String] = scala.concurrent.impl.Promise$DefaultPromise@448d5fb8
scala> Success({"message":"Error: Not found city","cod":"404"}
)

This time, the output shows that the city was not entered correctly into the weather service.
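The cause can be reproduced without any web service. The following sketch, which uses only the standard library, builds the URL once by concatenating a Future directly and once by mapping over it. The object name and the already completed Future standing in for the location service are assumptions made for this illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object MissingAwaitSketch {
  val weatherURL = "http://api.openweathermap.org/data/2.5/weather?q="
  // An already completed Future, standing in for the location service.
  val futureLocation: Future[String] = Future.successful("Stockholm")

  // String concatenation calls toString on the Future object itself,
  // so the URL ends with the Future's description, not with the city.
  val wrongURL: String = weatherURL + futureLocation

  // Mapping over the Future builds the URL from the value it contains.
  // Blocking with Await is for demonstration only.
  def rightURL(): String =
    Await.result(futureLocation.map(city => weatherURL + city), 5.seconds)
}
```

Even though the Future in this sketch is already completed, wrongURL still does not contain a usable city name, which shows that the value has to be extracted explicitly, either with await inside an async block or with map.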
