Task parallelism with TPL

As mentioned earlier, task parallelism happens when dealing with parallel invocations of multiple methods/function. Within the .NET Framework, this can be obtained with the invocation of the Parallel.Invoke method, which needs to have as a parameter all parallelizable actions as a whole. Most techniques applicable here are also applicable in asynchronous programming with the Task or the TaskFactory class. So reading Chapter 4, Asynchronous Programming is mandatory to get the best of task parallelism.

The Parallel.Invoke method simply takes multiple remote methods to call procedures in a parallel way by accepting a System.Action array. Here is an example:

static void Main(string[] args)
{
    //short form with named methods
    Parallel.Invoke(Method1, Method2, Method3);

    //short form with anonymous methods
    Parallel.Invoke(
        () => { },
        () => { },
        () => { });
}

static void Method1() { }
static void Method2() { }
static void Method3() { }

In the following code example, we will process a picture resize in two different resolutions using task parallelism:

static void Main(string[] args)
{
    //add reference to System.Drawing assembly

    //an original image file
    byte[] originalImageData = File.ReadAllBytes("picture.jpg");
    byte[] thumb300x200 = null;
    byte[] thumb150x100 = null;

    //resize picture to 300x200px and 150x100px for thumbprint needs
    Parallel.Invoke(
        new Action(() =>
        {
            thumb300x200 = ResizeImage(originalImageData, 300, 200);
        }),
        new Action(() =>
        {
            thumb150x100 = ResizeImage(originalImageData, 150, 100);
        })
        );

    //save the resized images
    File.WriteAllBytes("pricture-300.jpg", thumb300x200);
    File.WriteAllBytes("pricture-150.jpg", thumb150x100);
}

static byte[] ResizeImage(byte[] original, int newWidth, int newHeight)
{
    //creates a stream from a byte[]
    using (var sourceStream = new MemoryStream(original))
    //load a bitmap from source stream
    using (var originalBitmap = new Bitmap(sourceStream))
    //resize the original bitmap to a new size
    using (var resizedBitmap = new Bitmap(originalBitmap, newWidth, newHeight))
    //creates a new in-memory stream from resized image
    using (var targetStream = new MemoryStream())
    {
        //save resized image to the in-memory stream
        resizedBitmap.Save(targetStream, ImageFormat.Jpeg);
        //return a byte[] from the saved stream
        return targetStream.ToArray();
    }
}

The Parallel.Invoke method will do the most work for us by actually creating a task for each action we need to process; thus obtaining the parallelization needed.

As with any task creation by the TaskFactory class, here we have the ability to configure some task creation options such as the maximum concurrent task number, giving a CancellationToken, and so on:

Parallel.Invoke(new ParallelOptions
{
    MaxDegreeOfParallelism = 2,
},
    () => Method1(),
    () => Method2(),
    () => Method3()
    );

An important fact that we always have to deal with when working with parallel programming is that this result has no order. Because of parallelization, we cannot predict task execution time. We must simply wait for completion.

A similar result is available through the WaitAll behaviour:

Task.WaitAll(
    Task.Run(
    () => Method1()),
    Task.Run(
    () => Method2()),
    Task.Run(
    () => Method3())
    );

Although this choice adds the ability to handle timeout as we wish, it provides a similar result because it lacks in task-group configuration, as what was offered by the ParallelOptions class. A solution is to use a custom class extending the TaskFactory class, but this choice will add nothing more than using the Parallel.Invoke method.

Please note that the focus when dealing with task parallelism is that the framework handles lot of things by itself; first of all, the task's creation and destruction. Because of this, the WaitAll method is a bit outside of the theory of task parallelism; it's only related to multiple asynchronous programming.

An interesting usage scenario for task parallelism is in speculative execution. This happens when we execute some task before it is actually needed, or in a more general way, when we do not need it. A canonical example is what happens when we execute multiple searches against our data source (or web) with different parameters. Here, only the fastest tasks win, so all other slower tasks are canceled. Here is an example:

static void Main(string[] args)
{
    //a cancellation token source for cancellation signalling
    using (var ts = new CancellationTokenSource())
    //tasks that returns a value
    using (var task1 = Task.Factory.StartNew<int>(TaskWorker, ts.Token))
    using (var task2 = Task.Factory.StartNew<int>(TaskWorker, ts.Token))
    using (var task3 = Task.Factory.StartNew<int>(TaskWorker, ts.Token))
    {
        //a container for all tasks
        var tasks = new[] { task1, task2, task3 };
        //the index of the fastest task
        var i = Task.WaitAny(tasks);

        //lets cancel all remaining tasks
        ts.Cancel();

        Console.WriteLine("The fastest result is {0} from task index {1}", tasks[i].Result, i);

        //bring back to the starting thread all task exceptions
        try
        {
            Task.WaitAll(tasks);
        }
        catch (AggregateException ax)
        {
            //let's handle all inner exceptions automatically
            //if any not OperationCanceledException exist
            //those will be raised again
            ax.Handle(ex => ex is OperationCanceledException);
        }
    }
    Console.ReadLine();
}

private static readonly Random random = new Random();
private static int TaskWorker(object token_as_object)
{
    //the token is available as object parameter
    var token = (CancellationToken)token_as_object;
    //do some long running logic
    var finish = DateTime.Now.AddSeconds(random.Next(1, 10));
    while (DateTime.Now < finish)
    {
        //if the cancellation has been requested
        //an exception will stop task execution
        token.ThrowIfCancellationRequested();
        Thread.Sleep(100);
    }
    return random.Next(1, 1000);
}

Please note that although we can obtain task parallelism with by simply using the Parallel.For/ForEach/Invoke methods, complex scenarios are available only by manually handling task creation, continuation, and waiting. Please remember that a task is simply a deferred job. Nothing more or less. It is how we use it that makes our design using parallelism or asynchronous programming.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.188.154.252