Chapter 15. Parallel Programming

With version 4.0 of the .NET Framework, Microsoft introduced a new model for writing applications that need to perform multiple simultaneous tasks—that model is known as parallel programming, and the implementation is called the Task Parallel Library. Unlike the traditional approach to multitasking, where you create and manage a set of threads in your code, the new parallel programming model lets you focus on the tasks you need to accomplish and allows the runtime to create and manage the threads on your behalf.

The key advantage of this approach is that your code is focused on the tasks you need to perform, not on the way in which they will be performed. The main disadvantage is that you give up direct control of the behavior of your application. For many applications the new parallel programming model will be ideal, but for applications that require careful control and management (and for programmers who cannot let go), we refer you to Chapter 4, which covers the traditional threading approach. The recipes in this chapter describe how to do the following: perform simple parallel tasks, return a result from a task, wait for tasks to complete, process a collection in parallel, chain tasks together, write a cooperative algorithm, handle exceptions in tasks, cancel a task, and share data between tasks.

Perform Simple Parallel Tasks

Problem

You need to perform simple tasks simultaneously.

Solution

Use the Invoke method of the System.Threading.Tasks.Parallel class, passing in an instance of the System.Action delegate for each method you wish to run.

How It Works

The Invoke method of the Parallel class is the simplest way to add multitasking to your application. You provide a set of Action delegates, each of which wraps a method you wish to invoke, and the .NET Framework takes care of the rest: threads are created and managed automatically on your behalf, and Invoke returns only when all of the methods have completed.

Note

The Parallel.Invoke method can only be used to invoke methods that do not return a result. See the other recipes in this chapter for more complex examples.
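If you do need values back from Parallel.Invoke, one workaround is to have each lambda expression assign its result to a separate local variable. The following is a minimal sketch under that assumption; the class name InvokeWithResults and the helper CountItems are hypothetical, chosen for illustration. Because Parallel.Invoke returns only after every action has completed, the variables can be read safely once it returns.

```csharp
using System;
using System.Threading.Tasks;

class InvokeWithResults
{
    // Runs three computations with Parallel.Invoke. Each lambda stores its
    // result in its own captured local, so no two actions write the same variable.
    public static int SumOfCounts()
    {
        int days = 0, months = 0, cities = 0;

        Parallel.Invoke(
            () => days = CountItems("Monday", "Tuesday", "Wednesday",
                                    "Thursday", "Friday", "Saturday", "Sunday"),
            () => months = CountItems("Jan", "Feb", "Mar", "Apr", "May", "Jun",
                                      "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"),
            () => cities = CountItems("London", "New York", "Paris",
                                      "Tokyo", "Sydney"));

        // Parallel.Invoke has returned, so all three actions have finished.
        return days + months + cities;
    }

    // Hypothetical helper: returns how many items it was given.
    static int CountItems(params string[] items)
    {
        return items.Length;
    }

    static void Main()
    {
        Console.WriteLine("Total items: {0}", SumOfCounts());
    }
}
```

Giving each action its own variable avoids the need for any locking; if the actions had to update a shared value, you would need synchronization or one of the concurrent collections.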

The Code

The following example invokes three methods concurrently, each of which writes a series of messages to the console. In order to simulate a time-intensive task, these methods call Thread.Sleep to slow down the progress of the application—something that you would not do with a real application.

We have created the Action delegates explicitly to make the example as clear as possible, but a more elegant approach is to use lambda expressions, so that

Parallel.Invoke(
                new Action(writeDays),
                new Action(writeMonths),
                new Action(writeCities)
            );

would be written as

Parallel.Invoke(
                () => writeDays(),
                () => writeMonths(),
                () => writeCities()
            );

The remaining recipes in this chapter use lambda expressions.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Recipe15_01
{
    class Recipe15_01
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Press enter to start");
            Console.ReadLine();

            // Invoke the methods we want to run.
            Parallel.Invoke(
                new Action(writeDays),
                new Action(writeMonths),
                new Action(writeCities)
            );

            // Wait to continue.
            Console.WriteLine("\nMain method complete. Press Enter");
            Console.ReadLine();
        }

        static void writeDays()
        {
            string[] daysArray = { "Monday", "Tuesday", "Wednesday",
                "Thursday", "Friday", "Saturday", "Sunday" };
            foreach (string day in daysArray)
            {
                Console.WriteLine("Day of the Week: {0}", day);
                Thread.Sleep(500);
            }
        }

        static void writeMonths()
        {
            string[] monthsArray = { "Jan", "Feb", "Mar", "Apr",
                                       "May", "Jun", "Jul",
                                       "Aug", "Sep", "Oct", "Nov", "Dec" };
            foreach (string month in monthsArray)
            {
                Console.WriteLine("Month: {0}", month);
                Thread.Sleep(500);
            }
        }

        static void writeCities()
        {
            string[] citiesArray = { "London", "New York", "Paris", "Tokyo",
                "Sydney" };
            foreach (string city in citiesArray)
            {
                Console.WriteLine("City: {0}", city);
                Thread.Sleep(500);
            }

        }
    }
}

Return a Result from a Task

Problem

You need to perform concurrent tasks that return results.

Solution

Create typed instances of the Task class by passing function delegates (Func<TResult>) to the generic static System.Threading.Tasks.Task<TResult>.Factory.StartNew method. Use the Task<TResult>.Result property to obtain the result from your task.

How It Works

For anything other than simple tasks, such as those in the previous recipe, you use the Task class to write parallel applications. New tasks are created (and automatically started) when you call the Task<>.Factory.StartNew method, passing in a function delegate as the argument. You obtain the result of your task through the Task.Result property.

Tip

The StartNew method creates and starts a new task in one step. If you need to create tasks and start them later, you can create instances of Task directly with the class constructors and start them running using the Start method.
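As a minimal sketch of the create-then-start approach, the following creates a Task<int> with the constructor, reports its Status while it is still waiting to be started, and only then calls Start. The class name CreateThenStart and the trivial computation are placeholders for illustration.

```csharp
using System;
using System.Threading.Tasks;

class CreateThenStart
{
    public static int Run()
    {
        // Create the task; it does not run until Start is called.
        Task<int> task = new Task<int>(() => 6 * 7);
        Console.WriteLine("Status before Start: {0}", task.Status);

        // Schedule the task on the default task scheduler.
        task.Start();

        // Result blocks until the task has completed.
        return task.Result;
    }

    static void Main()
    {
        Console.WriteLine("Result: {0}", Run());
    }
}
```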

The Code

The following example modifies the task methods from the previous recipe to return how many items have been printed out. We read the Result property of each task and write it to the console. Notice that when running the example, the results are intermingled with the output from the tasks themselves, as shown in the following extract:

. . .
Month: Jul
Day of the Week: Sunday
Month: Aug
7 days were written
Month: Sep
Month: Oct
Month: Nov
Month: Dec
12 months were written
5 cities were written
. . .

This happens because reading the Result property blocks until that particular task has completed, but does not wait for any other task. See the following recipes for different ways to wait for tasks to complete.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Recipe15_02
{
    class Recipe15_02
    {
        static void Main(string[] args)
        {

            Console.WriteLine("Press enter to start");
            Console.ReadLine();

            // Create the tasks.
            Task<int> task1 = Task<int>.Factory.StartNew(() => writeDays());
            Task<int> task2 = Task<int>.Factory.StartNew(() => writeMonths());
            Task<int> task3 = Task<int>.Factory.StartNew(() => writeCities());

            // Get the results and write them out.
            Console.WriteLine("{0} days were written", task1.Result);
            Console.WriteLine("{0} months were written", task2.Result);
            Console.WriteLine("{0} cities were written", task3.Result);

            // Wait to continue.
            Console.WriteLine("\nMain method complete. Press Enter");
            Console.ReadLine();
        }

        static int writeDays()
        {
            string[] daysArray = { "Monday", "Tuesday", "Wednesday",
                                     "Thursday", "Friday",
                                     "Saturday", "Sunday" };
            foreach (string day in daysArray)
            {
                Console.WriteLine("Day of the Week: {0}", day);
                Thread.Sleep(500);
            }
            return daysArray.Length;
        }

        static int writeMonths()
        {
            string[] monthsArray = { "Jan", "Feb", "Mar", "Apr",
                                       "May", "Jun", "Jul",
                                       "Aug", "Sep", "Oct", "Nov", "Dec" };
            foreach (string month in monthsArray)
            {
                Console.WriteLine("Month: {0}", month);
                Thread.Sleep(500);
            }
            return monthsArray.Length;
        }

        static int writeCities()
        {
            string[] citiesArray = { "London", "New York", "Paris",
                                     "Tokyo", "Sydney" };
            foreach (string city in citiesArray)
            {
                Console.WriteLine("City: {0}", city);
                Thread.Sleep(500);
            }
            return citiesArray.Length;
        }
    }
}

Wait for Tasks to Complete

Problem

You need to wait for one or more tasks to complete.

Solution

Use the Wait, WaitAll, or WaitAny methods of the System.Threading.Tasks.Task class.

How It Works

The Wait method is called on a Task instance and blocks until the task is complete. The static WaitAll and WaitAny methods take an array of tasks as parameters: WaitAll blocks until all of the tasks in the array have completed, and WaitAny blocks until any one of them has finished and returns the array index of that task. Each method also has overloads that accept a timeout, expressed either as an int number of milliseconds or as a TimeSpan; these block for at most that long and then continue regardless of whether the task or tasks have completed. When a timeout expires, Wait and WaitAll return false, and WaitAny returns −1. The IsCompleted property of the Task class is used to determine whether a task has finished.
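The following sketch exercises WaitAny, a timed Wait, and IsCompleted. It is illustrative only: the class name WaitExamples is hypothetical, and a ManualResetEventSlim is used (an addition not found in the recipe) to keep the slow task running until the demonstration releases it, so that the outcome does not depend on timing.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class WaitExamples
{
    public static void Run(out int firstIndex, out bool timedWaitSucceeded,
                           out bool slowCompleted)
    {
        using (ManualResetEventSlim release = new ManualResetEventSlim(false))
        {
            // The slow task blocks until the event is set; the fast one does not.
            Task slow = Task.Factory.StartNew(() => release.Wait());
            Task fast = Task.Factory.StartNew(() => Thread.Sleep(100));

            // WaitAny blocks until one task finishes and returns its array index.
            firstIndex = Task.WaitAny(slow, fast);

            // A timed Wait returns false if the task is still running when the
            // timeout (in milliseconds) expires.
            timedWaitSucceeded = slow.Wait(200);

            // Release the slow task, then wait for both tasks to finish.
            release.Set();
            Task.WaitAll(slow, fast);
            slowCompleted = slow.IsCompleted;
        }
    }

    static void Main()
    {
        int first;
        bool timedWait, completed;
        Run(out first, out timedWait, out completed);
        Console.WriteLine("WaitAny index: {0}", first);          // 1 (the fast task)
        Console.WriteLine("Wait(200) succeeded: {0}", timedWait); // False
        Console.WriteLine("Slow task completed: {0}", completed); // True
    }
}
```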

The Code

This example changes the code from the previous recipe to wait for all of the tasks we created using the WaitAll method. In the previous example, the results of the tasks were reported as each result we requested became available—this example waits for all of the tasks to complete before obtaining the results.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Recipe15_03
{
    class Recipe15_03
    {
        static void Main(string[] args)
        {

            Console.WriteLine("Press enter to start");
            Console.ReadLine();

            // Create the tasks.
            Task<int> task1 = Task<int>.Factory.StartNew(() => writeDays());
            Task<int> task2 = Task<int>.Factory.StartNew(() => writeMonths());
            Task<int> task3 = Task<int>.Factory.StartNew(() => writeCities());

            // Wait for all of the tasks to complete.
            Task.WaitAll(task1, task2, task3);

            // Get the results and write them out.
            Console.WriteLine("{0} days were written", task1.Result);
            Console.WriteLine("{0} months were written", task2.Result);
            Console.WriteLine("{0} cities were written", task3.Result);

            // Wait to continue.
            Console.WriteLine("\nMain method complete. Press Enter");
            Console.ReadLine();
        }

        static int writeDays()
        {
            string[] daysArray = { "Monday", "Tuesday", "Wednesday",
                                     "Thursday", "Friday",
                                     "Saturday", "Sunday" };
            foreach (string day in daysArray)
            {
                Console.WriteLine("Day of the Week: {0}", day);
                Thread.Sleep(500);
            }
            return daysArray.Length;
        }

        static int writeMonths()
        {
            string[] monthsArray = { "Jan", "Feb", "Mar", "Apr",
                                       "May", "Jun", "Jul",
                                       "Aug", "Sep", "Oct", "Nov", "Dec" };
            foreach (string month in monthsArray)
            {
                Console.WriteLine("Month: {0}", month);
                Thread.Sleep(500);
            }
            return monthsArray.Length;
        }

        static int writeCities()
        {
            string[] citiesArray = { "London", "New York", "Paris",
                                     "Tokyo", "Sydney" };
            foreach (string city in citiesArray)
            {
                Console.WriteLine("City: {0}", city);
                Thread.Sleep(500);
            }
            return citiesArray.Length;
        }
    }
}

Parallel Process a Collection

Problem

You need to parallel process each element in a collection.

Solution

Use the System.Threading.Tasks.Parallel.ForEach method to process each of the elements in a collection in parallel. Optionally, use System.Threading.Tasks.ParallelOptions to limit the degree of parallelism that will be used.

How It Works

The static Parallel.ForEach method accepts a collection, an optional instance of ParallelOptions, and an Action<T> delegate as arguments. The runtime partitions the collection and invokes the delegate for each element on tasks that it creates and schedules for you; it does not necessarily create one task per element. The number of elements processed concurrently is controlled by the ParallelOptions.MaxDegreeOfParallelism property. A value of −1 (the default) means that the degree of parallelism will be determined by the runtime, whereas a value of 1 or more limits the number of concurrent operations. Setting the property to 0, or to a value less than −1, throws an ArgumentOutOfRangeException.
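The following sketch checks these rules empirically. It records how many loop bodies are running at once when MaxDegreeOfParallelism is 2, and shows that assigning 0 is rejected. The class and method names are hypothetical, and the Interlocked-based counter is an illustration added here, not part of the recipe.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class DegreeOfParallelism
{
    // Runs ten loop bodies with MaxDegreeOfParallelism = 2 and returns the
    // largest number of bodies observed running at the same time.
    public static int MaxObservedConcurrency()
    {
        int current = 0;
        int max = 0;

        ParallelOptions options = new ParallelOptions();
        options.MaxDegreeOfParallelism = 2;

        Parallel.ForEach(Enumerable.Range(1, 10), options, item =>
        {
            int now = Interlocked.Increment(ref current);

            // Raise max to 'now' if it is larger, retrying if another
            // thread changed max in the meantime.
            int seen = Volatile.Read(ref max);
            while (now > seen)
            {
                int original = Interlocked.CompareExchange(ref max, now, seen);
                if (original == seen) break;
                seen = original;
            }

            Thread.Sleep(50);                      // simulate some work
            Interlocked.Decrement(ref current);
        });

        return max;
    }

    // Returns true if assigning 0 to MaxDegreeOfParallelism is rejected.
    public static bool ZeroIsRejected()
    {
        ParallelOptions options = new ParallelOptions();
        try
        {
            options.MaxDegreeOfParallelism = 0;
            return false;
        }
        catch (ArgumentOutOfRangeException)
        {
            return true;
        }
    }

    static void Main()
    {
        Console.WriteLine("Default value: {0}",
            new ParallelOptions().MaxDegreeOfParallelism);
        Console.WriteLine("Max concurrent bodies: {0}", MaxObservedConcurrency());
        Console.WriteLine("0 rejected: {0}", ZeroIsRejected());
    }
}
```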

The Code

The following example processes each element of a simple array using the printNumbers method. We have called Thread.Sleep in this method to slow down the processing so that the example is clearer. We use the MaxDegreeOfParallelism property of ParallelOptions to ensure that at most two elements are processed simultaneously. When running the example, you will typically see the output for two of the elements intermingled, followed by the output for the remaining element.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Recipe15_04
{
    class Recipe15_04
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Press enter to start");
            Console.ReadLine();

            // Define the data we want to process.
            int[] numbersArray = { 100, 200, 300 };

            // Configure the options.
            ParallelOptions options = new ParallelOptions();
            options.MaxDegreeOfParallelism = 2;

            // Process each data element in parallel.
            Parallel.ForEach(numbersArray, options, baseNumber =>
                printNumbers(baseNumber));

            Console.WriteLine("Tasks Completed.  Press Enter");
            Console.ReadLine();
        }

        static void printNumbers(int baseNumber)
        {
            for (int i = baseNumber, j = baseNumber + 10; i < j; i++)
            {
                Console.WriteLine("Number: {0}", i);
                Thread.Sleep(100);
            }
        }
    }
}

Chain Tasks Together

Problem

You need to perform several tasks in sequence.

Solution

Create an instance of Task for the initial activity using the class constructor (rather than Task.Factory.StartNew, which starts the task immediately), and then call the ContinueWith method to create a Task instance representing the next activity in the sequence. When you have created all of the Task instances you require, call the Start method on the first in the sequence.

How It Works

The Task.ContinueWith and Task.ContinueWith<> methods create a new task that starts when the Task instance on which they are invoked completes. The previous task (known as the antecedent) is passed as the input parameter to the lambda expression in the ContinueWith method; you can use it to check the status of the previous task or to get its result, as shown in the following example.
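Because reading the Result property of a faulted antecedent throws an AggregateException, a continuation that uses the result may want to check the antecedent's status first. The following is a sketch assuming a hypothetical antecedent that fails on request; the class name ContinuationStatus is also hypothetical. ContinueWith<string> produces a Task<string> whose result describes the antecedent.

```csharp
using System;
using System.Threading.Tasks;

class ContinuationStatus
{
    public static string Describe(bool fail)
    {
        // The antecedent either returns a value or throws.
        Task<int> antecedent = Task<int>.Factory.StartNew(() =>
        {
            if (fail) throw new InvalidOperationException("Simulated failure");
            return 42;
        });

        Task<string> continuation = antecedent.ContinueWith(previous =>
        {
            // Reading Result on a faulted task would throw, so check first.
            if (previous.IsFaulted)
                return "Faulted: " + previous.Exception.InnerException.Message;
            return "Result: " + previous.Result;
        });

        return continuation.Result;
    }

    static void Main()
    {
        Console.WriteLine(Describe(false)); // Result: 42
        Console.WriteLine(Describe(true));  // Faulted: Simulated failure
    }
}
```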

The Code

The example for this recipe chains three tasks together. The first task adds some integer values. The second obtains the result from the first and prints it out, and the third task simply writes a message without reference to the previous tasks at all.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Recipe15_05
{
    class Recipe15_05
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Press enter to start");
            Console.ReadLine();

            // Create the set of tasks.
            Task<int> firstTask = new Task<int>(() => sumAndPrintNumbers(100));
            Task secondTask = firstTask.ContinueWith(parent => printTotal(parent));
            Task thirdTask = secondTask.ContinueWith(parent => printMessage());

            // Start the first task.
            firstTask.Start();
            // Read a line to keep the process alive.
            Console.WriteLine("Press enter to finish");
            Console.ReadLine();
        }

        static int sumAndPrintNumbers(int baseNumber)
        {
            Console.WriteLine("sum&print called for {0}", baseNumber);
            int total = 0;
            for (int i = baseNumber, j = baseNumber + 10; i < j; i++)
            {
                Console.WriteLine("Number: {0}", i);
                total += i;
            }
            return total;
        }

        static void printTotal(Task<int> parentTask)
        {
            Console.WriteLine("Total is {0}", parentTask.Result);
        }

        static void printMessage()
        {
            Console.WriteLine("Message from third task");
        }
    }
}

Write a Cooperative Algorithm

Problem

You need to write a parallel algorithm with multiple phases, each of which must be completed before the next can begin.

Solution

Create an instance of the System.Threading.Barrier class and call the SignalAndWait method from your Task code at the end of each algorithm phase.

How It Works

The Barrier class allows you to wait for a set of tasks to complete one part of an algorithm before moving on to the next. This is useful when the overall results from one phase are required by all tasks in order to complete a subsequent phase. When creating an instance of Barrier, you specify the number of participating tasks as a constructor argument. In your Task code, you call the SignalAndWait method when you have reached the end of a phase; your Task will block until the specified number of tasks are waiting, at which point the Barrier allows all of the waiting tasks to continue into the next phase. It is up to you to determine what constitutes each phase of your algorithm and to specify how many tasks must reach the barrier before the next phase can begin.

You can also specify an action to be performed when each phase is completed (that is, after the required number of tasks have called the SignalAndWait method, but before the tasks are allowed to continue to the next phase). The example for this recipe demonstrates how to do this with a lambda expression.

Note

It is important to ensure that you set the Barrier instance to expect the correct number of tasks at each stage of your algorithm. If you tell the Barrier to expect too few tasks, one phase may not have completed before the next begins. If you tell the Barrier to expect too many tasks, a phase will never start, even though all of your tasks have completed the earlier phase. You can change the number of tasks a Barrier will wait for by using the AddParticipant, AddParticipants, RemoveParticipant, and RemoveParticipants methods.
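The following sketch shows RemoveParticipant in use, assuming a hypothetical algorithm in which one of three tasks has no work to do in phase two. The third task leaves the barrier after phase one, so the remaining two tasks can complete phase two without it. The post-phase action simply counts completed phases, and the class name BarrierParticipants is illustrative.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class BarrierParticipants
{
    public static void Run(out int phasesCompleted, out int finalParticipants)
    {
        int phases = 0;

        // The post-phase action runs once each time a phase completes.
        using (Barrier barrier = new Barrier(3, b => phases++))
        {
            Task[] tasks = new Task[3];
            for (int i = 0; i < 3; i++)
            {
                int taskId = i + 1;
                tasks[i] = Task.Factory.StartNew(() =>
                {
                    barrier.SignalAndWait();          // end of phase one
                    if (taskId == 3)
                    {
                        barrier.RemoveParticipant();  // take no part in phase two
                        return;
                    }
                    barrier.SignalAndWait();          // end of phase two
                });
            }

            Task.WaitAll(tasks);
            phasesCompleted = phases;
            finalParticipants = barrier.ParticipantCount;
        }
    }

    static void Main()
    {
        int phases, participants;
        Run(out phases, out participants);
        Console.WriteLine("Phases: {0}, participants left: {1}", phases, participants);
    }
}
```

Whether task 3 removes itself before or after the other two signal, the phase completes as soon as every remaining participant has signaled.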

The Code

The following example shows a simple two-phase cooperative algorithm, performed by three tasks. When all of the tasks reach the barrier at the end of each phase, the notifyPhaseEnd method is called.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Recipe15_06
{
    class Recipe15_06
    {
        static void Main(string[] args)
        {
            // Create the barrier.
            Barrier myBarrier = new Barrier(3,
                (barrier) => notifyPhaseEnd(barrier));

            Task task1 = Task.Factory.StartNew(
                () => cooperatingAlgorithm(1, myBarrier));
            Task task2 = Task.Factory.StartNew(
                () => cooperatingAlgorithm(2, myBarrier));
            Task task3 = Task.Factory.StartNew(
                () => cooperatingAlgorithm(3, myBarrier));

            // Wait for all of the tasks to complete.
            Task.WaitAll(task1, task2, task3);

            // Wait to continue.
            Console.WriteLine("\nMain method complete. Press Enter");
            Console.ReadLine();
        }

        static void cooperatingAlgorithm(int taskid, Barrier barrier)
        {
            Console.WriteLine("Running algorithm for task {0}", taskid);

            // Perform phase one and wait at the barrier.
            performPhase1(taskid);
            barrier.SignalAndWait();

            // Perform phase two and wait at the barrier.
            performPhase2(taskid);
            barrier.SignalAndWait();
        }

        static void performPhase1(int taskid)
        {
            Console.WriteLine("Phase one performed for task {0}", taskid);
        }

        static void performPhase2(int taskid)
        {
            Console.WriteLine("Phase two performed for task {0}", taskid);
        }

        static void notifyPhaseEnd(Barrier barrier)
        {
            Console.WriteLine("Phase has concluded");
        }
    }
}

Handle Exceptions in Tasks

Problem

You need to catch and process exceptions thrown by a Task.

Solution

Call the Task.Wait or Task.WaitAll methods within a try...catch block to catch the System.AggregateException exception. Call the Handle method of AggregateException with a function delegate—the delegate will receive each exception that has been thrown by the Tasks. Your function should return true if the exception can be handled, and false otherwise.

How It Works

Catching AggregateException as it is thrown from Task.Wait or Task.WaitAll allows you to be notified of exceptions that are unhandled by your Task. If an error has occurred, then you will catch a single instance of System.AggregateException representing all of the exceptions that have been thrown.

You process each individual exception by calling the AggregateException.Handle method, which accepts a function delegate (usually specified using a lambda expression). The delegate is called once for each exception that has been thrown by your task or tasks. Bear in mind that several tasks may have encountered the same problem, so you are likely to have to process the same exception type more than once. If you can handle an exception, your function delegate should return true. If it returns false for any exception, Handle rethrows the unhandled exceptions, wrapped in a new AggregateException, once the delegate has been called for every exception; unless you catch that exception, your application will terminate.

Tip

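If you call Wait or WaitAll and do not catch the resulting AggregateException, it is treated like any other unhandled exception and terminates your application. Under .NET 4.0, an exception thrown by a task that is never observed (through Wait, WaitAll, or the task's Result or Exception property) also terminates the process when the task is garbage-collected; from .NET 4.5 onward, unobserved task exceptions no longer terminate the process by default.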
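To see what happens when the delegate returns false, the following sketch (the class name SelectiveHandling is hypothetical) handles ArgumentOutOfRangeException but declines InvalidOperationException. Handle then throws a new AggregateException containing only the declined exception, which the sketch catches and reports.

```csharp
using System;
using System.Threading.Tasks;

class SelectiveHandling
{
    public static string Run()
    {
        // One task throws an exception we can handle; the other, one we cannot.
        Task first = Task.Factory.StartNew(
            () => { throw new ArgumentOutOfRangeException("label"); });
        Task second = Task.Factory.StartNew(
            () => { throw new InvalidOperationException("Not recoverable"); });

        try
        {
            Task.WaitAll(first, second);
            return "No exceptions";
        }
        catch (AggregateException aggex)
        {
            try
            {
                // Return true only for the exception type we know how to handle.
                aggex.Handle(ex => ex is ArgumentOutOfRangeException);
                return "All exceptions handled";
            }
            catch (AggregateException rethrown)
            {
                // The new AggregateException holds only the declined exceptions.
                return "Unhandled: " + rethrown.InnerExceptions.Count + " " +
                       rethrown.InnerException.GetType().Name;
            }
        }
    }

    static void Main()
    {
        Console.WriteLine(Run()); // Unhandled: 1 InvalidOperationException
    }
}
```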

The Code

The following example demonstrates how to use the AggregateException.Handle method to implement a custom exception handler function:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Recipe15_07
{
    class Recipe15_07
    {
        static void Main(string[] args)
        {
            // Create two tasks; the one labeled "bad" throws an exception.
            Task goodTask = Task.Factory.StartNew(() => performTask("good"));
            Task badTask = Task.Factory.StartNew(() => performTask("bad"));

            try
            {
                Task.WaitAll(goodTask, badTask);
            }
            catch (AggregateException aggex)
            {
                aggex.Handle(ex => handleException(ex));
            }

            // Wait to continue.
            Console.WriteLine("\nMain method complete. Press Enter");
            Console.ReadLine();
        }

        static bool handleException(Exception exception)
        {
            Console.WriteLine("Processed Exception");
            Console.WriteLine(exception);
            // Return true to indicate we have handled the exception.
            return true;
        }

        static void performTask(string label)
        {
            if (label == "bad")
            {
                Console.WriteLine("About to throw exception.");
                throw new ArgumentOutOfRangeException("label");
            }
            else
            {
                Console.WriteLine("performTask for label: {0}", label);
            }
        }
    }
}

Cancel a Task

Problem

You need to cancel a Task while it is running.

Solution

Create an instance of System.Threading.CancellationTokenSource and read its Token property to obtain a System.Threading.CancellationToken. Pass the token to your Task and check it from the task body. Optionally, pass a delegate to the Register method of CancellationToken so that it is called when cancellation is requested. Cancel your Task by calling the Cancel method of CancellationTokenSource.

How It Works

The System.Threading.CancellationTokenSource class provides a mechanism to cancel one or more tasks. CancellationTokenSource is a factory for System.Threading.CancellationToken.

CancellationToken has the property IsCancellationRequested, which returns true once the Cancel method has been called on the CancellationTokenSource that produced the token. You can also use the Register method to specify one or more delegates to be called when the Cancel method is called. The sequence for handling cancellation is as follows:

  1. Create an instance of CancellationTokenSource.

  2. Obtain a CancellationToken by reading the Token property of the CancellationTokenSource created in step 1.

  3. Create one or more Tasks to handle your work, passing the CancellationToken as a parameter (for example, to Task.Factory.StartNew or to the Task constructor).

  4. Check the IsCancellationRequested property of the token in your Task body. If the property returns true, release any resources and throw an instance of OperationCanceledException (the token's ThrowIfCancellationRequested method performs this check and throw in one call).

  5. When you are ready to cancel, call the Cancel method on the CancellationTokenSource from step 1.

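Note that your task must throw an OperationCanceledException (constructed with the same token) to acknowledge the cancellation request. If the task body simply returns, the task ends in the RanToCompletion state rather than the Canceled state.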
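A compact variation on the steps above, sketched under the assumption that the work can be split into short units, uses CancellationToken.ThrowIfCancellationRequested to perform the check-and-throw of step 4 in one call and then reports the task's final status. Waiting on a canceled task throws an AggregateException, so the Wait call is wrapped in a try...catch block. The class name CooperativeCancellation is hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class CooperativeCancellation
{
    // Starts a task that polls its token, cancels it, and returns the task's
    // final status.
    public static TaskStatus Run()
    {
        using (CancellationTokenSource tokenSource = new CancellationTokenSource())
        {
            CancellationToken token = tokenSource.Token;

            // The same token is passed to StartNew and used in the body, so the
            // OperationCanceledException moves the task to the Canceled state.
            Task task = Task.Factory.StartNew(() =>
            {
                while (true)
                {
                    // Throws OperationCanceledException once Cancel has been called.
                    token.ThrowIfCancellationRequested();
                    Thread.Sleep(50);   // simulate a short unit of work
                }
            }, token);

            Thread.Sleep(200);          // let the task run for a while
            tokenSource.Cancel();

            try
            {
                task.Wait();
            }
            catch (AggregateException aggex)
            {
                // Waiting on a canceled task reports a TaskCanceledException.
                Console.WriteLine("Wait threw {0}", aggex.InnerException.GetType().Name);
            }

            return task.Status;
        }
    }

    static void Main()
    {
        Console.WriteLine("Final status: {0}", Run());
    }
}
```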

The Code

The following example creates a CancellationToken that is used to create an instance of Task. A method to be called when the CancellationTokenSource is canceled is registered with the Register method. When CancellationTokenSource.Cancel is called, the Task is stopped and a message is written to the console.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Recipe15_08
{
    class Recipe15_08
    {
        static void Main(string[] args)
        {
            // Create the token source.
            CancellationTokenSource tokenSource = new CancellationTokenSource();
            // Obtain the cancellation token.
            CancellationToken token = tokenSource.Token;

            // Create the task.
            Task task = Task.Factory.StartNew(() => printNumbers(token), token);
            // Register a callback to run when cancellation is requested.
            token.Register(() => notifyTaskCanceled());

            // Wait for the user to request cancellation.
            Console.WriteLine("Press enter to cancel token");
            Console.ReadLine();

            // Canceling.
            tokenSource.Cancel();
        }

        static void notifyTaskCanceled()
        {
            Console.WriteLine("Task cancellation requested");
        }

        static void printNumbers(CancellationToken token)
        {
            int i = 0;
            while (!token.IsCancellationRequested)
            {
                Console.WriteLine("Number {0}", i++);
                Thread.Sleep(500);
            }
            throw new OperationCanceledException(token);
        }
    }
}

Share Data Between Tasks

Problem

You need to share data safely between Tasks.

Solution

Use the collection classes in the System.Collections.Concurrent namespace.

How It Works

One of the biggest problems when writing parallel or threaded code is ensuring that data is shared safely. Microsoft has introduced new classes in .NET 4.0 that are designed to be more efficient than using synchronization around the default collection classes, which we demonstrated in Chapter 4. The techniques demonstrated in Chapter 4 will work with the .NET parallel programming model, but the new collection classes may be more efficient for large-scale applications. Table 15-1 lists the most useful classes from the System.Collections.Concurrent namespace.

Table 15-1. Useful System.Collections.Concurrent Classes

Class                                 Description
ConcurrentBag<T>                      A thread-safe, unordered collection of objects that allows duplicates
ConcurrentDictionary<TKey, TValue>    A thread-safe collection of key/value pairs
ConcurrentQueue<T>                    A thread-safe first in, first out (FIFO) queue
ConcurrentStack<T>                    A thread-safe last in, first out (LIFO) stack

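Each individual operation on these collections is thread-safe, so you do not need to add your own locks around calls such as Push or TryPop. A sequence of separate calls is not atomic, however. For a check-then-update step, use the combined methods the collections provide, such as TryAdd, GetOrAdd, and AddOrUpdate on ConcurrentDictionary.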
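As an illustration of such a combined operation, the following sketch counts words in parallel with ConcurrentDictionary.AddOrUpdate; the WordCounter class and the word list are hypothetical. AddOrUpdate retries internally until its update succeeds, so no increments are lost, but under contention the update delegate can run more than once, so it should not have side effects.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class WordCounter
{
    // Counts how often each word occurs, processing the words in parallel.
    public static ConcurrentDictionary<string, int> Count(string[] words)
    {
        ConcurrentDictionary<string, int> counts =
            new ConcurrentDictionary<string, int>();

        // Add each word with a count of 1, or increment its existing count.
        Parallel.ForEach(words, word =>
            counts.AddOrUpdate(word, 1, (key, current) => current + 1));

        return counts;
    }

    static void Main()
    {
        string[] words = { "apple", "pear", "apple", "fig", "apple", "pear" };
        foreach (var pair in Count(words))
        {
            Console.WriteLine("{0}: {1}", pair.Key, pair.Value);
        }
    }
}
```

Writing the same logic as a ContainsKey check followed by an indexer assignment would not be safe, because another task could update the entry between the two calls.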

The Code

The following example creates a ConcurrentStack, which is then used by three Tasks.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;

namespace Recipe15_9
{
    class Recipe15_9
    {
        static void Main(string[] args)
        {
            // Create a concurrent collection.
            ConcurrentStack<int> cStack = new ConcurrentStack<int>();

            // create tasks that will use the stack
            Task task1 = Task.Factory.StartNew(
                () => addNumbersToCollection(cStack));
            Task task2 = Task.Factory.StartNew(
                () => addNumbersToCollection(cStack));
            Task task3 = Task.Factory.StartNew(
                () => addNumbersToCollection(cStack));

            // Wait for all of the tasks to complete.
            Task.WaitAll(task1, task2, task3);

            // Report how many items there are in the stack.
            Console.WriteLine("There are {0} items in the collection",
                cStack.Count);

            // Wait to continue.
            Console.WriteLine("\nMain method complete. Press Enter");
            Console.ReadLine();
        }

        static void addNumbersToCollection(ConcurrentStack<int> stack)
        {
            for (int i = 0; i < 1000; i++)
            {
                stack.Push(i);
            }
        }
    }
}