Chapter 12. Asynchronous programming

This chapter covers

  • Scala, servlets, and mutable state
  • Using Futures for asynchronous programming
  • Using Akka Actors with Scalatra
  • Big data in Scalatra with Spark

Scala makes multicore, distributed, and asynchronous programming easier. Its focus on immutability, along with associated libraries such as Akka, lowers the conceptual burden of concurrency management by making your code easier to reason about. Why is this important?

Most other languages still rely on old-style concurrency management tools like locks and threads, which can be extremely difficult to use because they are non-deterministic. That is, you can’t necessarily reproduce your threading bugs when you want, because they can be the result of multiple threads interacting in strange and horrifying ways.

Older concurrency models were designed in an age of non-networked, single-core microcomputers, when building distributed systems was the exception rather than the norm. We now live in a very different world. New servers today typically have 32 or 64 cores, and high-performance applications are almost always built to run across a network. Scala stands out from the language crowd as a great choice for this new hardware and networking environment. This is one of the reasons that it’s being used by big organizations that need to build highly scalable distributed systems.

How does Scalatra fit into all this?

Scalatra runs on good old-fashioned servlets. This means that in order to understand asynchronous operations in Scalatra, you need to take the servlet threading model into account.

12.1. Exploring concurrency in Scalatra

To illustrate your options, let’s build a simple web application. First, generate the project using g8:

g8 scalatra/scalatra-sbt
organization [com.example]: com.constructiveproof
name [My Scalatra Web App]: Crawler
version [0.1.0-SNAPSHOT]:
servlet_name [MyScalatraServlet]: CrawlController
package [com.example.app]: com.constructiveproof.crawler
scala_version [2.11.6]:
sbt_version [0.13.8]:
scalatra_version [2.4.0]:

With that done, let’s build out a simple single-threaded web client. Hitting the route "/" on the controller in the following listing will trigger a call to Grabber.evaluate. Whatever URL you drop into the url parameter will be retrieved and evaluated. If it’s a Scala site, you’ll be congratulated. If it’s not, you’ll be admonished to choose more carefully next time.

Listing 12.1. Synchronous network retrieval

You can easily try this code by hitting the URL http://localhost:8080/?url=http://scala-lang.org/. You should see a response like the one shown in figure 12.1.

Figure 12.1. A response from the Grabber

In a world awash with HTTP APIs and networked machines, making network calls like this, and doing one thing or another based on the response received, is the sort of thing we need to do constantly. Whether you’re dealing with upstream APIs, machine learning problems, or some other area of development, chances are you’ll need to write code like this pretty often. The problem is that the preceding code isn’t going to scale very well.

Servlet containers maintain a thread pool for dealing with incoming requests. By default, Apache Tomcat has a pool of 200 threads. When a request comes in, Tomcat uses a thread from the pool, and the thread stays tied up for the duration of the request.

In the case of the previous code, you’re making a call to the network, which is potentially slow. You’re tying up a thread for however long it takes the upstream server to respond. This call is synchronous. The CrawlController calls Grabber.evaluate and sits there waiting for the response.

The upstream server will probably come back within 1 second or so, but you’re using 0.5% of the available thread resources to service this one request. It’s wasteful, and it isn’t going to scale well, especially if you need to make multiple requests to upstream APIs in order to build the response. You’d be able to handle a lot more traffic if you handed execution of Grabber.evaluate off to another thread pool, returned the controller’s thread back to the servlet container while you waited for the long-running network operation to complete, and resumed execution once the upstream server had responded.

This kind of asynchronous processing is what Scala excels at. Scala has a few different constructs for dealing with asynchronous tasks, and we’ll look at two of them: Futures and Actors. Let’s start with Futures.

12.2. Using Futures in Scalatra

A Future is part of the Scala standard library. It represents a possibly still-running computation. Futures let you program with results you don’t have yet, and when the results become available, any transformations you’ve defined will run. You can attach callbacks to them (onSuccess, onFailure, onComplete) to take action whenever they’re done. Note that onSuccess and onFailure were deprecated in Scala 2.12 and removed in 2.13; on newer versions, use onComplete or foreach instead.
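As a minimal sketch of these ideas, using only the standard library: the names FutureBasics, slowLength, and describe are made up for illustration, and the sleep merely stands in for a slow operation. The transformation is attached with map before the result exists, and andThen registers a side-effecting callback in the same way onComplete would.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object FutureBasics {
  // Hypothetical stand-in for a slow operation such as a network call.
  def slowLength(s: String): Future[Int] = Future {
    Thread.sleep(100)
    s.length
  }

  // The transformation is defined now and runs once the length is available.
  def describe(s: String): Future[String] =
    slowLength(s).map(n => s"$s has $n characters")

  def main(args: Array[String]): Unit = {
    // andThen attaches a callback and returns a Future that completes
    // only after the callback has run.
    val logged = describe("scalatra").andThen {
      case Success(text) => println(text)
      case Failure(err)  => println(s"failed: ${err.getMessage}")
    }
    // Blocking here is only for the demo, so the JVM doesn't exit early.
    Await.ready(logged, 2.seconds)
  }
}
```

In a Scalatra action you wouldn’t block with Await; you’d return the Future and let the framework complete the response.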

If you return a Future from your Scalatra action, Scalatra suspends the request when it hands off to the Future, freeing up a thread. When the Future is completed, Scalatra will wake up the servlet thread and send the response back to the client. This can dramatically increase your throughput compared with hanging on to a thread for the duration of the process.

Futures are easy to use in Scalatra. All you need to do is add FutureSupport to your controller class definition and define a thread pool for the Future to do its work in. Futures run in their own thread pool, separate from the servlet container’s thread pool. With these changes made, CrawlController looks as shown next.

Listing 12.2. CrawlController with FutureSupport

The code has hardly changed, but suddenly Grabber.evaluate is running inside its own thread pool, and the servlet container’s thread will suspend execution until the Grabber does its work. The servlet thread will resume afterwards. It’s asynchronous.

Note

We could easily use an asynchronous HTTP client, such as Apache’s HttpAsyncClient, or Scala’s Dispatch HTTP library, to decouple the outgoing HTTP calls from the servlet container’s thread pool. These would solve the problem very well, in fact. But by using Scala’s synchronous Source.fromURL function, we demonstrate the ways in which you can take any synchronous code and make it asynchronous, without needing any extra libraries. This is a much more flexible and general solution.

There are a few things to consider here. Adding FutureSupport means that you need to define an ExecutionContext for your Futures to run in. Adding implicit def executor = ExecutionContext.global is what accomplishes this. There are quite a few different kinds of ExecutionContexts that you can choose from, each with different qualities. If in doubt, use ExecutionContext.global. It’s backed by a ForkJoinPool, which helps to minimize context switches, and it sizes the pool to the number of available processors. If you need more control over the behavior of your thread pool, you can instantiate a different one yourself. Examples include the pools built by java.util.concurrent.Executors, such as newCachedThreadPool, newFixedThreadPool, and newWorkStealingPool, each with its own trade-offs.
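One way to build your own ExecutionContext is to wrap a java.util.concurrent pool. The following is a sketch under assumptions, not the book’s listing: CustomPool and runOnFixedPool are hypothetical names, and the pool is created and shut down per call only to keep the example self-contained.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object CustomPool {
  // Runs a block on a dedicated fixed-size pool, then shuts the pool down.
  def runOnFixedPool[A](threads: Int)(work: => A): A = {
    val pool: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(threads))
    try Await.result(Future(work)(pool), 5.seconds) // pass the pool explicitly
    finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    val threadName = runOnFixedPool(4)(Thread.currentThread.getName)
    println(s"ran on $threadName")
  }
}
```

In a real application you’d more likely create the pool once at startup, expose it as the controller’s implicit executor, and shut it down when the application stops.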

One thing to watch out for: never close a Future over mutable state. For instance, the servlet container makes the variable request available to you inside your Scalatra actions. The request is a reference to something that, by definition, lives inside the servlet container’s thread pool. This raises a conundrum: the request is in the servlet container, but everything inside the Future executes in a totally different thread pool. What happens if you attempt to access the request from inside the Future? The answer is simple and potentially unexpected: it will be null, because ExecutionContext.global doesn’t know anything about it.
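The hazard is easy to reproduce without a servlet container. In this sketch (CaptureDemo is a hypothetical name, and the Promise exists only to make the timing deterministic), the Future body reads a var after the calling code has changed it, whereas a val captured up front still holds the original value.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object CaptureDemo {
  // Returns (value seen through the var, value seen through the snapshot).
  def run(): (String, String) = {
    var current = "first"        // mutable state the closure will capture
    val snapshot = current       // immutable copy taken on the calling thread
    val gate = Promise[Unit]()   // controls when the asynchronous body runs

    val seen: Future[(String, String)] =
      gate.future.map(_ => (current, snapshot))

    current = "second"           // state changes before the body executes
    gate.success(())
    Await.result(seen, 2.seconds)
  }

  def main(args: Array[String]): Unit =
    println(run()) // prints (second,first)
}
```

Copying what you need into a val before creating the Future is usually the simplest defence.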

This goes for Scala vars just as much as the servlet’s request object, and it’s a common pitfall when working with Scala async libraries. If you want to eliminate the problem in Scalatra, you can do so by wrapping your Future in some syntactic sludge: AsyncResult. At the cost of some extra boilerplate, this provides a stable identifier to the request object that’s in scope for the Future. You can then use the request inside your Futures in complete safety. The AsyncResult version looks like the next listing.

Listing 12.3. Using AsyncResult

AsyncResult expects that whatever method it calls will return a Future, so the method signature of Grabber.evaluate has changed a little. Instead of giving back a String, it now returns a Future[String].

Futures can be strange to work with. Instead of working with actual values, you work with a wrapper, which may or may not contain the value you expect at any given point in time. This is because you asynchronously wait for operations to complete, and you don’t necessarily control when they will complete. The key thing to remember is that you don’t need to have the value in order to define what should happen when it arrives.
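To make that concrete, here’s a small sketch that combines two results that don’t exist yet and also says what to do if either lookup fails. The names CombineFutures, fetchSite, and fetchHits are invented for illustration and return canned values rather than calling a real service.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object CombineFutures {
  // Hypothetical lookups standing in for two independent upstream calls.
  def fetchSite(): Future[String] = Future { "scala-lang.org" }
  def fetchHits(): Future[Int]    = Future { 3 }

  def summary(): Future[String] = {
    // Start both computations first so they can run concurrently.
    val site = fetchSite()
    val hits = fetchHits()
    val combined = for {
      s <- site
      h <- hits
    } yield s"$s mentions Scala $h times"
    // Define the failure path up front as well.
    combined.recover { case e: Exception => s"lookup failed: ${e.getMessage}" }
  }

  def main(args: Array[String]): Unit =
    println(Await.result(summary(), 2.seconds))
}
```

Nothing in summary blocks: it only describes how the eventual values should be combined.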

Now that you’ve seen how to integrate Futures into Scalatra, the Akka documentation on Futures is a good thing to read next: http://doc.akka.io/docs/akka/2.3.4/scala/futures.html. Let’s turn our attention to another way of dealing with concurrency in Scalatra: Akka Actors.

12.3. Using Akka Actors from Scalatra

Akka is a Scala library that gives you access to a software construct called Actors. An Actor is a container for application logic that’s designed to send and receive messages. An Actor is lightweight—about 400 bytes before you start adding your domain logic—so you can have millions of them running at any given time. Like Futures, Akka Actors run in their own thread pool, which is detached from your Scalatra application’s servlet thread pool. Unlike Futures, they can run on either a single machine or across a cluster of machines. The Akka library does all the thread management and scheduling and takes care of inter-Actor communication. On the other hand, setting up a distributed Akka ActorSystem is a lot more complex than just firing off a Future—each approach has its place.

To add Akka to your application, you’ll need to add the following dependency to project/build.scala:

"com.typesafe.akka" %% "akka-actor" % "2.3.4",

Let’s add an Actor that serves the same function as your Grabber object already does. Drop the code from the following listing into your application, in the actors namespace.

Listing 12.4. Akka Actor for retrieving URLs

As you can see, this is really just the Grabber object expressed as an Actor. Let’s take a look at the component parts. There are two main differences between the GrabActor and the Grabber object.

First, the GrabActor has a receive method. All Actors must implement one of these: it’s the key to their concurrency properties. Actors are completely opaque to other software components in your system. They communicate with each other only by sending immutable messages to the receive method. This keeps them isolated from each other, and it means that they can run across more than one machine: the immutable messages can easily be serialized and sent to an Actor that exists remotely. The requirement that messages must be immutable (that you can use vals but not vars) means that all of the problems stemming from using locks to access shared memory go away. An Actor instance never shares any state with any other code, so it can safely be executed on any available thread. Getting rid of locks, in turn, means higher performance and eliminates the chance of deadlocks, race conditions, thread starvation, and many of the other problems that have plagued programmers for the last several decades.

Second, the receive method returns Unit, which is Scala’s way of saying it doesn’t return anything to the caller. Instead of returning a value directly from its receive method, GrabActor does things the Actor way. It knows what sent it the message, and it stores this as a special reference in the value sender. When it’s done evaluating the web page, it sends a message back to the sender using the syntax sender ! "It's a Scala site, very cool." The bang operator, !, is the tell operator. It means “Send a fire-and-forget message to whatever Actor reference is on the left side of me. The message to send is on the right side of me.”

Let’s integrate GrabActor with the rest of your Scalatra application. First, add a new controller, called AkkaCrawler.

Listing 12.5. Controller class integrated with an Akka Actor

Again, it looks almost exactly the same as the original CrawlController, which used bare Futures. There are a few differences, though. First, you pass an ActorSystem and a reference to GrabActor in to the controller’s constructor. You need to have the reference to the GrabActor in order to use it, which makes sense; but what’s the ActorSystem? It’s a set of Actors and a thread pool, which share a common configuration.

Second, you can see that although AsyncResult is still in use (just like with the Future example, earlier), you no longer invoke a method on an object in order to return a response. Instead, you send a message to the grabActor using the ? operator. The question mark operator, ?, is known as the ask pattern operator. It means “Ask whatever Actor reference is on the left side of me for a response to the message on the right side of me.” The ask immediately returns a Future, and that Future is completed when the Actor being asked replies using ! (the tell operator). If you take a look back at GrabActor, you’ll see that’s exactly what it’s doing.

The last main difference between the original CrawlController and this AkkaCrawler is the timeout. Akka requires that you explicitly set a timeout duration (in this case, 2 seconds) whenever you use the ask pattern. This forces you to think about how reactive you want your application to be. It also stops you from uselessly tying up resources with asks that will never complete. Any ask that exceeds its timeout fails its Future with an AskTimeoutException. You should read the Akka documentation, which is extensive, or the book Akka in Action by Raymond Roestenburg, Rob Bakker, and Rob Williams (also from Manning) to get a feel for timeouts and exception handling with Akka.
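The tell, ask, and timeout pieces can be seen together in a standalone sketch. This is not the chapter’s GrabActor or AkkaCrawler: KeywordActor and AskSketch are hypothetical names, the Actor inspects a string instead of fetching a URL, and the code assumes the akka-actor 2.3.x dependency shown earlier is on the classpath.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stand-in for GrabActor: no network access needed.
class KeywordActor extends Actor {
  def receive = {
    case content: String =>
      val verdict =
        if (content.toLowerCase.contains("scala")) "scala" else "other"
      sender() ! verdict // tell: fire-and-forget reply to whoever asked
  }
}

object AskSketch {
  def verdictFor(content: String): String = {
    val system = ActorSystem("sketch")
    try {
      val actor = system.actorOf(Props[KeywordActor])
      // The ask pattern needs a timeout in implicit scope.
      implicit val timeout: Timeout = Timeout(2.seconds)
      val reply = (actor ? content).mapTo[String] // ask returns a Future
      Await.result(reply, 3.seconds)              // blocking only for the demo
    } finally system.shutdown() // Akka 2.3.x API; later versions use terminate()
  }

  def main(args: Array[String]): Unit =
    println(verdictFor("Scala makes concurrency easier"))
}
```

Creating an ActorSystem per call is done here only to keep the sketch self-contained; in a web app you’d create it once, as the ScalatraBootstrap discussion that follows explains.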

Now that the controller is in place, you need to mount it in ScalatraBootstrap. Change the default ScalatraBootstrap so that it looks like the following listing.

Listing 12.6. Setting up for Akka in ScalatraBootstrap

ActorSystem creation is a relatively heavyweight operation, and you typically only want one of them in your web app. You’ll usually instantiate one when your application starts and then use it for all Akka-related purposes, as you’re doing here.

This code also mounts the AkkaCrawler so it’s available for web requests, and it passes references to the ActorSystem and GrabActor into the constructor.

Once you recompile your application and reload it, you can hit the URL http://localhost:8080/akka?url=http://akka.io/ and see that the GrabActor has asynchronously gone off to grab the Akka home page and evaluate it. Because it obviously has the word Akka in it, you’ll see the message from figure 12.2 displayed in your browser.

Figure 12.2. A message from your Akka GrabActor

Akka programming is something that entire books are written about. We’ve only scratched the surface here, but you’ve seen how easy it is to integrate Akka with a Scalatra web app.

When should you use a Future? When should you use an Actor? Futures execute on the same machine, whereas Actors can be on the same machine or they can run across a network. Futures can be easier to use, but Actors can encapsulate state very cleanly.

12.4. Using Scalatra for big data

Scalatra can easily be used to provide an HTTP interface to your big data jobs. Using Spark, a successor to Hadoop, you can easily query datasets even if they’re too big for comfortable processing using conventional tools. Spark is a big data framework that allows you to run batch jobs, query data interactively, and process incoming information as it streams into your system.

Spark runs on top of normal Hadoop data analysis infrastructure—if you already have a Hadoop Distributed File System (HDFS) cluster set up, you can run Spark on top of that, and run jobs on it without modifying or disturbing anything you’re already doing. Like Hadoop, Spark can do batch processing, although it’s typically quite a bit faster than Hadoop due to aggressive caching of data in RAM.

Hadoop workloads are usually batch jobs on large amounts of data, and Hadoop isn’t usually used for interactive querying. In contrast, Spark has the ability to do interactive queries with quick response times. It has the potential to fundamentally transform the way people are doing big data.

To see this in action, find yourself any downloadable large dataset. We went to gov.uk’s statistical datasets and grabbed the Price Paid data as a CSV file (http://mng.bz/bn9Y). It contains information about every house or apartment sold in the United Kingdom in the past 20 years. But the example analysis job will be non-specific enough that any big dataset should work fine.

We unzipped the downloaded zip file, moved the CSV file to the desktop, and renamed it data.csv. You should put yours in an easily accessible place; we’ll reference the filename directly in a few moments.[1]

[1] Making the file location configurable rather than hardcoded would be the right thing to do, but it’d take us a little farther afield than we want right now.

Next, you need to get Spark imported into your Scalatra application. Add the following dependency to project/build.scala, alongside the Akka dependency you added earlier:

"org.apache.spark" %% "spark-core" % "1.3.1",

Next, make a controller so that you can access Spark. It should look like the following listing.

Listing 12.7. Example Spark controller

Intriguingly, there’s not much code here. In the constructor for SparkExampleController, you pass in a SparkContext object. Spark is based on Akka, and like an Akka ActorSystem, a SparkContext is essentially a thread pool providing access to a configuration of Akka Actors. These actors may be local to the machine you’re running on or may be a cluster of machines running remotely.

You define the route GET /count/:word, and because you expect this Spark job to be long-running, you run the WordCounter.count invocation inside a Future. The SparkContext gets passed into the WordCounter so that you can keep a reference to the thread pool where Spark is running. You then define a path to the data source (change this to suit where you put your data), and define it as a textFile for the SparkContext, sc. That takes care of the setup.

All the work is done in one line. Spark contains a set of distributed collection classes, which are very similar to the regular Scala collections but which can be run distributed inside a Spark cluster. The code lines.filter(line => line.contains(word)).count() counts the lines that contain whatever word you’re interested in (a line that mentions the word twice is still counted once). If the SparkContext is running locally, this will happen on the local machine. If the SparkContext points at a cluster, the job will be run there (although you’ll need to do some additional configuration work to distribute your Spark job across the cluster).
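Because the distributed collections mirror the regular ones, you can try the same filter-then-count shape on an ordinary Seq before pointing it at a large file. This sketch uses only the standard library; LineCount and countLinesContaining are hypothetical names, and on a plain Seq you use size where Spark uses count().

```scala
object LineCount {
  // Same shape as the Spark job, but on an in-memory Scala collection.
  def countLinesContaining(lines: Seq[String], word: String): Long =
    lines.filter(line => line.contains(word)).size.toLong

  def main(args: Array[String]): Unit = {
    val sample = Seq(
      "LONDON,flat,250000",
      "LEEDS,house,180000",
      "LONDON,house,LONDON ROAD,400000"
    )
    // The third line mentions LONDON twice but is counted once.
    println(countLinesContaining(sample, "LONDON")) // prints 2
  }
}
```

The sample rows are invented and only loosely resemble the Price Paid data.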

Before you can see your job in action, you need to define a SparkContext and mount the new controller in ScalatraBootstrap. Change yours as follows.

Listing 12.8. ScalatraBootstrap with Spark

Creating a SparkContext is relatively easy. In its simplest form, it takes only two parameters: the address of a Spark cluster (or the word local), and a human-readable name. You then mount the SparkExampleController, passing the SparkContext to the controller’s constructor. Finally, you add a destroy() method to ScalatraBootstrap that calls sc.stop(), so that the SparkContext shuts down when Scalatra does.

You’re now ready to run the code. Exit sbt by typing exit, and rerun sbt—that way, the servlet container will create a SparkContext when it starts up. The results look like figure 12.3.

Figure 12.3. Running the Spark job and viewing its output

Impressively, Spark was able to rip through our 3.4 GB file in about 8 seconds. This would be considered too slow for your average web application, but for doing data analytics work, it’s quite acceptable in our view.

Running in local mode like this is great for demonstrating how easy Spark is to use, but it isn’t very useful if you’ve got multiple users accessing your application at the same time. When you’re running in local mode like this, there’s no concurrency available to you. Your Spark jobs are all submitted to the same SparkContext, and they’ll run in order rather than simultaneously. If you want to have concurrent access to multiple Spark jobs running at the same time, you’ll need to set up a Spark cluster, and let the cluster handle concurrency and resource-sharing.

If you’re running on a multicore system, it’s possible to use a trick to speed things up a bit. When you defined your SparkContext in ScalatraBootstrap, it looked like this:

val sc = new SparkContext("local", "Spark Demo")

When you define your SparkContext as "local", you’re telling Spark to use only one processor. But you likely have more than one available. If you set your SparkContext as "local[X]" where X is equal to the number of processors in your machine, your job will speed up. We’ve set ours like this:

val sc = new SparkContext("local[8]", "Spark Demo")

Doing this allows processing to speed up quite a bit: from 8 seconds on average to about 2.4 seconds. Pretty good!
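If you’d rather not hardcode the core count, you can derive the master string from the JVM. The helper below is a hypothetical convenience, not part of Spark; Spark also accepts the master string "local[*]", which asks it to use as many worker threads as there are logical cores.

```scala
object SparkMaster {
  // Builds a local master string such as "local[8]" for the given core count.
  def localMaster(cores: Int = Runtime.getRuntime.availableProcessors): String =
    s"local[$cores]"

  def main(args: Array[String]): Unit = {
    // Would be used as: new SparkContext(SparkMaster.localMaster(), "Spark Demo")
    println(localMaster())
  }
}
```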

Spark is itself written in Scala, so it takes advantage of the same kinds of technologies (Akka Actors and Futures) that we’ve discussed in this chapter. If you really want to see the performance benefits of Spark, try running it on a cluster—it’s designed to efficiently share the resources of dozens or hundreds of machines, and you’ll see massive performance increases from running it this way.

12.5. Summary

  • You can use Scala’s Futures to decouple work that Scalatra’s HTTP thread pool does from the main work in your application. Use Futures when you want a simple, lightweight concurrency construct.
  • Akka is more complex, but potentially much more powerful, than the Futures approach. Using Akka Actors gives you a very large number of independent concurrency units that can interoperate with each other. Because Akka can handle execution over a network, this gets your code running on more than one machine at the same time.
  • Using the Spark library, which is built on top of Akka, you can do parallel processing of very large datasets.