© Robert Pickering and Kit Eason 2016

Robert Pickering and Kit Eason, Beginning F# 4.0, 10.1007/978-1-4842-1374-2_9

9. Parallel Programming

Robert Pickering and Kit Eason

(1)St. Germain-En-Laye, France

Parallel programming has recently moved from being a relatively obscure topic, practiced only by specialist developers, to a more mainstream endeavor. This is due to the increasing prevalence of multicore processors. At the time of writing, it is almost impossible to buy a PC with a single-core processor, and machines with four, eight, or more processors are readily available. It is fully expected that this trend will continue in the years to come.

To a certain extent, this interest in parallel programming has driven the renewed interest in functional programming. Functional programming is certainly not a silver bullet for all parallel programming problems, but it can help you design your software so it executes in parallel. In this chapter, you will learn about some of the simpler techniques to help your software execute in parallel, as well as how to take advantage of several processors.

It’s often helpful to break down parallel programming into several smaller subtopics, all of which you’ll learn about in this chapter:

  • Threads, memory, locking, and blocking: You’ll learn about basic techniques for creating and controlling threads in .NET programming. You’ll also take a quick look at how to share resources (such as memory) between threads, as well as how to control access to these shared resources.

  • Reactive programming: It’s often important to the user experience that programs remain reactive to input. To do this, it’s important that you avoid doing too much processing on the thread responsible for reacting to user input. This is particularly relevant to GUI programming, but it can also apply to a server that needs to stay responsive to incoming requests.

  • Data parallelism: This term refers to executing one piece of code concurrently on several processors with varying input data. This is a good way to parallelize the processing of large data structures such as collections. It’s often possible to apply a transformation to several items in a collection in parallel, which will generally speed up the overall execution time. The classic example of this is the parallel map, which provides one of the simplest ways to parallelize a functional program.

  • Asynchronous programming: Some tasks, particularly I/O, need to happen asynchronously to make program execution efficient. It is important that threads are not blocked for long periods while I/O takes place.

  • Message passing: This technique is more formally referred to as the actor model. You use it to coordinate tasks that execute in parallel. This is the most advanced parallel-programming topic covered in this chapter.

Parallel programming is a large topic, so this chapter won't be exhaustive, but it will provide some straightforward ways to help you get started with parallel programming in F#.

Threads, Memory, Locking, and Blocking

If you are serious about parallel programming, it’s worth investing time to understand threads and memory. In this section, you’ll take a look at explicitly creating threads and how to control their access to shared resources, such as memory. My advice is to avoid explicitly creating and managing threads like this; however, when using the other parallel programming techniques, it’s useful to understand the underlying threading concepts.

When a program is executed, the operating system creates a process to execute it. The process represents the resources that are allocated to the program, most notably the memory allocated to it. A process has one or more threads that are responsible for executing the program’s instructions and share the process memory. In .NET, a program starts with one thread to execute the program’s code. To create an extra thread in F#, you use the System.Threading.Thread class. The Thread class’s constructor takes a delegate that represents the function the thread will start executing. Once a Thread class has been constructed, it does not start executing automatically: you must call its Start method. The following example demonstrates how to create and start a new thread:

open System.Threading

let main() =
    // create a new thread passing it a lambda function
    let thread = new Thread(fun () ->
        // print a message on the newly created thread
        printfn "Created thread: %i" Thread.CurrentThread.ManagedThreadId)
    // start the new thread
    thread.Start()
    // print a message on the original thread
    printfn "Original thread: %i" Thread.CurrentThread.ManagedThreadId
    // wait for the created thread to exit
    thread.Join()


do main()

Compiling and executing the preceding program will output results similar to this:

Original thread: 1
Created thread: 3

You should look at a couple of important things in this example. First, notice that the original thread prints its message before the second thread does. This is because calling a thread’s Start method does not immediately start the thread; rather, it schedules a new thread for execution, and the operating system chooses when to start it. Normally, the delay will be short, but because the original thread continues to execute, it will probably run a few more instructions before the new thread starts executing. Second, notice how you use the thread’s Join method to wait for it to exit. If you did not do this, it is highly probable that the original thread would finish executing before the second thread had a chance to start. While the original thread is waiting for the created thread to do its work, you say that it is blocked. Threads can become blocked for a number of reasons. For example, they might be waiting on a lock, or they might be waiting for I/O to complete. When a thread becomes blocked, the operating system switches to the next runnable thread; this is called a context switch. You’ll learn about locking in the next section; later in the chapter, you’ll see how asynchronous programming avoids blocking threads on I/O operations.

Any resource that can be updated by two different threads at the same time is at risk of being corrupted. This is because a context switch can occur at any time, leaving operations that should have been atomic half done. To avoid corruption, you need to use locks. A lock, sometimes referred to as a monitor, ensures that only one thread at a time can pass through a section of code. In F#, you use the lock function to create and control locking. You do this by locking on an object; the idea is that, as soon as the lock is taken, any thread attempting to enter the section of code will be blocked until the lock is released by the thread that holds it. Code protected in this way is sometimes called a critical section. Under the hood, this works by calling System.Threading.Monitor.Enter at the start of the code that you want to protect and System.Threading.Monitor.Exit at the end of that code. You must guarantee that Monitor.Exit is called, or threads could end up blocked forever. The lock function is a nice way to ensure that Monitor.Exit is always called whenever Monitor.Enter has been called. This function takes two parameters: the first is the object you want to lock on, while the second is a function that contains the code you want to protect. That function should take unit as its parameter, and it can return any value.
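To make the risk concrete, here is a minimal sketch of the classic lost-update problem: a mutable counter incremented from two threads, made safe with the lock function. The names counter and makeIncrementer are illustrative, not part of the book’s example.

```fsharp
open System.Threading

// shared mutable state and the object used to lock it
let mutable counter = 0
let counterLock = obj ()

// each thread increments the counter 100,000 times;
// counter <- counter + 1 is a read-modify-write, so without
// the lock some increments could be lost to context switches
let makeIncrementer () =
    new Thread(fun () ->
        for _ in 1 .. 100000 do
            lock counterLock (fun () -> counter <- counter + 1))

let t1 = makeIncrementer ()
let t2 = makeIncrementer ()
t1.Start()
t2.Start()
t1.Join()
t2.Join()
printfn "Final count: %d" counter
```

With the lock in place, the final count is always 200,000; remove the lock and it will usually come out lower, and differently on each run.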

The following example demonstrates the subtle issues involved in locking. The code to accomplish the lock needs to be quite long, and this example has been deliberately written to exaggerate the problem of context switching. The idea behind this code is this: two threads run at the same time, and both try to write to the console. The aim of the sample is to write the string "One ... Two ... Three ... " to the console atomically; that is, one thread should be able to finish writing its message before the next one starts. The example has a function, called makeUnsafeThread, that creates a thread that won’t be able to write to the console atomically, and a second function, makeSafeThread, that creates a thread that writes to the console atomically by using a lock.

open System
open System.Threading


// function to print to the console character by character
// this increases the chance of there being a context switch
// between threads.
let printSlowly (s : string) =
    s.ToCharArray()
    |> Array.iter (printf "%c")
    printfn ""


// create a thread that prints to the console in an unsafe way
let makeUnsafeThread() =
    new Thread(fun () ->
    for x in 1 .. 100 do
        printSlowly "One ... Two ... Three ... ")


// the object that will be used as a lock
let lockObj = new Object()


// create a thread that prints to the console in a safe way
let makeSafeThread() =
    new Thread(fun () ->
        for x in 1 .. 100 do
            // use lock to ensure operation is atomic
            lock lockObj (fun () ->
                printSlowly "One ... Two ... Three ... "))


// helper function to run the test
let runTest (f: unit -> Thread) message =
    printfn "%s" message
    let t1 = f()
    let t2 = f()
    t1.Start()
    t2.Start()
    t1.Join()
    t2.Join()


// runs the demonstrations
let main() =
    runTest
        makeUnsafeThread
        "Running test without locking ..."
    runTest
        makeSafeThread
        "Running test with locking ..."


do main()

The part of the example that uses the lock is repeated next to highlight the important points. Note a couple of important factors. First, you declare lockObj, the object used to create the critical section. Second, you embed the use of the lock function inside the makeSafeThread function. The most important thing to notice is that the printing operations you want to be atomic are placed inside the function you pass to lock.

// the object that will be used as a lock
let lockObj = new Object()


// create a thread that prints to the console in a safe way
let makeSafeThread() =
    new Thread(fun () ->
        for x in 1 .. 100 do
            // use lock to ensure operation is atomic
            lock lockObj (fun () ->
                printSlowly "One ... Two ... Three ... "))

The results of the first part of the test will vary each time it runs because they depend on when a thread context switches. They might also vary based on the number of processors: because multiple threads can run at the same time on a machine with two or more processors, the garbled messages will be more tightly packed together. On a single-processor machine, the output will be garbled less often because printing a message goes wrong only when a context switch takes place. The results of the first part of the sample, run on a dual-processor machine, look like this:

Running test without locking ...
...
One ... Two ... Three ...
One One ... Two ... Three ...
One ... Two ... Three ...
...

The lock means that the results of the second half of the example will not vary at all, so they will always look like this:

Running test with locking ...
One ... Two ... Three ...
One ... Two ... Three ...
One ... Two ... Three ...
...

Locking is an important aspect of concurrency. You should lock any resource that you write to and share between threads. A resource is often a variable, but it can also be a file or even the console, as shown in this example. Although locks provide a solution to concurrency, they can also create problems of their own because they can lead to deadlock. A deadlock occurs when two or more threads each lock a resource that the other needs, so neither can advance. The simplest solution to concurrency is often to avoid sharing a resource that different threads can write to. In the rest of this chapter, you’ll look at ways of creating parallel programs that do not rely on explicitly creating locks.
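One common discipline for avoiding deadlock is to require that every thread acquires locks in the same global order. The following is a minimal sketch of that idea with hypothetical lock and function names; it is not a complete treatment of deadlock avoidance.

```fsharp
open System.Threading

let lockA = obj ()
let lockB = obj ()

// Every thread takes lockA before lockB. The classic deadlock needs
// one thread to hold lockB while waiting for lockA, which this
// ordering rule makes impossible.
let useBothResources name =
    lock lockA (fun () ->
        lock lockB (fun () ->
            printfn "%s is using both resources" name))

let t1 = new Thread(fun () -> useBothResources "thread 1")
let t2 = new Thread(fun () -> useBothResources "thread 2")
t1.Start()
t2.Start()
t1.Join()
t2.Join()
```

If one thread instead took lockB first, each thread could end up holding the lock the other needs, and both would block forever.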

Note

This book provides an extremely brief introduction to threading. You will need to learn much more about threading if you want to become good at parallel programming. A good place to start is the MSDN section on managed threads: https://msdn.microsoft.com/en-us/library/3e8s7xdd(v=vs.110).aspx .

Reactive Programming

Reactive programming refers to the practice of ensuring your programs react to events or input. In this section, you’ll concentrate on reactive programming in terms of GUI programming ; GUIs should always be reactive. However, other styles of programming also need to take reactive programming into account. For example, programs running on servers often need to stay reactive to input, even as they process other, longer running tasks.

Most GUI libraries use an event loop to handle drawing the GUI and the interactions with the user. This means that one thread takes care of drawing the GUI and raising all the events on it. This thread is referred to as the GUI thread. Another consideration: you should update GUI objects only from the GUI thread; you want to avoid creating situations where other threads can corrupt the state of GUI objects. This means that computations or I/O operations that take a significant amount of time should not take place on the GUI thread. If the GUI thread is busy with a long-running computation, it cannot process interactions from the user, nor can it draw the GUI. This is the number one cause of unresponsive GUIs.

You can see this in action in the following example that creates a GUI that could easily become unreactive because it tries to do too much computation on the GUI thread. This example also illustrates how to ensure your GUI remains reactive by making a few simple changes. You will look primarily at a useful abstraction called the BackgroundWorker class, which you’ll find in the System.ComponentModel namespace. This useful class allows you to run some work, raising a notification event when the work is complete. This is especially useful for GUI programming because the completed notification is raised on the GUI thread, which helps you enforce the rule that GUI objects should only be altered from the thread that created them.

Specifically, the example creates a GUI for calculating the Fibonacci numbers using the simple Fibonacci calculation algorithm you saw in Chapter 7. To simplify your setup, this code is designed so that you can place it all in the same .fs file, do a select-all, and send it to F# Interactive.

#if INTERACTIVE
#r "System.Windows.Forms.dll"
#else
module ThreadingDemo
#endif


let fibs =
      (1I,1I) |> Seq.unfold
         (fun (n0, n1) ->
            Some(n0, (n1, n0 + n1)))


let fib n = Seq.item n fibs

Creating a simple GUI for this calculation is straightforward; you can do it using WinForms:

open System
open System.Windows.Forms


let form =
    let form = new Form()
    // input text box
    let input = new TextBox()
    // button to launch processing
    let button = new Button(Left = input.Right + 10, Text = "Go")
    // label to display the result
    let output = new Label(Top = input.Bottom + 10, Width = form.Width,
                           Height = form.Height - input.Bottom + 10,
                           Anchor = (AnchorStyles.Top
                                     ||| AnchorStyles.Left
                                     ||| AnchorStyles.Right
                                     ||| AnchorStyles.Bottom))


    // do all the work when the button is clicked
    button.Click.Add(fun _ ->
        output.Text <- Printf.sprintf "%A" (fib (Int32.Parse(input.Text))))
    // add the controls
    let dc c = c :> Control
    form.Controls.AddRange([|dc input; dc button; dc output |])
    // return the form
    form


// run the form
#if INTERACTIVE
form.ShowDialog() |> ignore
#else
Application.Run()
#endif

Running this example in F# Interactive creates the GUI you see in Figure 9-1.

A340906_2_En_9_Fig1_HTML.jpg
Figure 9-1. A GUI for the Fibonacci numbers

This GUI lets you display the results of your calculation in a reasonable way; unfortunately, your GUI becomes unreactive as soon as the calculation starts to take a long time. You may need to enter inputs of six or even seven digits to see this, depending on the speed of your computer. The following code is responsible for the unresponsiveness:

// do all the work when the button is clicked
button.Click.Add(fun _ ->
    output.Text <- Printf.sprintf "%A" (fib (Int32.Parse(input.Text))))

This code means that you do all the calculation on the same thread that raised the click event: the GUI thread. While the GUI thread performs the calculation, it cannot process other events.

It’s fairly easy to fix this using the background worker:

open System
open System.ComponentModel
open System.Windows.Forms


let form =
    let form = new Form()
    // input text box
    let input = new TextBox()
    // button to launch processing
    let button = new Button(Left = input.Right + 10, Text = "Go")
    // label to display the result
    let output = new Label(Top = input.Bottom + 10, Width = form.Width,
                           Height = form.Height - input.Bottom + 10,
                           Anchor = (AnchorStyles.Top
                                     ||| AnchorStyles.Left
                                     ||| AnchorStyles.Right
                                     ||| AnchorStyles.Bottom))


    // create and run a new background worker
    let runWorker() =
        let background = new BackgroundWorker()
        // parse the input to an int
        let input = Int32.Parse(input.Text)
        // add the "work" event handler
        background.DoWork.Add(fun ea ->
            ea.Result <- fib input)
        // add the work completed event handler
        background.RunWorkerCompleted.Add(fun ea ->
            output.Text <- Printf.sprintf "%A" ea.Result)
        // start the worker off
        background.RunWorkerAsync()


    // hook up creating and running the worker to the button  
    button.Click.Add(fun _ -> runWorker())
    // add the controls
    let dc c = c :> Control
    form.Controls.AddRange([|dc input; dc button; dc output |])
    // return the form
    form


// run the form
#if INTERACTIVE
form.ShowDialog() |> ignore
#else
Application.Run()
#endif

Using the background worker imposes few changes on the code. You do need to split what the code does between the DoWork and the RunWorkerCompleted events, and this means you need to write slightly more code, but this will never require more than a few extra lines. Let’s step through the required code changes. Begin by creating a new instance of the background worker class:

let background = new BackgroundWorker()

You place the code that needs to happen in the background, on a different thread, in the DoWork event. You also need to be careful to extract any data you need from controls outside of the DoWork event. Because this code runs on a different thread, letting it interact with the GUI objects would break the rule that they should only be manipulated by the GUI thread. You can see the code you use to read the integer and wire up the DoWork event here:

// parse the input to an int
let input = Int32.Parse(input.Text)
// add the "work" event handler
background.DoWork.Add(fun ea ->
    ea.Result <- fib input)

In this example, you extract the input integer from the text box and parse it just before adding the event handler to the DoWork event. The lambda function you add to the DoWork event then captures the resulting integer. You should place the result that interests you in the Result property of the DoWork event’s argument. You can then recover the value from the Result property of the RunWorkerCompleted event’s argument, as the following code shows:

// add the work completed event handler
background.RunWorkerCompleted.Add(fun ea ->
    output.Text <- Printf.sprintf "%A" ea.Result)

You can be certain that the RunWorkerCompleted event runs on the GUI thread, so it is fine to interact with GUI objects. You’ve wired up the events, but you have a couple of tasks remaining. First, you need to start the background worker:

// start the worker off
background.RunWorkerAsync()

Second, you need to add all of this code to the button’s Click event. You’ve wrapped the preceding code in a function called runWorker(), so it’s a simple matter of calling this code in the event handler:

// hook up creating and running the worker to the button  
button.Click.Add(fun _ -> runWorker())

Notice how this means you create a new background worker each time the button is clicked. You do this because a background worker cannot be reused while it is busy.

Now the GUI remains reactive no matter how many times someone clicks the Go button. This does lead to some other problems; for example, it’s fairly easy to set off two calculations that will take some time to complete. If this happens, the results of both are placed in the same result label, so the user might have no idea which one finished first and is being displayed at the time she sees it. Your GUI remains reactive, but it’s not well adapted to this multithreaded style of programming. One option is to disable all of the controls while the calculation takes place. This might be appropriate in a few cases, but it’s not a great option overall because it means the user can take little advantage of your reactive GUI. A better option is to create a system capable of displaying multiple results, along with their initial parameters, to ensure that the user knows what a given result means. This example uses a data grid view to display the results:

open System
open System.ComponentModel
open System.Windows.Forms
open System.Numerics


// define a type to hold the results
type Result =
    { Input: int;
      Fibonacci: BigInteger; }


let form =
    let form = new Form()
    // input text box
    let input = new TextBox()
    // button to launch processing
    let button = new Button(Left = input.Right + 10, Text = "Go")
    // list to hold the results
    let results = new BindingList<Result>()
    // data grid view to display multiple results
    let output = new DataGridView(Top = input.Bottom + 10, Width = form.Width,
                                  Height = form.Height - input.Bottom + 10,
                                  Anchor = (AnchorStyles.Top
                                            ||| AnchorStyles.Left
                                            ||| AnchorStyles.Right
                                            ||| AnchorStyles.Bottom),
                                  DataSource = results)


    // create and run a new background worker
    let runWorker() =
        let background = new BackgroundWorker()
        // parse the input to an int
        let input = Int32.Parse(input.Text)
        // add the "work" event handler
        background.DoWork.Add(fun ea ->
            ea.Result <- (input, fib input))
        // add the work completed event handler
        background.RunWorkerCompleted.Add(fun ea ->
            let input, result = ea.Result :?> (int * BigInteger)
            results.Add({ Input = input; Fibonacci = result; }))
        // start the worker off
        background.RunWorkerAsync()


    // hook up creating and running the worker to the button  
    button.Click.Add(fun _ -> runWorker())
    // add the controls
    let dc c = c :> Control
    form.Controls.AddRange([|dc input; dc button; dc output |])
    // return the form
    form


// run the form
#if INTERACTIVE
form.ShowDialog() |> ignore
#else
Application.Run()
#endif

You can see this new GUI in Figure 9-2.

A340906_2_En_9_Fig2_HTML.jpg
Figure 9-2. A GUI that is better adapted to multi-threaded programming

Data Parallelism

Data parallelism relies on executing a single function in parallel with varying data inputs. This breaks work into discrete units so it can be processed in parallel, on separate threads, ensuring that work can be partitioned between the available processors.

Typically this means processing a collection of data in parallel . This method takes advantage of the fact that the items in the collection provide a natural way to partition the work. In the simplest case, a parallel map function, you apply a transformation to each item in the collection, and the results form a new collection. This simple case generally works because each item in the collection can typically be processed independently and in any order. It’s also possible to use this technique to handle more complex scenarios, such as summing all the items in a list; however, it can also prove tricky for some complex cases, and the processing order can take on added significance.

Data parallelism typically relies on libraries and frameworks to provide parallel processing. Although these libraries use multiple threads or processes to provide the parallelism, they don’t typically require the user to create or control those threads explicitly; instead, it’s the job of the library or framework to do this. In many such environments, work units can be distributed between different physical machines that form a computing grid; for the sake of simplicity, and because multicore systems are becoming more common and powerful, this chapter will concentrate on systems where work is distributed between multiple processors of a single physical machine.

There are two straightforward ways of implementing data-parallel processing in F#. You can use the Array.Parallel module that comes with F#. This approach is simple and can cover a great many practical requirements. Beyond this, you can use the FSharp.Collections.ParallelSeq library available on NuGet.

The Array.Parallel Module

The simplest approach for many data-parallel problems is to use the collection functions such as map and iter available in the Array.Parallel module. You don’t have to add any references, download any NuGet packages, or even open any namespaces to use Array.Parallel.

Let’s say you want to compute the MD5 hashes of every file in a folder. Computing such hashes can be quite expensive, particularly if the files are large. Here’s how you’d do it without using parallel processing:

open System.IO
open System.Security.Cryptography


let Hashes path =
   Directory.EnumerateFiles(path)
   |> Array.ofSeq
   |> Array.map (fun name ->
      use md5 = MD5.Create()
      use stream = File.OpenRead(name)
      let hash = md5.ComputeHash(stream)
      name, hash)

Note incidentally that Directory.EnumerateFiles returns an IEnumerable, or “sequence,” so you have to use Array.ofSeq to translate it into an array.

This works; on my machine, it took 11 seconds of real time to run on the contents of the c:\temp folder.

You can parallelize this simply by adding the word “Parallel” to the Array.map call, like so:

open System.IO
open System.Security.Cryptography


let Hashes path =
   Directory.EnumerateFiles(path)
   |> Array.ofSeq
   |> Array.Parallel.map (fun name ->
      use md5 = MD5.Create()
      use stream = File.OpenRead(name)
      let hash = md5.ComputeHash(stream)
      name, hash)

On my machine, this runs in 6 seconds of real time.
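You can check whether parallelism pays off for your own workload with a quick System.Diagnostics.Stopwatch timing harness. In this sketch, the slowSquare function is a hypothetical stand-in for an expensive computation; it is not part of the book’s example.

```fsharp
open System.Diagnostics

// a deliberately CPU-heavy function standing in for real work
let slowSquare (x: int) =
    let mutable acc = 0.0
    for _ in 1 .. 100000 do
        acc <- acc + sqrt (float x)
    acc

// run a function, printing how long it took
let time label f =
    let sw = Stopwatch.StartNew()
    let result = f ()
    printfn "%s took %dms" label sw.ElapsedMilliseconds
    result

let data = [| 1 .. 2000 |]
let sequentialResult =
    time "Array.map" (fun () -> data |> Array.map slowSquare)
let parallelResult =
    time "Array.Parallel.map" (fun () -> data |> Array.Parallel.map slowSquare)

// both versions must produce the same results
printfn "Results equal: %b" (sequentialResult = parallelResult)
```

If you shrink the per-item work (say, to a plain multiplication), you will usually see the parallel version run no faster, or even slower, because the partitioning overhead dominates.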

Using Array.Parallel can be an incredibly easy shortcut to performance gains in your code. However, there are two gotchas to be aware of. First, you will only see gains if the computation you are doing is relatively heavy in relation to the overhead of .NET splitting up the dataset for you and assigning it to multiple threads for processing. You can really only determine the cost/benefit by experimentation. Second, you need to ensure that any resources accessed by multiple threads are themselves thread safe. You can observe the importance of thread safety by moving the creation of the md5 instance outside the mapping operation, thus:

let Hashes path =
   use md5 = MD5.Create()
   Directory.EnumerateFiles(path)
   |> Array.ofSeq
   |> Array.Parallel.map (fun name ->
      use stream = File.OpenRead(name)
      let hash = md5.ComputeHash(stream)
      name, hash)

Run this and you will immediately get

System.AggregateException: One or more errors occurred. ---> System.Security.Cryptography.CryptographicException: Hash not valid for use in specified state.

Clearly ComputeHash() is not thread safe, so you need to create one MD5 instance per hash computation.
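As an alternative sketch, you could keep one MD5 instance per thread using System.Threading.ThreadLocal, so each worker thread reuses its own hasher rather than creating one per file. The function name Hashes2 is illustrative, not part of the book’s example.

```fsharp
open System.IO
open System.Security.Cryptography
open System.Threading

let Hashes2 path =
    // each thread lazily creates its own MD5 instance on first use,
    // so no instance is ever shared between threads
    let md5PerThread = new ThreadLocal<MD5>(fun () -> MD5.Create())
    Directory.EnumerateFiles(path)
    |> Array.ofSeq
    |> Array.Parallel.map (fun name ->
        use stream = File.OpenRead(name)
        let hash = md5PerThread.Value.ComputeHash(stream)
        name, hash)
```

For MD5 the per-call creation cost is small, so this is rarely worth the extra complexity, but the pattern is useful when the non-thread-safe resource is genuinely expensive to construct.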

The FSharp.Collections.ParallelSeq Module

The previous examples focused on the built-in Array.Parallel module, and got around the fact that System.IO.Directory.EnumerateFiles returns an IEnumerable by converting it to an array first. However, you can skip this conversion stage by installing the NuGet package FSharp.Collections.ParallelSeq. To install the package, go into the NuGet package manager console and use Install-Package:

PM> Install-Package FSharp.Collections.ParallelSeq

Now you can process the files “natively” as a sequence:

open System.IO
open System.Security.Cryptography
open FSharp.Collections.ParallelSeq


let Hashes3 path =
   Directory.EnumerateFiles(path)
   |> PSeq.map (fun name ->
      use md5 = MD5.Create()
      use stream = File.OpenRead(name)
      let hash = md5.ComputeHash(stream)
      name, hash)

Again, this took about 6 seconds on my machine, versus 12 seconds for a non-parallel version.
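The PSeq module also exposes thin wrappers over PLINQ’s query options. For example, assuming the version of the package you install includes PSeq.withDegreeOfParallelism (it wraps PLINQ’s WithDegreeOfParallelism), you can cap how many threads a query uses:

```fsharp
open FSharp.Collections.ParallelSeq

// compute the sum of squares of 1..20 using at most four
// concurrent workers
let sumOfSquares =
    seq { 1 .. 20 }
    |> PSeq.withDegreeOfParallelism 4
    |> PSeq.map (fun x -> x * x)
    |> PSeq.sum
```

Capping the degree of parallelism can be useful when the mapped function itself competes for a limited resource, such as disk bandwidth in the hashing example.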

Asynchronous Programming

Asynchronous programming is slightly different from the other forms of parallel programming you’ve seen so far. The other topics covered allow a number of threads to execute work in parallel, taking advantage of all available processors in a system. In asynchronous programming, you want to avoid blocking threads. You’re already familiar with the concept of blocked threads from this chapter’s first section, “Threads, Memory, Locking, and Blocking.” A blocked thread is one that can do no work because it is waiting for some task to finish; commonly the task a thread is waiting for is the operating system performing I/O, but sometimes it might also be waiting for a lock so it can enter a critical section. Threads are relatively expensive resources; each thread is allocated a 1MB stack by default, and there are other expenses concerning how the operating system kernel handles a large number of threads. In performance-critical code, it’s important to keep the number of blocked threads low. Ideally, you will only have as many threads as you have processors, and you will have no blocked threads.

Note

For an overview of the kind of results that you can achieve using these techniques, see Amanda Laucher’s 2009 QCon talk in which she describes using F# asynchronous workflows to parallelize a C# program and achieves some impressive results: www.infoq.com/presentations/Concurrent-Programming-with-Microsoft-F-Amanda-Laucher .

In this section, you will look at how to use the .NET Framework’s asynchronous programming model to avoid blocking threads during I/O. The asynchronous programming model means using pairs of Begin/End methods, such as BeginRead/EndRead on the Stream class. Typically you use these pairs of methods to perform some kind of I/O task, such as reading from a file. This method of programming has acquired a reputation for being difficult, mainly because you need to find a good way to store state between the Begin/End calls. This section will not cover the asynchronous programming model directly; instead, you’ll look at how to use a feature of F# called asynchronous workflows to avoid some of the work associated with the asynchronous programming model in other .NET languages.

Asynchronous workflows are not exclusively for use with the .NET asynchronous programming model. In the next section, “Message Passing,” you’ll see how to use these workflows with F#’s mailboxes to coordinate a number of different tasks. This will allow you to wait for tasks to complete without blocking threads.

The first step in understanding asynchronous workflows in F# is to understand the syntax itself. To create an asynchronous workflow, you use a computation expression, similar to the sequence expressions you saw in Chapter 3. The basic syntax is the keyword async with the workflow expression surrounded by curly brackets: async { ... }. A simple program that uses a workflow looks like this:

open System.IO

// a function to read a text file asynchronously
let readFile file =
    async { let! stream = File.AsyncOpenText(file)
            let! fileContents = stream.AsyncReadToEnd()
            return fileContents }


// create an instance of the workflow
let readFileWorkflow = readFile "mytextfile.txt"


// invoke the workflow and get the contents
let fileContents = Async.RunSynchronously readFileWorkflow

To compile this program, you need to use NuGet to install the package named FSPowerPack.Community. This program shows a function named readFile that creates a workflow that reads a file asynchronously and then returns its contents. Next, you create an instance of the workflow called readFileWorkflow, and finally, you execute the workflow to get the file’s contents. It’s important to understand that simply calling the readFile function won’t actually read the file. Instead, it creates a new instance of the workflow, and you can then execute the workflow to perform the task of reading the file. The Async.RunSynchronously function is actually responsible for executing the workflow. A workflow instance is a small data structure, rather like a small program, that can be interpreted to do some work.

The most important thing to notice about this example is the let followed by an exclamation mark (let!), often pronounced let bang. The workflow or “computation expression” syntax allows library writers to give different meanings to let!. In the case of asynchronous workflows, it means that an asynchronous operation will take place, during which the workflow itself will stop executing. A callback will be placed in the thread pool, and it will be invoked when the operation has completed, possibly on a different thread if the thread making the original call is not free. After the async call, the original thread is free to carry on doing other work.

You’ve probably also noticed that let! is used with some special methods prefixed with Async. These methods are defined as type augmentations, which are F#’s equivalent of C#’s extension methods, in FSharp.PowerPack.dll. They handle the calling of the Begin/End method pairs. If no Async method is available, it’s fairly easy to create your own using the Async.FromBeginEnd function and the Begin/End method pairs.
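For instance, here is a minimal sketch of building such a primitive yourself with Async.FromBeginEnd, the standard F# function for wrapping a Begin/End pair; the asyncRead name is purely illustrative:

```fsharp
open System.IO

// wrap a Stream's BeginRead/EndRead pair into an Async<int>
// that yields the number of bytes read
let asyncRead (stream: Stream) (buffer: byte[]) offset count =
    Async.FromBeginEnd(
        (fun (callback, state) ->
            stream.BeginRead(buffer, offset, count, callback, state)),
        (fun asyncResult -> stream.EndRead(asyncResult)))

// exercise the new primitive against an in-memory stream
let bytesRead =
    let stream = new MemoryStream([| 1uy; 2uy; 3uy; 4uy; 5uy |])
    let buffer : byte[] = Array.zeroCreate 5
    Async.RunSynchronously(asyncRead stream buffer 0 5)
```

The resulting Async<int> can then be bound with let! inside any workflow, just like the PowerPack’s built-in Async methods.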

The execution flow of your simple example would follow these steps.

  • Step 1: The main program thread starts the process of opening the file stream, and a callback is placed in the thread pool that can be used when this completes. This thread is now free to continue doing other work.

  • Step 2: A thread pool thread will activate when the file stream has opened. It will then start reading the contents of the file and place a callback in the thread pool that can be used when this completes. Because it is a thread pool thread, it will return to the thread pool.

  • Step 3: A thread pool thread will activate when the file has been completely read. It will return the text data that has been read from the file and return to the thread pool.

  • Step 4: Because you used the Async.RunSynchronously function, the main program thread is waiting for the results of the workflow. It will receive the file contents.

You have probably spotted the flaw in this simple example: although no thread is blocked waiting for I/O, the main program thread is still blocked waiting for the asynchronous workflow to complete. To put this another way, there’s little or no advantage to executing one asynchronous workflow on its own and waiting for the result. However, it’s fairly simple to execute several workflows in parallel, and this does have a distinct advantage: the original thread is not blocked after it starts the first asynchronous task, so it is free to go on and start more asynchronous tasks.
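If you only have a single workflow and don’t want to block on it at all, one option is Async.StartWithContinuations, which starts the workflow and registers callbacks for success, error, and cancellation instead of waiting for a result. The following is a minimal sketch using a trivial workflow in place of the file-reading one:

```fsharp
// a trivial workflow standing in for a real I/O-bound workflow
let workflow = async { return 21 * 2 }

let mutable result = 0

// start the workflow without blocking for its result; the three
// continuations handle success, failure, and cancellation
Async.StartWithContinuations(
    workflow,
    (fun value -> result <- value),
    (fun exn -> printfn "Failed: %s" exn.Message),
    (fun _ -> printfn "Cancelled"))
```

Because this sample workflow completes immediately, the success continuation runs right away; with a genuinely asynchronous workflow it would typically run later on a thread-pool thread.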

It’s fairly easy to illustrate this using a slightly modified version of the original example where, instead of reading one file, you read three of them. Let’s compare this to a synchronous version of the program, which will help demonstrate the differences. First, take a look at the synchronous version:

open System
open System.IO
open System.Threading


let print s =
    let tid = Thread.CurrentThread.ManagedThreadId
    Console.WriteLine(sprintf "Thread %i: %s" tid s)


let readFileSync file =
    print (sprintf "Beginning file %s" file)
    let stream = File.OpenText(file)
    let fileContents = stream.ReadToEnd()
    print (sprintf "Ending file %s" file)
    fileContents


// invoke the workflow and get the contents
let filesContents =
    [| readFileSync "text1.txt";
       readFileSync "text2.txt";
       readFileSync "text3.txt"; |]

This program is fairly straightforward. Note that the preceding code includes some debugging code to show when you begin and end processing a file. Now look at the asynchronous version:

open System
open System.IO
open System.Threading


let print s =
    let tid = Thread.CurrentThread.ManagedThreadId
    Console.WriteLine(sprintf "Thread %i: %s" tid s)


// a function to read a text file asynchronously
let readFileAsync file =
    async { do print (sprintf "Beginning file %s" file)
            let! stream = File.AsyncOpenText(file)
            let! fileContents = stream.AsyncReadToEnd()
            do print (sprintf "Ending file %s" file)
            return fileContents }


let filesContents =
    Async.RunSynchronously
        (Async.Parallel [ readFileAsync "text1.txt";
                          readFileAsync "text2.txt";
                          readFileAsync "text3.txt"; ])

Again, this version incorporates some debugging code so you can see how the program executes. The biggest change is that you now use the Async.Parallel function to compose several workflows into a single workflow. This means that when the first thread finishes processing the first asynchronous call, it will be free to carry on processing the other workflows. This is probably easiest to see when you look at the results of the two programs:

Synchronous results

Thread 1: Beginning file text1.txt
Thread 1: Ending    file text1.txt
Thread 1: Beginning file text2.txt
Thread 1: Ending    file text2.txt
Thread 1: Beginning file text3.txt
Thread 1: Ending    file text3.txt

Asynchronous results

Thread 3: Beginning file text1.txt
Thread 4: Beginning file text2.txt
Thread 3: Beginning file text3.txt
Thread 4: Ending    file text2.txt
Thread 4: Ending    file text1.txt
Thread 4: Ending    file text3.txt

The two sets of results look quite different. In the synchronous results, each Beginning file message is immediately followed by its Ending file message, and they all occur on the same thread. In the asynchronous results, all the Beginning file messages occur up front, across two different threads, because once a thread reaches an asynchronous operation, it is free to carry on and start another one. The Ending file messages occur later, once the I/O has completed.

Message Passing

It’s often useful to think of a parallel program as a series of independent components that send and receive messages. This is often referred to as the actor model; you can find a more formal description of the actor model on Wikipedia at http://en.wikipedia.org/wiki/Actor_model . Although the scenarios in which you would use message passing tend to be quite complex, the ideas behind it are relatively simple, as you’ll see in a handful of straightforward examples.

The basic idea behind message passing is that a system is composed of agents, or actors . They can both send and receive messages. When an agent receives a message, the message is placed in a queue until the agent is ready to process it. When an agent processes a message, it makes a decision about what to do with it based on its internal state and the contents of the message. The agent has a number of possibilities open to it in response to an incoming message: it might send a reply to the agent that initiated the exchange, create a new message for a different agent, create a new agent, or perhaps update some internal data structure.

F# provides the generic MailboxProcessor class as its implementation of message passing and the actor model. When a MailboxProcessor is created, it has (as the name suggests) a message queue that it can use to receive messages. MailboxProcessor is responsible for deciding what it will do with the message once it receives it. The implementation of a MailboxProcessor tends to follow a few simple patterns. The following example illustrates the simplest pattern for a MailboxProcessor:

open System

let mailbox =
    MailboxProcessor.Start(fun mb ->
        let rec loop x =
            async { let! msg = mb.Receive()
                    let x = x + msg
                    printfn "Running total: %i - new value %i" x msg
                    return! loop x }
        loop 0)


mailbox.Post(1)
mailbox.Post(2)
mailbox.Post(3)


Console.ReadLine() |> ignore

Executing this code produces the following results:

Running total: 1 - new value 1
Running total: 3 - new value 2
Running total: 6 - new value 3

In the first part of the example, you create a mailbox that receives messages of type int. When the mailbox receives a message, it adds it to a running total and then displays the running total, along with the value received. Let’s take a closer look at how you achieve this. The MailboxProcessor class has a static Start method that accepts a function as a parameter. That function is passed the new MailboxProcessor instance and must return an asynchronous workflow, which you use to read messages from the queue. Reading messages asynchronously ensures that a mailbox is not tied to a single thread, which would cause scalability issues if you were using lots of mailboxes. You need to keep checking the queue for new messages as they arrive; typically, you do this with an infinite loop. In this case, you define a recursive function called loop, which reads from the queue by calling the Receive method, processes the message, and then calls itself to start the process again. This is an infinite recursion, but there’s no danger of the stack overflowing because the function is tail recursive. The loop function takes a single parameter, which you use to store the mailbox’s state: in this case, an integer that represents the running total.

It’s also worth noting that the Console.ReadLine() at the end of this example is important. This is because the message queue is processed on a separate thread. Once you finish posting messages to the mailbox using the Post method, the main thread has no more work to do, so it exits, causing the process to exit. In this case, the process would probably exit before the mailbox had a chance to process the messages in its queue. Calling Console.ReadLine() provides a simple way to block the main thread until the user has had a chance to see the results of the mailbox processing the messages.

One final detail about this example: the mailbox’s Post member function is safe to call from any thread, because the mailbox’s message queue ensures that each message is processed in turn, atomically. The current example does not take advantage of this, but you will see it used in the next two examples.
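To see this thread safety in action, consider the following sketch (the CounterMsg type and counter name are illustrative; it borrows the reply-channel technique shown later in this section, purely so the final total can be observed). Four threads hammer the same mailbox at once, and because the queue serializes the messages, the total comes out exactly right without a single lock:

```fsharp
open System.Threading

type CounterMsg =
    | Add of int
    | Total of AsyncReplyChannel<int>

// a running-total agent; Post is safe to call from any thread
let counter =
    MailboxProcessor.Start(fun inbox ->
        let rec loop total =
            async { let! msg = inbox.Receive()
                    match msg with
                    | Add n -> return! loop (total + n)
                    | Total channel ->
                        channel.Reply total
                        return! loop total }
        loop 0)

// post to the same mailbox from four threads at once
let threads =
    [ for _ in 1 .. 4 ->
        Thread(fun () -> for _ in 1 .. 1000 do counter.Post(Add 1)) ]
for t in threads do t.Start()
for t in threads do t.Join()

// every Add is already queued ahead of this request,
// so the reply sees all 4,000 of them
let total = counter.PostAndReply Total
```

Had the running total been a shared mutable variable updated from four threads, updates could be lost; the mailbox removes the need for explicit locking.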

This particular mailbox isn’t that useful; however, it does represent the simplest usage pattern of a mailbox: receive a message, update some internal state, and then react to the message. In this case, reacting to the message means writing to the console, which probably is too simplistic to be of much use. However, you can find more realistic scenarios for this usage pattern. A good example is using a mailbox to gather up a number of values and then marshal to the GUI thread so the values can be viewed. You’ll learn more about this technique in the next pair of examples.

Begin by looking at the problem you’re trying to solve in a bit more detail. If you have a simulation that generates data, you might want to be able to see this data in real time, as it is generated. When working with GUIs, you face two related constraints that make this quite challenging. First, the GUI must run on its own thread, and this thread must not be occupied for a long time or the GUI will become unresponsive. This makes it impossible to execute a long-running simulation on the GUI thread. Second, you can only access GUI objects from the thread that created them: the GUI thread. If your simulation is running on another thread, then it cannot write directly to the GUI. Fortunately, GUI objects provide an Invoke method that allows you to invoke a function on the GUI thread and safely update the GUI with the generated data. Calling Invoke too often can have a negative impact on performance because marshalling data to the GUI thread is fairly expensive. If your simulation outputs a small amount of data frequently, it’s often a good idea to batch up the results, so you can print them to the screen 12 to 20 times a second to get a smooth animation effect. You’ll begin by learning how to use mailboxes to solve a specific instance of this problem; next, you’ll see a second example where you tidy this up into a more generic solution.

F#’s mailboxes can help here by providing an elegant way to buffer the data before you print it to the screen. The basics of the algorithm are fairly simple. The thread running the simulation posts messages to the mailbox; when the mailbox has received enough messages, it notifies the GUI of the new updates to be drawn. This programming style also provides a neat way of separating the logic for generating the data from the logic presenting the data in the UI. Let’s have a look at the whole code example, and then step through and examine how it all works. As with the Fibonacci example above, this code has been arranged so that you can run it by selecting all the code and sending it to F# Interactive.

#if INTERACTIVE
#r "System.Windows.Forms.dll"
#r "System.Drawing.dll"
#else
module MailboxProcessorSimulation
#endif


open System
open System.Threading
open System.Windows.Forms
open System.Drawing.Imaging
open System.Drawing
// the width & height for the simulation
let width, height = 500, 600


// the bitmap that will hold the output data
let bitmap = new Bitmap(width, height, PixelFormat.Format24bppRgb)


// a form to display the bitmap
let form = new Form(Width = width, Height = height,
                    BackgroundImage = bitmap)


// the function which receives the points to be plotted
// and marshals to the GUI thread to plot them
let printPoints points =
    form.Invoke(new Action(fun () ->
        List.iter bitmap.SetPixel points
        form.Invalidate()))
    |> ignore


// the mailbox that will be used to collect the data
let mailbox =
    MailboxProcessor.Start(fun mb ->
        // main loop to read from the message queue
        // the parameter "points" holds the working data
        let rec loop points =
            async { // read a message and add it to the buffer
                    let! msg = mb.Receive()
                    let points = msg :: points
                    // once we have over 100 messages, write
                    // them to the GUI and reset the buffer
                    if List.length points > 100 then
                        printPoints points
                        return! loop []
                    else
                        return! loop points }
        loop [])


// start a worker thread running our fake simulation
let startWorkerThread() =
    // function that loops infinitely generating random
    // "simulation" data
    let fakeSimulation() =
        let rand = new Random()
        let colors = [| Color.Red; Color.Green; Color.Blue |]
        while true do
            // post the random data to the mailbox
            // then sleep to simulate work being done
            mailbox.Post(rand.Next(width),
                rand.Next(height),
                colors.[rand.Next(colors.Length)])
            Thread.Sleep(rand.Next(100))
    // start the thread as a background thread, so it won't stop
    // the program exiting
    let thread = new Thread(fakeSimulation, IsBackground = true)
    thread.Start()


// start 6 instances of our simulation
for _ in 0 .. 5 do startWorkerThread()


// run the form
#if INTERACTIVE
form.ShowDialog() |> ignore
#else
Application.Run()
#endif

This example has three key parts: how the simulation posts data to the mailbox, how the mailbox buffers points to be sent to the GUI, and how the GUI receives the points. Let’s examine each of these in turn. Posting data to the mailbox remains simple; you continue to call the Post method on the mailbox. Two important differences exist between this example and the previous one. First, you pass a different data structure; however, the Post method is generic, so you remain strongly typed. Second, you call the Post method from six different threads; the message queue serializes the incoming messages, so this works without any extra synchronization. The buffering technique is simple: you count the messages received, and once you have more than 100 of them, you send the batch to the GUI.

async { // read a message and add it to the buffer
        let! msg = mb.Receive()
        let points = msg :: points
        // once we have over 100 messages, write
        // them to the GUI and reset the buffer
        if List.length points > 100 then
            printPoints points
            return! loop []
        else
            return! loop points }

The number 100 is fairly arbitrary; it was chosen because it seemed to work well for this particular simulation. It’s also worth noting that you count the number of messages you receive at each iteration by calling the List.length function. This is suboptimal from a performance point of view because the List.length function will traverse the list each time you call it. This won’t matter much in the current example because it uses a fairly small list; however, if you increase the buffer size, this approach could become a bottleneck. A better approach is to store a separate parameter that you increment during each iteration of the function; however, this example avoids doing that for the sake of maintaining simplicity. Another alternative is to store the time of the previous update, updating again only if the previous update was more than a twentieth of a second ago. This approach works well because it allows you to aim for the correct number of frames per second required to achieve a smooth animation effect. Again, this book’s examples don’t rely on this approach because adopting it would add an unnecessary element of complexity to the examples. The example includes one more technique worth mentioning, which is how you write the data to the screen:
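The counting refinement mentioned above can be sketched as follows. The BatchMsg and batcher names are illustrative, and the reply channel appears only so the accumulated batches can be inspected (it uses the technique the next example introduces); the key point is that the loop carries the running count as an extra parameter, so List.length is never called on the buffer:

```fsharp
type BatchMsg =
    | Item of int
    | GetBatches of AsyncReplyChannel<int list list>

// buffers items into batches of "limit" items each
let batcher limit =
    MailboxProcessor.Start(fun mb ->
        // "count" tracks the buffer length, so we never traverse "curr"
        let rec loop count curr batches =
            async { let! msg = mb.Receive()
                    match msg with
                    | Item x ->
                        let count, curr = count + 1, x :: curr
                        if count >= limit then
                            // a full batch; start a fresh buffer
                            return! loop 0 [] (curr :: batches)
                        else
                            return! loop count curr batches
                    | GetBatches channel ->
                        channel.Reply batches
                        return! loop count curr batches }
        loop 0 [] [])

let mb = batcher 100
for i in 1 .. 250 do mb.Post(Item i)

// 250 items with a limit of 100 yields two full batches
// (the remaining 50 items are still in the buffer)
let batches = mb.PostAndReply GetBatches
```

In the GUI example, the completed batch would be handed to printPoints instead of being accumulated.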

let printPoints points =
    form.Invoke(new Action(fun () ->
        List.iter bitmap.SetPixel points
        form.Invalidate()))
    |> ignore

This is fairly straightforward. The printPoints function takes a points parameter, and then invokes a delegate in the context of the form and allows you to write the points to the bitmap. Finally, you need to call the form’s Invalidate function to ensure the points are displayed correctly.

The previous example provides a nice demonstration of how to use mailboxes, but the main problem with it is that the code is not reusable. It would be better if you could wrap your mailbox into a reusable component. F#’s object-oriented features provide a great way of doing this. This following example also demonstrates a couple of other important concepts, such as how you can support messages of different types within the same mailbox, as well as how you can return messages to a client of the mailbox. Again, you can send all this code to F# Interactive for execution.

#if INTERACTIVE
#r "System.Windows.Forms.dll"
#r "System.Drawing.dll"
#else
module MailboxProcessorSimulationGeneric
#endif


open System
open System.Threading
open System.ComponentModel
open System.Windows.Forms
open System.Drawing.Imaging
open System.Drawing


// type that defines the messages types our updater can handle
type Updates<'a> =
    | AddValue of 'a
    | GetValues of AsyncReplyChannel<list<'a>>
    | Stop


// a generic collector that receives a number of posted items and,
// once a configurable limit is reached, fires the update event
type Collector<'a>(?updatesCount) =
    // the number of updates to count to before firing the update event
    let updatesCount = defaultArg updatesCount 100


    // Capture the synchronization context of the thread that creates this object. This
    // allows us to send messages back to the GUI thread painlessly.
    let context = AsyncOperationManager.SynchronizationContext
    let runInGuiContext f =
        context.Post(new SendOrPostCallback(fun _ -> f()), null)


    // These events are fired in the synchronization context of the GUI (i.e., the thread
    // that created this object)
    let event = new Event<list<'a>>()


    let mailboxWorkflow (inbox: MailboxProcessor<_>) =
        // main loop to read from the message queue
        // the parameter "curr" holds the working data
        // the parameter "master" holds all values received
        let rec loop curr master =
            async { // read a message
                    let! msg = inbox.Receive()
                    match msg with
                    | AddValue x ->
                        let curr, master = x :: curr, x :: master
                        // if we have more than updatesCount messages,
                        // write them to the GUI
                        if List.length curr > updatesCount then
                            do runInGuiContext(fun () -> event.Trigger(curr))
                            return! loop [] master
                        return! loop curr master
                    | GetValues channel ->
                        // send all data received back
                        channel.Reply master
                        return! loop curr master
                    | Stop -> () } // stop by not calling "loop"
        loop [] []


    // the mailbox that will be used to collect the data
    let mailbox = new MailboxProcessor<Updates<'a>>(mailboxWorkflow)


    // the API of the collector

    // add a value to the queue
    member w.AddValue (x) = mailbox.Post(AddValue(x))
    // get all the values the mailbox stores
    member w.GetValues() = mailbox.PostAndReply(fun x -> GetValues x)
    // publish the updates event
    [<CLIEvent>]
    member w.Updates = event.Publish
    // start the collector
    member w.Start() = mailbox.Start()
    // stop the collector
    member w.Stop() = mailbox.Post(Stop)


// create a new instance of the collector
let collector = new Collector<int*int*Color>()


// the width & height for the simulation
let width, height = 500, 600


// a form to display the updates
let form =
    // the bitmap that will hold the output data
    let bitmap = new Bitmap(width, height, PixelFormat.Format24bppRgb)
    let form = new Form(Width = width, Height = height, BackgroundImage = bitmap)
    // handle the collector's update event and use it to post
    collector.Updates.Add(fun points ->
        List.iter bitmap.SetPixel points
        form.Invalidate())
    // start the collector when the form loads
    form.Load.Add(fun _ -> collector.Start())
    // when the form closes get all the values that were processed
    form.Closed.Add(fun _ ->
        let vals = collector.GetValues()
        MessageBox.Show(sprintf "Values processed: %i" (List.length vals))
        |> ignore
        collector.Stop())
    form


// start a worker thread running our fake simulation
let startWorkerThread() =
    // function that loops infinitely generating random
    // "simulation" data
    let fakeSimulation() =
        let rand = new Random()
        let colors = [| Color.Red; Color.Green; Color.Blue |]
        while true do
            // post the random data to the collector
            // then sleep to simulate work being done
            collector.AddValue(rand.Next(width),
                rand.Next(height),
                colors.[rand.Next(colors.Length)])
            Thread.Sleep(rand.Next(100))
    // start the thread as a background thread, so it won't stop
    // the program exiting
    let thread = new Thread(fakeSimulation, IsBackground = true)
    thread.Start()


// start 6 instances of our simulation
for _ in 0 .. 5 do startWorkerThread()


// run the form
#if INTERACTIVE
form.ShowDialog() |> ignore
#else
Application.Run()
#endif

The output of this example is exactly the same as that of the previous example, and the code base follows largely the same pattern; however, you can see several important differences in the two examples. Perhaps the most noticeable one is that the mailbox is now wrapped in an object that provides a strongly typed interface. The class you have created is called a Collector<'a>; its interface looks like this:

type Collector<'a> =
  class
    new : ?updatesCount:int -> Collector<'a>
    member AddValue : x:'a -> unit
    member GetValues : unit -> 'a list
    member Start : unit -> unit
    member Stop : unit -> unit
    member Updates : IEvent<'a list>
  end

The class is generic in terms of the type of values that it collects. It has an AddValue method to post a value to the internal mailbox and a GetValues method to get all the messages that have been passed to the mailbox so far. The collector must now be explicitly started and stopped via its Start and Stop methods. Finally, the collector has an Updates event that is raised when enough messages have been collected; the number of messages is configurable via an optional integer that you can pass to the class constructor. The fact that you use an event is an important design detail. Using an event to notify clients that updates exist means that your Collector<'a> needs no knowledge of its clients, which greatly improves its reusability.

You now use a union type to represent your messages; this gives you the flexibility to have different types of messages. Clients of the Collector<'a> don’t deal with it directly, but instead use the member methods it provides. The member methods have the job of creating the different types of messages. In addition to providing a value to the message queue, you can also send a message to retrieve all the current messages, as well as a message to stop the mailbox from reading new messages.

type Updates<'a> =
    | AddValue of 'a
    | GetValues of AsyncReplyChannel<list<'a>>
    | Stop

Next, you implement these different types of messages by pattern matching over the received messages.

                    let! msg = inbox.Receive()
                    match msg with
                    | AddValue x ->
                        let curr, master = x :: curr, x :: master
                        // if we have more than updatesCount messages,
                        // write them to the GUI
                        if List.length curr > updatesCount then
                            do runInGuiContext(fun () -> event.Trigger(curr))
                            return! loop [] master
                        return! loop curr master
                    | GetValues channel ->
                        // send all data received back
                        channel.Reply master
                        return! loop curr master
                    | Stop -> ()

The AddValue union case is basically what you did in the previous example, except that this time you add the values to both the curr and master lists. The curr list stores the current values you will pass to the GUI on the next update, while the master list provides a list of all the values you’ve received. The master list enables you to accommodate any client that requests all the values.

For the union case GetValues, it’s worth spending some time looking at how values are returned to a client. You start this process by calling the mailbox’s PostAndReply method rather than its Post method; you can see this at work in the GetValues member method implementation:

// get all the values the mailbox stores
member w.GetValues() = mailbox.PostAndReply(fun x -> GetValues x)

The PostAndReply method accepts a function that is passed an AsyncReplyChannel<'a>. You can use this AsyncReplyChannel<'a> to send a message back to the caller via its Reply member. This is what you see in the GetValues case of your union. Users of this method should be careful because it blocks until the reply is returned, and the message won’t be processed until it reaches the front of the queue; this can take a long time if the queue is long. In practice, you should use the PostAndAsyncReply method because it enables you to avoid blocking a thread while waiting for the reply; however, this example doesn’t do that for the sake of simplicity.
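A minimal sketch of that non-blocking variant follows; the StoreMsg type and store agent are illustrative, not part of the Collector. PostAndAsyncReply returns an Async value that can be bound with let! inside another workflow, so no thread sits blocked while the request waits its turn in the queue:

```fsharp
type StoreMsg =
    | Put of int
    | Get of AsyncReplyChannel<int list>

// a tiny agent holding a list of the values it has received
let store =
    MailboxProcessor.Start(fun inbox ->
        let rec loop items =
            async { let! msg = inbox.Receive()
                    match msg with
                    | Put x -> return! loop (x :: items)
                    | Get channel ->
                        channel.Reply items
                        return! loop items }
        loop [])

store.Post(Put 1)
store.Post(Put 2)

// the reply is awaited with let!, so no thread blocks while
// the Get message waits its turn in the queue
let items =
    async { let! xs = store.PostAndAsyncReply Get
            return xs }
    |> Async.RunSynchronously
```

The Async.RunSynchronously at the end exists only so this standalone sketch can observe the result; inside a larger workflow you would simply keep composing with let!.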

The Stop union case is the simplest way to stop reading messages from the queue; all you need to do is avoid calling the loop function recursively. Because the workflow must still produce a value, you return the unit type, represented by empty parentheses, (). The only subtlety you need to be careful of here is that calling the Stop method will not stop the mailbox immediately; it will stop the mailbox only when the Stop message reaches the front of the queue.

You’ve seen how the Collector<'a> type handles messages; now let’s look at how the Collector<'a> raises the Updates event so that it runs on the GUI thread. You create the event using new Event, just as you create any other event in F#. You use the function runInGuiContext to make the event fire in the context of the GUI:

let context = AsyncOperationManager.SynchronizationContext
let runInGuiContext f =
    context.Post(new SendOrPostCallback(fun _ -> f()), null)

First, you store the SynchronizationContext of the thread that created the object. You do this by using a static property on the AsyncOperationManager available in the System.ComponentModel namespace. The SynchronizationContext enables you to marshal to the thread that created it using its Post member method. The only thing you need to be careful about is that the thread that creates the collector object becomes the GUI thread; however, typically you’ll use the main program thread to do both things, so this won’t be a problem. This technique where you capture the synchronization context is also used in the BackgroundWorker class from the “Reactive Programming” section of this chapter.

The definition of the form is now somewhat simpler because you no longer need to provide a function for the mailbox to call. You simply handle the Updates event instead:

// handle the collector's update event and use it to post
collector.Updates.Add(fun points ->
    List.iter bitmap.SetPixel points
    form.Invalidate())

You can also now take advantage of the form’s Closed event to stop the mailbox processor and obtain a list of all the messages processed when a user closes the form:

// when the form closes get all the values that were processed
form.Closed.Add(fun _ ->
    let vals = collector.GetValues()
    MessageBox.Show(sprintf "Values processed: %i" (List.length vals))
    |> ignore
    collector.Stop())

You haven’t changed the behavior of your example, but these additions greatly improved the design of the code by decoupling the code for the mailbox from the GUI code, which improves the reusability of the Collector<'a> class tremendously.

Summary

This chapter covered quite a lot of ground. You saw five different concurrency techniques, all of which have their place in certain kinds of applications.

In the next chapter, you’ll see how some of these techniques, especially asynchronous workflows, can be used to make programming distributed applications easier.
