© Kit Eason 2018
Kit Eason, Stylish F#, https://doi.org/10.1007/978-1-4842-4000-7_10

10. Asynchronous and Parallel Programming

Kit Eason (Farnham, Surrey, UK)
 

I know how hard it is to watch it go.

And all the effort that it took to get there in the first place.

And all the effort not to let the effort show.

—Everything but the Girl, Band

Ordering Pizza

In asynchronous programming, we embrace the fact that certain operations are best represented by running them separately from the main flow of logic. Instead of stopping everything while we wait for a result, we expect those separate computations to notify us when they have completed, at which point we’ll deal with their results. It’s a bit like one of those restaurants where you order, say, a pizza, and they give you a pager that flashes when your order is ready. You’re free to grab a table and chat with your friends. When the pager goes off, you collect your pizza and carry on with the main business of your visit – eating!

F# offers an elegant model for asynchronous computation, but it has to be said that working asynchronously inevitably complicates the business logic of your code. The trick is to keep the impact to a minimum. Fortunately, by adopting a small set of coding patterns, you can keep your code elegant and readable, while still getting the benefits of asynchronous working.

A World Without Async

Because the benefits and impacts of asynchronous working tend to manifest across the whole structure of your program, I’m going to break with the practice of most of this book and offer a complete, potentially useful program as the example for this whole chapter. The example is a “bulk file downloader,” a console program that can find all the file download links in a web page and download all the files. It’ll also be able to filter what it downloads. For example, you could download just the files whose names end in “.gz”. As a starting point, I’ll offer a synchronous version of the program. Then I’ll go through all the steps necessary to make it work asynchronously. This reflects my normal coding practice: I tend to write a synchronous version initially to get all the business logic clear, then I translate relevant portions into an asynchronous world.

To avoid including a huge listing, I’ve broken up the program into parts that I’ll discuss separately. If you want to follow along, create an F# console program and simply add the code from each successive listing into Program.fs.

We’ll start with a module that can print colored messages to the console, which will be useful to show when downloads start, complete, fail, and so forth (Listing 10-1). The report function also shows the managed thread ID for the current thread, which will help us explore the behavior of our program as we transition it to an asynchronous approach.

Notice also how I use partial application, as introduced in the previous chapter, to provide functions called red, yellow, and so forth to issue messages in those colors.
module Log =
    open System
    open System.Threading
    /// Print a colored log message.
    let report (color : ConsoleColor) (message : string) =
        Console.ForegroundColor <- color
        printfn "%s (thread ID: %i)"
            message Thread.CurrentThread.ManagedThreadId
        Console.ResetColor()
    let red = report ConsoleColor.Red
    let green = report ConsoleColor.Green
    let yellow = report ConsoleColor.Yellow
    let cyan = report ConsoleColor.Cyan
Listing 10-1

Printing colored console messages

Next, we have a module to model the fact that a download can succeed or fail. I haven’t used the Option type here because I want to be able to report back the name of a file that has failed. Hence both the OK and Failed cases of the Discriminated Union (DU) have a filename payload (Listing 10-2).
module Outcome =
    type Outcome =
        | OK of filename:string
        | Failed of filename:string
    let isOk = function
        | OK _ -> true
        | Failed _ -> false
    let fileName = function
        | OK fn
        | Failed fn -> fn
Listing 10-2

A Discriminated Union to model success and failure

The Outcome module also offers convenience functions for deciding whether a given Outcome instance is a success or not, and for recovering a filename from an Outcome instance. In these functions, I use the function keyword, which allows me to pattern match from a DU instance to a case without providing an explicit DU instance parameter. For example, the isOk function could also have been expressed as in Listing 10-3.
    let isOk outcome =
        match outcome with
        | OK _ -> true
        | Failed _ -> false
Listing 10-3

The isOk function in more verbose form

Now let’s write some functions that get the file download links from the target web page (Listing 10-4). The absoluteUri function deals with the fact that some web pages provide download links relative to their own addresses (e.g., downloads/myfile.txt) while others provide absolute addresses (e.g., https://mysite.org/downloads/myfile.txt). The code here is pretty simplistic and may not work in all cases, but I wanted to keep it simple, as URL processing is not the main topic of this chapter.

The getLinks function takes a URI and a regular expression pattern, and parses the web page to get all the download links that match the pattern. Note that this function uses HtmlDocument.Load, which is provided by the FSharp.Data NuGet package. You’ll need to use NuGet or Paket to add this package to your console project.
module Download =
    open System
    open System.IO
    open System.Net
    open System.Text.RegularExpressions
    // From Nuget package "FSharp.Data"
    open FSharp.Data
    let private absoluteUri (pageUri : Uri) (filePath : string) =
        if filePath.StartsWith("http:")
           || filePath.StartsWith("https:") then
            Uri(filePath)
        else
            let sep = '/'
            filePath.TrimStart(sep)
            |> (sprintf "%O%c%s" pageUri sep)
            |> Uri
    /// Get the URLs of all links in a specified page matching a
    /// specified regex pattern.
    let private getLinks (pageUri : Uri) (filePattern : string) =
        Log.cyan "Getting names..."
        let re = Regex(filePattern)
        let html = HtmlDocument.Load(pageUri.AbsoluteUri)
        let links =
            html.Descendants ["a"]
            |> Seq.choose (fun node ->
                node.TryGetAttribute("href")
                |> Option.map (fun att -> att.Value()))
            |> Seq.filter (re.IsMatch)
            |> Seq.map (absoluteUri pageUri)
            |> Seq.distinct
            |> Array.ofSeq
        links
Listing 10-4

Functions for getting download links from a web page
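
To get a feel for how the filter pattern in getLinks behaves, here is a small sketch you could paste into F# Interactive. The first file name comes from the minor planets page used later in the chapter; the other two names, and the stricter pattern with escaped dots, are assumptions made up purely for illustration.

```fsharp
open System.Text.RegularExpressions

// The pattern used in this chapter. An unescaped '.' matches any character.
let loose = Regex(@"neam.*.json.gz$")
// A stricter variant (an assumption, for comparison) that escapes the dots.
let strict = Regex(@"neam.*\.json\.gz$")

let candidates =
    [ "neam00_extended.json.gz"   // a file name from the minor planets page
      "neam00_extended.json"      // hypothetical: wrong extension
      "neamXjsonYgz" ]            // hypothetical: no dots at all

for name in candidates do
    printfn "%s: loose=%b strict=%b"
        name (loose.IsMatch name) (strict.IsMatch name)
```

The last candidate matches the loose pattern but not the strict one, which is worth remembering if a page contains similarly named links that you don't want.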

Next up we have a function that attempts to download a file from a given URI to a given local path (Listing 10-5). If you are following along, the code for this listing, and for Listing 10-6, should be included in the Download module we started in Listing 10-4. The tryDownload function uses WebClient.DownloadFile to do its work. It reports success by returning Outcome.OK, or failure (if there is an exception) by returning Outcome.Failed.
    /// Download a file to the specified local path.
    let private tryDownload (localPath : string) (fileUri : Uri) =
        let fileName = fileUri.Segments |> Array.last
        Log.yellow (sprintf "%s - starting download" fileName)
        let filePath = Path.Combine(localPath, fileName)
        use client = new WebClient()
        try
            client.DownloadFile(fileUri, filePath)
            Log.green (sprintf "%s - download complete" fileName)
            Outcome.OK fileName
        with
        | e ->
            Log.red (sprintf "%s - error: %s" fileName e.Message)
            Outcome.Failed fileName
Listing 10-5

The tryDownload function
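
The way tryDownload derives a local file name can be tried in isolation. This is only a sketch: the URL is the example address mentioned earlier, and the "downloads" folder is a hypothetical local path.

```fsharp
open System
open System.IO

let fileUri = Uri "https://mysite.org/downloads/myfile.txt"

// Segments splits the path part of the URI into pieces:
// [| "/"; "downloads/"; "myfile.txt" |]
let fileName = fileUri.Segments |> Array.last

// Path.Combine inserts the platform's directory separator for us.
let filePath = Path.Combine("downloads", fileName)

printfn "%s -> %s" fileName filePath
```

Note that a URI ending in a slash has a last segment that also ends in a slash, so this approach assumes each link points at a file rather than a directory.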

Also within the Download module, we have one public function, GetFiles (Listing 10-6). GetFiles uses getLinks to list the required download links, and calls tryDownload for each of the resulting links. The results are divided into successes and failures using Array.partition, and finally these are mapped into a tuple of arrays of file names for the successes and failures respectively.
    /// Download all the files linked to in the specified webpage, whose
    /// link path matches the specified regular expression, to the specified
    /// local path. Return a tuple of succeeded and failed file names.
    let GetFiles
            (pageUri : Uri) (filePattern : string) (localPath : string) =
        let links = getLinks pageUri filePattern
        let downloaded, failed =
            links
            |> Array.map (tryDownload localPath)
            |> Array.partition Outcome.isOk
        downloaded |> Array.map Outcome.fileName,
        failed |> Array.map Outcome.fileName
Listing 10-6

The GetFiles function
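
The partition-then-map shape of GetFiles can be exercised without touching the network. In this sketch the Outcome type and helpers are repeated so that the snippet stands alone, and fakeDownload is a hypothetical stand-in for tryDownload that fails any name ending in ".bad".

```fsharp
type Outcome =
    | OK of filename:string
    | Failed of filename:string

let isOk = function
    | OK _ -> true
    | Failed _ -> false

let fileName = function
    | OK fn
    | Failed fn -> fn

// Hypothetical stand-in for tryDownload: names ending in ".bad" fail.
let fakeDownload (name : string) =
    if name.EndsWith(".bad") then Failed name else OK name

// Array.partition returns (items passing the test, items failing it).
let downloaded, failed =
    [| "a.gz"; "b.bad"; "c.gz" |]
    |> Array.map fakeDownload
    |> Array.partition isOk

let summary =
    downloaded |> Array.map fileName,
    failed |> Array.map fileName

printfn "%A" summary
```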

Finally, in Listing 10-7, we have a main function for the console program. It calls Download.GetFiles to do its work. We also use a System.Diagnostics.Stopwatch to time the whole operation, and we list out all the failed files at the end of processing.

Note

You’ll need to create a directory called “c:\temp\downloads,” or amend the code to use a directory that exists.

open System
open System.Diagnostics
[<EntryPoint>]
let main argv =
    // Some minor planets data:
    let uri = Uri @"https://minorplanetcenter.net/data"
    let pattern = @"neam.*.json.gz$"
    let localPath = @"c:\temp\downloads"
    let sw = Stopwatch()
    sw.Start()
    let downloaded, failed =
        Download.GetFiles uri pattern localPath
    failed
    |> Array.iter (fun fn ->
        Log.report ConsoleColor.Red (sprintf "Failed: %s" fn))
    Log.cyan
        (sprintf "%i files downloaded in %0.1fs, %i failed. Press a key"
            downloaded.Length sw.Elapsed.TotalSeconds failed.Length)
    Console.ReadKey() |> ignore
    0
Listing 10-7

The console program’s main function

In case you want to try out the program on different web pages, here is a table of web pages where you will find some files to download, and some corresponding regular expression patterns (Table 10-1).
Table 10-1

Some Download URLs and Name Patterns

| Url | Pattern | Comments |
| --- | --- | --- |
| https://minorplanetcenter.net/data | neam.*.json.gz$ | Minor planets |
| http://compling.hss.ntu.edu.sg/omw | .zip$ | Computational Linguistics |
| http://storage.googleapis.com/books/ngrams/books/datasetsv2.html | eng-1M-2gram.*.zip$ | Google n-grams. Very large. Don’t download this over a metered connection! |

Running the Synchronous Downloader

Here’s the output I got when I ran our synchronous program for the minor planets data (Listing 10-8).
Getting names... (thread ID: 1)
neam00_extended.json.gz - starting download (thread ID: 1)
neam00_extended.json.gz - download complete (thread ID: 1)
neam01_extended.json.gz - starting download (thread ID: 1)
neam01_extended.json.gz - download complete (thread ID: 1)
...
neam15_extended.json.gz - starting download (thread ID: 1)
neam15_extended.json.gz - download complete (thread ID: 1)
16 files downloaded in 52.7s, 0 failed. Press a key (thread ID: 1)
Listing 10-8

Behavior of the synchronous downloader

The files are downloaded one at a time, everything happens on the same thread (ID: 1), and the whole process takes about a minute.

Figure 10-1 shows what’s happening on my WiFi connection during the run period.
Figure 10-1

WiFi usage during a run of the synchronous mass downloader

While the WiFi connection is kept fairly busy, it certainly isn’t maxed out. But the main concern with the behavior of this synchronous version is the fact that it hogs an entire thread throughout the time it is running. It does this even though much of the time is spent waiting for server responses as blocks of data are sent over the network. In .NET a thread is considered quite an expensive resource, one which – on a busy machine – could be doing other work during these waits. This other work could be for other unrelated programs on the same machine, or simply downloading other files for this same bulk download run.

Converting Code to Asynchronous

To remedy the situation, we need to go through all our code to identify operations where our code is “ordering pizza”: in other words, starting an operation that will take a significant amount of time, and which doesn’t require our main thread’s attention to complete. Typically, this will be input/output operations, where the real work happens in disc controllers, network interfaces, networks, and remote servers. The first place where our code orders pizza is in the getLinks function (back in Listing 10-4), where we load an HTML document that comes from a remote server:
        let html = HtmlDocument.Load(pageUri.AbsoluteUri)
If you look at the IntelliSense for HtmlDocument, you might notice that there’s also an AsyncLoad function. What if you simply use this in your html binding (Listing 10-9)?
    let private getLinks (pageUri : Uri) (filePattern : string) =
        Log.cyan "Getting names..."
        let re = Regex(filePattern)
        // val html : Async<HtmlDocument>
        let html = HtmlDocument.AsyncLoad(pageUri.AbsoluteUri)
Listing 10-9

The return type of HtmlDocument.AsyncLoad

The code following the let html = binding won’t compile now, because html is no longer an HtmlDocument instance, it’s an Async<HtmlDocument>. Instead of giving you a pizza, the person at the counter has given you a pager: effectively the promise of a pizza and a means of knowing when it’s ready. So, just like when you enter a restaurant that uses a pager system, you need to adjust your expectations and behave a little differently: that is, don’t eat the pager!
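
The "pager, not pizza" idea can be seen in a tiny experiment. This is a sketch under stated assumptions: the pizza computation and the started flag are invented for illustration, and Async.RunSynchronously is used here simply as one way to make a computation run and wait for its result.

```fsharp
// A flag held in a ref cell so that the async body can update it.
let started = ref false

// Building the computation does not run it: this is just the pager.
let pizza =
    async {
        started.Value <- true
        return "margherita"
    }

let startedBefore = started.Value
printfn "Started after construction? %b" startedBefore

// Only when we explicitly run it does the body execute.
let result = Async.RunSynchronously pizza
printfn "Started after running? %b (result: %s)" started.Value result
```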

The way to achieve this change of worldview in F# is with an async computation expression, which is very easy to use. Firstly, move the whole body of the getLinks function into curly brackets, and place the word async before these. Instead of let to bind the html value, use let!. Finally, instead of simply “mentioning” the links value at the end of the function to return it, explicitly return it using the return keyword (Listing 10-10).
    /// Get the URLs of all links in a specified page matching a
    /// specified regex pattern.
    let private getLinks (pageUri : Uri) (filePattern : string) =
        async {
            Log.cyan "Getting names..."
            let re = Regex(filePattern)
            let! html = HtmlDocument.AsyncLoad(pageUri.AbsoluteUri)
            let links =
                html.Descendants ["a"]
                |> Seq.choose (fun node ->
                    node.TryGetAttribute("href")
                    |> Option.map (fun att -> att.Value()))
                |> Seq.filter (re.IsMatch)
                |> Seq.map (absoluteUri pageUri)
                |> Seq.distinct
                |> Array.ofSeq
            return links
        }
Listing 10-10

Placing a function body into an async computation expression

The let! and return keywords are only valid in the context of computation expressions such as async {}. Here, let! effectively means “Please get me a pizza and page me when it’s ready. I will come back to this exact point when you page me. In the meantime, I’ll feel free to talk to my friends.” Using return is analogous to linking a particular pizza order with a pager, and handing over the pager instead of the pizza.
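
Stripped of the HTML parsing, the interplay of let! and return looks something like the following sketch. The fetchLength function is hypothetical; it stands in for any operation that hands back an Async rather than a plain value.

```fsharp
// Hypothetical stand-in for a slow lookup: hands back a "pager" for an int.
let fetchLength (text : string) =
    async {
        return text.Length   // return wraps the int as Async<int>
    }

let total =
    async {
        // let! binds the int itself once the Async<int> has produced it.
        let! a = fetchLength "pizza"
        let! b = fetchLength "pager"
        // return wraps the sum up again, so total is an Async<int>.
        return a + b
    }

printfn "%i" (Async.RunSynchronously total)
```

Using a plain let instead of let! here would bind a and b to Async<int> values, and the addition would not compile.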

The next place where we “order pizza” is in the tryDownload function, where we use WebClient.DownloadFile:
            client.DownloadFile(fileUri, filePath)

Again this is an I/O operation that is going to take time, in this case an eternity in CPU terms because we might be downloading large files. There are two asynchronous methods in the WebClient API to choose from: DownloadFileAsync and DownloadFileTaskAsync. The one we want is DownloadFileTaskAsync. The other one requires us to provide an event handler to notify us of completion, almost as if we had to give the pizza restaurant our own pager. This seems a bit too much trouble to be worth it, even for pizza.

To use DownloadFileTaskAsync in the context of an F# async computation expression, we need to do three things. First, we need to translate it from a C# Task into an F# Async, which you can easily do using Async.AwaitTask. (I’ll follow up on the differences between Task and Async in a moment.) Second, since this is an imperative operation that doesn’t of itself return anything, we need to use the do! keyword instead of let! to specify that it should be run asynchronously without returning a value. Third, we need to use the return keyword to return the Outcome.OK or Outcome.Failed results (Listing 10-11).
    /// Download a file to the specified local path.
    let private tryDownload (localPath : string) (fileUri : Uri) =
        async {
            let fileName = fileUri.Segments |> Array.last
            Log.yellow (sprintf "%s - starting download" fileName)
            let filePath = Path.Combine(localPath, fileName)
            use client = new WebClient()
            try
                do!
                    client.DownloadFileTaskAsync(fileUri, filePath)
                    |> Async.AwaitTask
                Log.green (sprintf "%s - download complete" fileName)
                return (Outcome.OK fileName)
            with
            | e ->
                Log.red (sprintf "%s - error: %s" fileName e.Message)
                return (Outcome.Failed fileName)
        }
Listing 10-11

Using Async.AwaitTask and do! to perform an async imperative operation
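
The same two binders can be tried without a WebClient. In this sketch Task.Delay and Task.FromResult merely stand in for real Task-returning APIs; the delay length and the value 42 are arbitrary.

```fsharp
open System.Threading.Tasks

let demo =
    async {
        // A Task with no result becomes Async<unit>, so do! is the binder.
        do! Task.Delay(50) |> Async.AwaitTask
        // A Task<int> becomes Async<int>, so let! binds its result.
        let! answer = Task.FromResult 42 |> Async.AwaitTask
        return answer
    }

let answer = Async.RunSynchronously demo
printfn "%i" answer
```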

By now you should be able to see a pattern emerging in what we need to do to make a function asynchronous:
  • Place the body in an async {}.

  • Identify any time-consuming external operations where the API you are using offers an Async version.

  • Use let! or do! to bind or imperatively execute them. Where necessary, use Async.AwaitTask to translate from C# Task to F# Async.

  • Return (the promise of) results using the return keyword.

Incidentally, from F# 4.5 there is also a match! keyword, which you can use to call async functions, and pattern match on the results, in a single operation.
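
As a minimal sketch of match!, assuming F# 4.5 or later: tryGetSize is a hypothetical async lookup invented for this example, and the file name and size are made up.

```fsharp
// Hypothetical async lookup: Some size for a known file, None otherwise.
let tryGetSize (name : string) =
    async {
        return
            match name with
            | "a.gz" -> Some 1024
            | _ -> None
    }

let describe (name : string) =
    async {
        // match! runs the async call and pattern matches on its result.
        match! tryGetSize name with
        | Some size -> return sprintf "%s is %i bytes" name size
        | None -> return sprintf "%s is unknown" name
    }

printfn "%s" (describe "a.gz" |> Async.RunSynchronously)
printfn "%s" (describe "b.gz" |> Async.RunSynchronously)
```

Without match!, you would bind the result with let! and then match on it in a separate step.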

Next we need to apply a similar recipe to the next level up: the GetFiles function that calls getLinks and tryDownload to do its work. We can start off in exactly the same way, placing the whole function body in async{} and using let! to bind getLinks (Listing 10-12).
    let AsyncGetFiles
        (pageUri : Uri) (filePattern : string) (localPath : string) =
        async {
            let! links = getLinks pageUri filePattern
            ...
Listing 10-12

Starting to make GetFiles asynchronous

Since GetFiles is effectively the public API of the Download module, I’ve also renamed it AsyncGetFiles to cue callers that this is an asynchronous function.

The next few lines of GetFiles require a little more thought. The current code looks like what is in Listing 10-13.
        let downloaded, failed =
            links
            |> Array.map (tryDownload localPath)
            |> Array.partition Outcome.isOk
Listing 10-13

Synchronous download code

This is no longer good enough because tryDownload is now not a function that will immediately do its work: instead it returns a promise of work not yet even started. We could make the code compile by forcing each computation to execute and awaiting its result, as in Listing 10-14, but then we’ve gained almost nothing, because the download operations are still performed one at a time, even though they run on different threads.
            let downloaded, failed =
                links
                |> Array.map (fun link ->
                    tryDownload localPath link
                    |> Async.RunSynchronously)
                |> Array.partition Outcome.isOk
Getting names... (thread ID: 1)
neam00_extended.json.gz - starting download (thread ID: 9)
neam00_extended.json.gz - download complete (thread ID: 15)
neam01_extended.json.gz - starting download (thread ID: 7)
neam01_extended.json.gz - download complete (thread ID: 15)
neam02_extended.json.gz - starting download (thread ID: 15)
...
neam15_extended.json.gz - starting download (thread ID: 15)
neam15_extended.json.gz - download complete (thread ID: 7)
16 files downloaded in 56.1s, 0 failed. Press a key (thread ID: 1)
Listing 10-14

An anti-pattern for multiple, similar async computations

This is like ordering multiple pizzas, taking the pager for each pizza, but then standing at the counter in everyone else’s way until each pager flashes.

Instead what we want to do is gather all the ready-to-go computations and run them simultaneously (or at least allow .NET to run them as simultaneously as resources allow). This can be achieved by sending the results of Seq.map (tryDownload...) into the function Async.Parallel, and using a let! binding to bind the results (Listing 10-15).
    let AsyncGetFiles (pageUri : Uri) (filePattern : string) (localPath : string) =
        async {
            let! links = getLinks pageUri filePattern
            let! downloadResults =
                links
                |> Seq.map (tryDownload localPath)
                |> Async.Parallel
            let downloaded, failed =
                downloadResults
                |> Array.partition Outcome.isOk
            return
                downloaded |> Array.map Outcome.fileName,
                failed |> Array.map Outcome.fileName
        }
Listing 10-15

Using Async.Parallel
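
To see the effect of Async.Parallel without a network connection, a simulated download will do. This is a sketch: fakeDownload is hypothetical and simply waits for half a second, so on most machines the four computations should finish in roughly half a second in total rather than two seconds, though the exact timing will vary.

```fsharp
open System.Diagnostics

// Hypothetical download: waits without blocking a thread, returns its id.
let fakeDownload (id : int) =
    async {
        do! Async.Sleep 500
        return id
    }

let sw = Stopwatch.StartNew()

let results =
    [ 1 .. 4 ]
    |> Seq.map fakeDownload
    |> Async.Parallel           // Async<int[]>: results in input order
    |> Async.RunSynchronously

printfn "%A in %ims" results sw.ElapsedMilliseconds
```

The results array follows the order of the input computations, not the order in which they happened to complete.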

We’ll refine this logic later, but this is good enough for now. Finally, we need to amend the program’s main function slightly, so that it calls AsyncGetFiles and waits for its results (Listing 10-16).
open System
open System.Diagnostics
[<EntryPoint>]
let main argv =
    // Some minor planets data:
    let uri = Uri @"https://minorplanetcenter.net/data"
    let pattern = @"neam.*.json.gz$"
    let localPath = @"c:\temp\downloads"
    let sw = Stopwatch()
    sw.Start()
    let downloaded, failed =
        Download.AsyncGetFiles uri pattern localPath
        |> Async.RunSynchronously
    failed
    |> Array.iter (fun fn ->
        Log.report ConsoleColor.Red (sprintf "Failed: %s" fn))
    Log.cyan
        (sprintf "%i files downloaded in %0.1fs, %i failed. Press a key"
            downloaded.Length sw.Elapsed.TotalSeconds failed.Length)
    Console.ReadKey() |> ignore
    0
Listing 10-16

Calling AsyncGetFiles

Locking Shared Resources

There’s one more task to do, and that is to control access to a shared, mutable resource that all the download tasks will use concurrently. And what is that resource? It’s the console, with its colored messages! Each of the simultaneous computations might output to the console at any time, so if you don’t control access to it you’ll get jumbled-up messages and colors. The fix is relatively easy: use the lock function (Listing 10-17).
    let report =
        let lockObj = obj()
        fun (color : ConsoleColor) (message : string) ->
            lock lockObj (fun _ ->
                Console.ForegroundColor <- color
                printfn "%s (thread ID: %i)"
                    message Thread.CurrentThread.ManagedThreadId
                Console.ResetColor())
Listing 10-17

Make a function thread safe using a lock expression

The new version of report is a nice example of the technique we introduced in the previous chapter: using a binding that creates some state but keeps it private, then returns a function that uses that state. In this case the state in question is simply an arbitrary object that is used by the lock expression to ensure exclusive access.

Needless to say, locking is a very complex subject. But in this context, Listing 10-17 shows a simple and effective way to achieve exclusive access for an operation that won’t take long to run.
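
The same locking technique applies to any small piece of shared, mutable state. Here is a sketch using a counter instead of the console; the counter, the lock object, and the figure of 1000 computations are all assumptions chosen for the example.

```fsharp
let counter = ref 0
let lockObj = obj()

let increment =
    async {
        // Only one computation at a time may run the guarded function.
        lock lockObj (fun () -> counter.Value <- counter.Value + 1)
    }

List.replicate 1000 increment
|> Async.Parallel
|> Async.RunSynchronously
|> ignore

printfn "%i" counter.Value
```

Without the lock, the read-then-write on the counter could interleave between threads, and some increments might be lost.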

Testing Asynchronous Downloads

It is time to check whether our shiny new asynchronous download performs better. Here are the results of running against the minor planets data (Listing 10-18, compare with Listing 10-8).
Getting names... (thread ID: 1)
neam01_extended.json.gz - starting download (thread ID: 7)
neam00_extended.json.gz - starting download (thread ID: 4)
neam02_extended.json.gz - starting download (thread ID: 3)
...
neam15_extended.json.gz - starting download (thread ID: 7)
neam03_extended.json.gz - download complete (thread ID: 23)
neam00_extended.json.gz - download complete (thread ID: 22)
neam02_extended.json.gz - download complete (thread ID: 21)
neam01_extended.json.gz - download complete (thread ID: 21)
...
neam12_extended.json.gz - download complete (thread ID: 18)
16 files downloaded in 14.1s, 0 failed. Press a key (thread ID: 1)
Listing 10-18

Log messages from an asynchronous run

The differences between this and Listing 10-8 are striking:
  • The downloads run on several threads, and the thread that logs the completion of a download is usually different from the thread that started it. This is the magic of let! and do!.

  • All the downloads are started before any of them complete. Compare that with the way started/completed messages simply alternate in the synchronous version.

  • Most importantly of all, the whole operation takes 14 seconds instead of a minute.

The usage of my WiFi connection is equally striking (Figure 10-2).
Figure 10-2

WiFi throughput downloading files asynchronously

In Figure 10-1, throughput on the interface was very spiky and peaked at about 11Mbps. In the asynchronous version, we get up to over 40Mbps, and the throughput is pretty consistent over the brief time the run lasts. Given this is going over WiFi, through my home’s highly questionable main wiring, then through England’s almost equally questionable fiber infrastructure, this is really pretty good.

Batching

One thing I’ve learned in several decades of coding is never to trust one’s first successful run! Let’s try the same code against some of the Google n-grams dataset. (You’ll find the URL and regular expression pattern for this in Table 10-1.)

Note

This is a large dataset. Don’t do this on a metered connection, or while your household is trying to watch Netflix!

This is how things looked after a few minutes of running (Listing 10-19 and Figure 10-3).
Getting names... (thread ID: 1)
googlebooks-eng-1M-2gram-20090715-14.csv.zip - starting download (thread ID: 17)
googlebooks-eng-1M-2gram-20090715-0.csv.zip - starting download (thread ID: 3)
...approximately 100 similar lines...
googlebooks-eng-1M-2gram-20090715-98.csv.zip - starting download (thread ID: 13)
Listing 10-19

Downloading a large number of files

Figure 10-3

WiFi throughput while downloading a large number of files

Something is certainly going on, as evidenced by the WiFi throughput. But even after about 10 minutes, no download had been completed. This pattern might not be ideal for a couple of reasons:
  • Although Google’s servers will probably be just fine, some other services might throttle if you ask for too much at once. Database servers might even run out of connection resources if not configured to service a tsunami of requests like this.¹

  • We might want to start work on some downloaded files as soon as possible. For instance, we might want to start uncompressing them or getting data out of them as soon as they are downloaded. In the pizza analogy, we don’t want all the cooks to spend their time kneading dough and chopping toppings for a large order, when they could be spending at least some time putting batches of assembled pizzas into ovens.

So how do we deal with this? One possibility is to explicitly batch our computations into groups of a specified size, then send each batch through individually using Async.Parallel just across the batch (Listing 10-20).
    let AsyncGetFilesBatched
        (pageUri : Uri) (filePattern : string) (localPath : string)
        (batchSize : int) =
        async {
            let! links = getLinks pageUri filePattern
            let downloaded, failed =
                links
                |> Seq.map (tryDownload localPath)
                |> Seq.chunkBySize batchSize
                |> Seq.collect (fun batch ->
                    batch
                    |> Async.Parallel
                    |> Async.RunSynchronously)
                |> Array.ofSeq
                |> Array.partition Outcome.isOk
            return
                downloaded |> Array.map Outcome.fileName,
                failed |> Array.map Outcome.fileName
        }
Listing 10-20

Using Seq.chunkBySize to create computation batches
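
If Seq.chunkBySize is new to you, this small sketch shows what it does to a plain sequence of numbers. The numbers and the batch size are arbitrary.

```fsharp
// Ten items in batches of four: the final batch holds the leftovers.
let batches =
    [ 1 .. 10 ]
    |> Seq.chunkBySize 4
    |> List.ofSeq

printfn "%A" batches
// [[|1; 2; 3; 4|]; [|5; 6; 7; 8|]; [|9; 10|]]
```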

In Listing 10-20 we use Seq.chunkBySize, which groups a sequence into batches of specified size (the last batch might be smaller). Then for each such batch we do an Async.Parallel |> Async.RunSynchronously to run just that batch in parallel. If you are following along, don’t forget to alter the main function code to call the new AsyncGetFilesBatched:
Download.AsyncGetFilesBatched uri pattern localPath 4
The behavior for this version is shown in Listing 10-21 and Figure 10-4, using a batch size of 4.
Getting names... (thread ID: 1)
googlebooks-eng-1M-2gram-20090715-11.csv.zip - starting download (thread ID: 14)
googlebooks-eng-1M-2gram-20090715-1.csv.zip - starting download (thread ID: 4)
googlebooks-eng-1M-2gram-20090715-10.csv.zip - starting download (thread ID: 5)
googlebooks-eng-1M-2gram-20090715-0.csv.zip - starting download (thread ID: 3)
googlebooks-eng-1M-2gram-20090715-1.csv.zip - download complete (thread ID: 16)
googlebooks-eng-1M-2gram-20090715-0.csv.zip - download complete (thread ID: 16)
googlebooks-eng-1M-2gram-20090715-10.csv.zip - download complete (thread ID: 4)
googlebooks-eng-1M-2gram-20090715-11.csv.zip - download complete (thread ID: 15)
googlebooks-eng-1M-2gram-20090715-13.csv.zip - starting download (thread ID: 15)
googlebooks-eng-1M-2gram-20090715-12.csv.zip - starting download (thread ID: 3)
googlebooks-eng-1M-2gram-20090715-14.csv.zip - starting download (thread ID: 4)
googlebooks-eng-1M-2gram-20090715-15.csv.zip - starting download (thread ID: 14)
Listing 10-21

Behavior of explicitly batched download

Figure 10-4

WiFi throughput during explicitly batched download

On the plus side, we do start seeing downloads complete much earlier in the process, meaning that we could get started with further processing of those files. But notice the pattern of the log messages. The first file of the second batch doesn’t start downloading until the last file of the first batch has finished downloading. Hence the four-deep bands of “started” and “completed” messages in the log. This is reflected in the network throughput: it dips toward the end of each batch as the last part of the last file dribbles through.

What we need is throttling: the ability to start a limited number of computations simultaneously, and to start a new one each time a previous one completes.

Throttling

It would certainly be possible to write one’s own throttled, parallel computation logic. But this is such a common pattern, it’s worth looking to see if someone has solved it in a general way. As it turns out, the F# extensions library FSharpx contains exactly the functionality we need. (At the time of writing, this functionality had not been moved into the core F# libraries, but we live in hope that this will eventually happen.)

Use NuGet or Paket to add the package “FSharpx.Async” to your project. Then add an AsyncGetFilesThrottled function that uses Async.ParallelWithThrottle (Listing 10-22).
    // From nuget package "FSharpx.Async"
    open FSharpx.Control
    let AsyncGetFilesThrottled
        (pageUri : Uri) (filePattern : string) (localPath : string)
        (throttle : int) =
        async {
            let! links = getLinks pageUri filePattern
            let! downloadResults =
                links
                |> Seq.map (tryDownload localPath)
                |> Async.ParallelWithThrottle throttle
            let downloaded, failed =
                downloadResults
                |> Array.partition Outcome.isOk
            return
                downloaded |> Array.map Outcome.fileName,
                failed |> Array.map Outcome.fileName
        }
Listing 10-22

Asynchronous, parallel, throttled downloads

Async.ParallelWithThrottle is like Async.Parallel but takes one additional parameter to specify the throttle size: the largest number of computations that will be running at any one time. If you are following along, change the main function code to call AsyncGetFilesThrottled:
Download.AsyncGetFilesThrottled uri pattern localPath 4
This behaves really nicely, as you can see from its log messages and WiFi throughput (Listing 10-23 and Figure 10-5).
Getting names... (thread ID: 1)
googlebooks-eng-1M-2gram-20090715-11.csv.zip - starting download (thread ID: 7)
googlebooks-eng-1M-2gram-20090715-1.csv.zip - starting download (thread ID: 14)
googlebooks-eng-1M-2gram-20090715-10.csv.zip - starting download (thread ID: 12)
googlebooks-eng-1M-2gram-20090715-0.csv.zip - starting download (thread ID: 3)
googlebooks-eng-1M-2gram-20090715-10.csv.zip - download complete (thread ID: 14)
googlebooks-eng-1M-2gram-20090715-12.csv.zip - starting download (thread ID: 12)
googlebooks-eng-1M-2gram-20090715-0.csv.zip - download complete (thread ID: 14)
googlebooks-eng-1M-2gram-20090715-13.csv.zip - starting download (thread ID: 3)
googlebooks-eng-1M-2gram-20090715-11.csv.zip - download complete (thread ID: 7)
Listing 10-23

Behavior of a parallel, throttled download

Figure 10-5

WiFi throughput during parallel, throttled download

Initially a batch of 4 downloads is started, then as soon as one completes, another one is started on whatever thread happens to be available. This keeps the network connection nice and busy but without having a great number of downloads all fighting for limited bandwidth.

C# Task versus F# Async

Now that you’ve seen the benefits of asynchronous programming, it’s time to revisit something we glossed over earlier: the difference between an F# Async and a C# Task. They each represent their language’s conception of an asynchronous computation that will return some type when completed. However, there is an important difference. C# uses a “hot task” model: when something creates a Task instance, the underlying computation is already running. F# uses a “cold task” model: the caller is responsible for starting the computation. This has some advantages in terms of composability. For example, in Listing 10-20 we had the opportunity to group tasks into batches of a fixed size using Seq.chunkBySize before finally launching them using Async.Parallel and Async.RunSynchronously.
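The hot/cold distinction can be seen in a few lines. The following is a small sketch rather than anything from the downloader itself; the names runs, coldWork and hotTask are made up for the illustration.

```fsharp
open System.Threading

// Counts how many times the body of the computation has actually run.
let runs = ref 0

// Defining an Async does not run it: this is only a description of the work.
let coldWork =
    async {
        Interlocked.Increment(runs) |> ignore
        return 42
    }

// Nothing has been started yet, so the counter has not moved.
let runsAfterDefinition = runs.Value

// Async.StartAsTask starts the work and hands back an already-running Task.
let hotTask = coldWork |> Async.StartAsTask
let fromTask = hotTask.Result // blocks until the task has finished
let runsAfterTask = runs.Value

// The same Async value can be started again; its body then runs a second time.
let fromAsync = coldWork |> Async.RunSynchronously
let runsAfterSecondStart = runs.Value
```

The counter only moves when something starts the computation, and it moves again each time the same Async is started. The Task, by contrast, represents one particular run that is already under way.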

Both Async and Task are, of course, valid models: the problems arise when we have to stand astride both worlds. For example, in Listing 10-10 we were able to use the result of HtmlDocument.AsyncLoad directly in a let! binding thus:
            let! html = HtmlDocument.AsyncLoad(pageUri.AbsoluteUri)
…because HtmlDocument is an F#-first API and returns an Async - which is what let! expects. By contrast, in Listing 10-11 we used WebClient.DownloadFileTaskAsync, which returns a C# Task. To make it compatible with do!, which expects an F# Async, we had to pipe it into Async.AwaitTask.
                do!
                    client.DownloadFileTaskAsync(fileUri, filePath)
                    |> Async.AwaitTask

Although we happened to be using do! in this case, the same would have applied for a let! binding.

The Async versus Task dichotomy has a number of practical and stylistic implications that you should always be aware of when coding in F#.
  • As we’ve already said, when an API returns a C# Task, you’ll have to convert it into an F# Async using Async.AwaitTask if you want to use it in an async computation expression with let! and do!.

  • If you’re writing a general-purpose API that exposes asynchronous functions, you should by default return a C# Task rather than an F# Async. This follows the general guidance that APIs for use in languages other than F# should not expose F#-specific types. You can work in terms of F# Async internally, and at the last moment convert into C# Task using Async.StartAsTask.

  • API functions that return a C# Task should be named with a suffix of Async – for example, WebClient.DownloadFileTaskAsync.

  • It’s OK, though, for APIs aimed primarily at F# consumers, such as FSharp.Data.HtmlDocument, to expose asynchronous functions that return F# Async instances.

  • F#-centric APIs that return an F# Async should be named with a prefix of Async – for example, HtmlDocument.AsyncLoad.

Incidentally, if you want to provide overloads to allow cancellation tokens to be sent in, you can use code as in Listing 10-24.
type NiceCSharp =
  static member SomeWorkAsync(…) =
    Async.StartAsTask(someWork(…))
  static member SomeWorkAsync(cancellationToken) =
    Async.StartAsTask(someWork(…), cancellationToken = cancellationToken)
Listing 10-24

Overloads for exposing C# Task functions

(The code in Listing 10-24 is courtesy of Tomas Petricek via stackoverflow.com.)
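Listing 10-24 elides its parameters, so here is one concrete way the pattern might be filled in. The someWork function and its string parameter are hypothetical stand-ins; treat this as a sketch of the shape, not as the original author's code.

```fsharp
open System.Threading
open System.Threading.Tasks

// Hypothetical internal work, written in terms of F# Async.
let someWork (input : string) =
    async {
        do! Async.Sleep 100
        return input.ToUpperInvariant()
    }

type NiceCSharp =
    /// Overload for callers that do not need cancellation.
    static member SomeWorkAsync(input : string) : Task<string> =
        Async.StartAsTask(someWork input)
    /// Overload that lets the caller pass in a cancellation token.
    static member SomeWorkAsync
        (input : string, cancellationToken : CancellationToken) : Task<string> =
        Async.StartAsTask(someWork input, cancellationToken = cancellationToken)

// Usage, much as a C# caller would see it (blocking on Result for brevity).
let plain = NiceCSharp.SomeWorkAsync("pizza").Result
let cts = new CancellationTokenSource()
let cancellable = NiceCSharp.SomeWorkAsync("pager", cts.Token).Result
```

Both overloads return Task<string>, so no F#-specific type appears in the public signature.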

Finally, I should point out that, at the time of writing, there is an F# language proposal to offer a task {} computation expression, that is, one that works in terms of C# Task rather than F# Async. This may simplify some coding in this area, but perhaps at the cost of some of the composability that F# Async offers.

Recommendations

Here are some basic steps that are worth taking away from this chapter:
  • Get your business logic working correctly in a synchronous way.

  • Identify places where you are “ordering pizza,” in other words, making a request, usually via an API, which will take some time and doesn’t require the involvement of the current thread. Any good API should offer an asynchronous implementation of the call you are making.

  • Assuming the function from where you are ordering pizza is reasonably well factored, simply enclose its body with async {}. Change the ordering-pizza calls from let to let!, or if they are imperative, use do!. If you are using F# 4.5 or later, you can also use match!.

  • If the function from where you are ordering pizza is not well factored, you may need to break it down to make it easier to enclose the appropriate code in async {} blocks.

  • If an asynchronous API call returns a C# Task rather than an F# Async, you’ll also have to convert the Task to an Async using Async.AwaitTask.

  • Return (the promise of) data from the async {} expression using the return keyword.

  • Do the same thing to any higher-level functions that call the functions you just changed. Keep on going until you reach the top of the hierarchy, where you actually use the data or expose an API. I’ve heard this process referred to as implementing async “all the way down.”

  • If exposing an API for use by other languages, translate the F# Async to a C# Task using Async.StartAsTask. This avoids exposing F# types where they may not be natively understood.

  • To actually get results, use Async.RunSynchronously. But do this as little as possible – generally at the top of your “async-all-the-way-down” chain. You may not have to do it at all if you want external code that calls your functions to decide when to wait for results.

  • To run similar, independent requests in parallel, use Async.Parallel, or Async.ParallelWithThrottle from the FSharpx.Async package. (Eventually this may move into core F# libraries.)

  • Finally, all this is moot if your computation is limited by the local CPU power available (“CPU bound”). In those cases, you might as well use Array.Parallel.map or one of its siblings from the Array.Parallel module. We’ll revisit this topic in Chapter 12.
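As a compact recap, the steps above might look like this when applied to a toy example. Everything here is illustrative: getLengthTaskAsync is a hypothetical stand-in for a C#-style API that returns a Task, and the other names are likewise invented for the sketch.

```fsharp
open System.Threading.Tasks

// Hypothetical stand-in for a C#-style API: returns a hot Task<int>.
let getLengthTaskAsync (text : string) : Task<int> =
    async {
        do! Async.Sleep 100
        return text.Length
    }
    |> Async.StartAsTask

// The function that "orders pizza": its body is enclosed in async {}, the
// Task is converted with Async.AwaitTask, and the result is bound with let!.
let asyncGetLength (text : string) =
    async {
        let! length = getLengthTaskAsync text |> Async.AwaitTask
        return length
    }

// A higher-level caller is made asynchronous in the same way, and runs the
// independent requests in parallel.
let asyncTotalLength (texts : string list) =
    async {
        let! lengths =
            texts
            |> List.map asyncGetLength
            |> Async.Parallel
        return Array.sum lengths
    }

// Only at the very top do we wait for a result.
let total =
    asyncTotalLength [ "pizza"; "pager"; "table" ]
    |> Async.RunSynchronously
```

If asyncTotalLength had to be exposed to other languages, piping its result into Async.StartAsTask instead of Async.RunSynchronously would give callers a C# Task.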

Summary

In this chapter you learned how to deal with situations where your application is “ordering pizza” – in other words, setting off a computation that will take some time, and for which it isn’t necessary for the current thread to stay involved. You found out how to deal with these cases by enclosing them in an async {} block and using let!, match! and do! to set off the time-consuming computation, and to have control to return to the same point (but likely on a different thread) once a result is obtained.

Asynchronous and parallel programming is a huge topic. In a wide-ranging book like this, we can really only scratch the surface. If you want to learn more, I would suggest studying a superb blog post on medium.com, entitled “F# Async Guide,” by Lev Gorodinski (of Jet.com). Then you may also wish to look at the NuGet package “Hopac,” which is a concurrent programming library for F# in the style of Concurrent ML. You will find implementations there of most of the concurrent programming patterns you are likely to need in mainstream programming. Having said that, the techniques described in this chapter should serve you well in most situations.

In the next chapter, we’ll look at railway oriented programming, a coding philosophy that encourages you to think about errors as hard as you think about successes, so that both the “happy” and “sad” paths in your code are equally well expressed.

Exercises

This section contains exercises to help you get used to translating code into an asynchronous world.

Exercise 10-1 – Making Some Code Asynchronous

In the following code the Server module contains a simulated server endpoint that returns a random string, taking half a second to do so. In the Consumer module, we call the server multiple times to build up an array of strings, which we then sort to produce a final result.
    open System
    module Random =
        let private random = System.Random()
        let string() =
            let len = random.Next(0, 10)
            Array.init len (fun _ -> random.Next(0, 255) |> char)
            |> String
    module Server =
        let AsyncGetString (id : int) =
            // id is unused
            async {
                do! Async.Sleep(500)
                return Random.string()
            }
    module Consumer =
        let GetData (count : int) =
            let strings =
                Array.init count (fun i ->
                    Server.AsyncGetString i
                    |> Async.RunSynchronously)
            strings
            |> Array.sort
    let demo() =
        let sw = System.Diagnostics.Stopwatch()
        sw.Start()
        Consumer.GetData 10
        |> Array.iter (printfn "%s")
        printfn "That took %ims" sw.ElapsedMilliseconds

If you run the demo() function, you’ll notice that this operation takes over 5 seconds to get 10 results.

Change the Consumer.GetData() function so that it is asynchronous, and so that it runs all its calls to Server.AsyncGetString() in parallel.

You don’t need to throttle the parallel computation. Consumer.GetData() should be an F# style async function, that is, it should return Async<String[]>.

Hint: You’ll also need to change the demo() function so that the result of Consumer.GetData is passed into Async.RunSynchronously.

Exercise 10-2 – Returning Tasks

How would your solution to Exercise 10-1 change if Consumer.GetData() needed to return a C# style Task?

Exercise Solutions

Exercise 10-1 – Making Some Code Asynchronous

Rename Consumer.GetData() to AsyncGetData() to reflect its new return type. Enclose its body in an async {} block. Change the binding of strings from let to let!. Remove the call to Async.RunSynchronously and instead pass the results of the Array.init (which will now be an array of Async<string> instances) into Async.Parallel. Finally, return the result of sorting the array explicitly using the return keyword.
        let AsyncGetData (count : int) =
            async {
                let! strings =
                    Array.init count (fun i -> Server.AsyncGetString i)
                    |> Async.Parallel
                return
                    strings
                    |> Array.sort
            }
In the demo() function, pass the result of Consumer.AsyncGetData into Async.RunSynchronously to actually run the computation.
    let demo() =
        let sw = System.Diagnostics.Stopwatch()
        sw.Start()
        Consumer.AsyncGetData 10
        |> Async.RunSynchronously
        |> Array.iter (printfn "%s")
        printfn "That took %ims" sw.ElapsedMilliseconds

Run demo() to verify that the computation takes roughly half a second.

Exercise 10-2 – Returning Tasks

Rename Consumer.AsyncGetData() to GetDataAsync() to reflect its new return type. After the end of its async {} block, add |> Async.StartAsTask to start the computation running and return a C# Task.
        let GetDataAsync (count : int) =
            async {
                let! strings =
                    Array.init count (fun i -> Server.AsyncGetString i)
                    |> Async.Parallel
                return
                    strings
                    |> Array.sort
            } |> Async.StartAsTask
In the demo() function, add an Async.AwaitTask call to await the result of the task.
    let demo() =
        let sw = System.Diagnostics.Stopwatch()
        sw.Start()
        Consumer.GetDataAsync 10
        |> Async.AwaitTask
        |> Async.RunSynchronously
        |> Array.iter (printfn "%s")
        printfn "That took %ims" sw.ElapsedMilliseconds