Task-based Asynchronous Pattern (TAP)

The Task-based Asynchronous Pattern (TAP) is the asynchronous programming framework introduced with .NET 4. TAP provides the features of APM and EAP, plus an API that works like a signaling lock and offers a lot of interesting new features.

Task creation

In .NET, an asynchronous job is called a task. It is represented by the Task class of the System.Threading.Tasks namespace. Here is a basic example:

var task = Task.Run(() =>
    {
        Thread.Sleep(3000);
        //returns a random integer
        return DateTime.Now.Millisecond;
    });

Console.Write("Starting data computation...");

//waiting for completion (IsCompleted also becomes true
//if the task fails, so the loop cannot spin forever)
while (!task.IsCompleted)
{
    Console.Write(".");
    Thread.Sleep(100);
}
Console.WriteLine(" OK");
Console.WriteLine("END");
Console.ReadLine();

The preceding example is similar to the first one shown about the APM. Although a lambda expression is used here to create an anonymous method implementation, it is the same as creating a named method like we did in the previous example with the ProcessSomething instance.

The Task.Run method starts the asynchronous execution of the method provided by the Delegate object (the lambda syntax actually creates a Delegate object referring to an unnamed method). It immediately returns a Task object that we can use to query the execution status and, when needed, to wait for completion, shown as follows:

task.Wait();

The Task.Run method is available from .NET 4.5 onwards, while another syntax is available from .NET 4 with more configuration options:

var task = Task.Factory.StartNew<int>(OnAnotherThread);

The two methods are essentially the same, because the Task.Run method executes the StartNew method of the default TaskFactory class with a fixed set of options (no cancellation token, the DenyChildAttach creation option, and the default scheduler). By invoking StartNew ourselves, we can also specify customized options regarding task creation and continuation. In addition, we can create multiple factories, one for each group of tasks that share a homogeneous configuration, with less effort and improved manageability.
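The relationship between the two methods can be sketched as follows. This is a minimal illustration rather than the library's actual implementation; the TaskCreationSketch class, the Compute method, and the LongJobs factory are hypothetical names introduced only for this example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TaskCreationSketch
{
    // Hypothetical stand-in for any CPU-bound job.
    public static int Compute()
    {
        return 6 * 7;
    }

    // For a plain (non-async) delegate, Task.Run(job) behaves like
    // this explicit StartNew call with fixed options.
    public static Task<int> RunExplicitly(Func<int> job)
    {
        return Task.Factory.StartNew(
            job,
            CancellationToken.None,
            TaskCreationOptions.DenyChildAttach,
            TaskScheduler.Default);
    }

    // A dedicated factory that stamps the same options on every task
    // it creates (here: every task is flagged as long running).
    public static readonly TaskFactory LongJobs = new TaskFactory(
        CancellationToken.None,
        TaskCreationOptions.LongRunning,
        TaskContinuationOptions.None,
        TaskScheduler.Default);
}
```

Calling TaskCreationSketch.RunExplicitly(TaskCreationSketch.Compute) and Task.Run(() => TaskCreationSketch.Compute()) should yield the same result, while tasks started through LongJobs.StartNew carry the LongRunning option without repeating it at each call site.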

A special feature of the TaskFactory class is the ability to marshal the result from APM's EndInvoke method in a specific task with the FromAsync method. In such cases, multiple overloads of the same method offer different options such as sending a state parameter or not sending one.

Let's look at a complete example:

static void Main(string[] args)
{
    //the usual delegate for APM
    var invoker = new Func<int>(OnAnotherThread);

    //a task catching the EndInvoke in another asynchronous method
    var fromAsyncTask = Task.Factory.FromAsync<int>(invoker.BeginInvoke, invoker.EndInvoke, null);

    //this usage of the result will internally invoke the Wait method
    //blocking the execution until a result will become available
    Console.WriteLine("From async 1: {0}", fromAsyncTask.Result);

    //this second overload wants the whole IAsyncResult
    var status = invoker.BeginInvoke(null, null);
    //this will catch the EndInvoke in a task
    var fromAsyncTask2 = Task.Factory.FromAsync<int>(status, invoker.EndInvoke);

    Console.WriteLine("From async 2: {0}", fromAsyncTask2.Result);

    Console.ReadLine();
}

private static int OnAnotherThread()
{
    Thread.Sleep(500);
    return DateTime.Now.Millisecond; //a random int
}

The first form is the more concise of the two, because the second one also needs the IAsyncResult object, which requires another variable.

Visit the following MSDN link to learn more about the FromAsync method:

http://msdn.microsoft.com/en-us/library/dd321469(v=vs.110).aspx

At the end of that page, a remark warns us about using the overload that takes the whole IAsyncResult. Instead, it suggests using the overload that takes the Begin/End method pair.

Another useful option that is available when using the TaskFactory class is the ability to configure how tasks are created; the following code shows an example:

var task1 = Task.Factory.StartNew(() =>
{
    //classic task creation with factory defaults
});

var task2 = Task.Factory.StartNew(() =>
{
    //task that should start as soon as possible,
    //in the order in which it was queued
}, TaskCreationOptions.PreferFairness);

var task3 = Task.Factory.StartNew(() =>
{
    //task that will run for a long time
}, TaskCreationOptions.LongRunning);

The TaskCreationOptions enum lets us select between different choices (members) for task startup. The most interesting one here is the last one, that is, the LongRunning member. It does not change the task startup time; rather, it hints to the scheduler that the task will run for a long time. The default scheduler then runs the task on a dedicated background thread, without consuming a thread from the ThreadPool class, which is where the TaskFactory class usually takes the threads for its tasks.
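The effect of the LongRunning member can be observed with a small probe. The following is a sketch that assumes the default task scheduler; the LongRunningSketch class and its RanOnPoolThread method are hypothetical names used only for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LongRunningSketch
{
    // Starts a task with the given creation options and reports
    // whether its body executed on a ThreadPool thread.
    public static bool RanOnPoolThread(TaskCreationOptions options)
    {
        var task = Task.Factory.StartNew(
            () => Thread.CurrentThread.IsThreadPoolThread,
            CancellationToken.None,
            options,
            TaskScheduler.Default); // explicit: the default scheduler is assumed
        return task.Result;
    }
}
```

With the default scheduler, RanOnPoolThread(TaskCreationOptions.None) is expected to report a pool thread, while RanOnPoolThread(TaskCreationOptions.LongRunning) is expected to report a dedicated one. A custom TaskScheduler is free to ignore the hint.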

Although TAP can be applied to long-running background work, strictly speaking this is multithreading rather than asynchronous programming. The decision about where a task runs lies in the factory's Scheduler, an instance of the TaskScheduler class, the object in charge of handling task execution with the best performance. For the default task scheduler, high throughput is the primary concern. This is why we need to use the PreferFairness creation option to specify a low-latency startup scenario, in which tasks should start in the order they were queued.

Generally speaking, the default scheduler is a good option because, from .NET 4 onwards, the ThreadPool engine has been heavily optimized. It handles a single global FIFO queue of pending user jobs, available to the whole process and designed to keep locking to a minimum. In addition to this global queue, any nested task is placed on another queue instead: a local queue, one for each worker thread, that is processed in LIFO order. This is optimized for fast execution, CPU cache access, and data locality in CLR memory; the following shows an example:

var task = Task.Factory.StartNew(() =>
{
    //will enqueue on the global ThreadPool queue
    var inner = Task.Factory.StartNew(()=>
        {
            //will enqueue on the local ThreadPool queue
        });
});

An important aspect of the System.Threading.ThreadPool class usage is that some default limitation on thread availability does exist. Such limitations can be set both at the default pool size (minimum size) and at the maximum size. These defaults are the logical processor count for the minimum pool size, while the maximum size is dynamically set by the CLR itself (in older .NET frameworks, it was statically set as a multiple of the CPU count).

The ThreadPool class exposes static methods to get and set the minimum (GetMinThreads and SetMinThreads) and maximum (GetMaxThreads and SetMaxThreads) pool size. It also exposes the GetAvailableThreads method, which gives us the remaining thread count, that is, the maximum size minus the number of threads currently in use. Here is a code example:

int min, minIO, max, maxIO;
//retrieve min and max ThreadPool size
ThreadPool.GetMinThreads(out min, out minIO);
ThreadPool.GetMaxThreads(out max, out maxIO);

//retrieve actually available thread count
int remaining, remainingIO;
ThreadPool.GetAvailableThreads(out remaining, out remainingIO);

//set up a new ThreadPool configuration
ThreadPool.SetMinThreads(64, 64);
ThreadPool.SetMaxThreads(2048, 2048);

Please bear in mind that all requests against the ThreadPool class remain queued until some thread becomes available. This is why, if we keep creating tasks without end, we will soon have more pending tasks than pool threads, the queue will keep growing, and we will eventually receive an OutOfMemoryException error, because no memory is left for adding other user tasks to the pool queue. Another important thing to know about the ThreadPool thread lifecycle is that the CLR makes threads available without delay up to the specified minimum size. When we continue adding tasks beyond that, the CLR will add threads to the thread pool until we reach the maximum size, as we might expect. The difference is that this increase happens very slowly, at roughly one thread per second. Here is a straightforward example:

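int c = 0;
while (true)
{
    Task.Factory.StartNew(() =>
        {
            //atomic increment: many tasks update the counter concurrently
            Console.WriteLine(Interlocked.Increment(ref c));
            //block this pool thread forever
            Thread.Sleep(Timeout.Infinite);
        }, TaskCreationOptions.PreferFairness);
}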

In a short amount of time, this example prints to the console as many numbers as there are logical processors on your CPU. Later, about one new thread per second becomes available (and is never released), giving us a rough picture of the pool's growth timings. Please use this example with care because, as you learned before, such threads are never released, so the pool queue will grow quickly until it causes an OutOfMemoryException error.

Let's look at a more complete example:

static void Main(string[] args)
{
    //creates a listener for TCP inbound connections
    var listener = new TcpListener(IPAddress.Any, 8080);

    //start it
    listener.Start();

    //accept any client
    while (true)
    {
        //get a task for the client connection
        var acceptTask = listener.AcceptTcpClientAsync();
        //wait for the client connection
        acceptTask.Wait();

        //once the connection succeeds, start a new task
        //for handling communication with this new client,
        //passing the connected TcpClient (not the task) as state

        Task.Factory.StartNew(HandleClientConnection, acceptTask.Result, TaskCreationOptions.PreferFairness);

        //run again to accept new clients
    }
}

private static void HandleClientConnection(object arg)
{
    var client = (TcpClient)arg;
    //do something
}

This example shows you how to use asynchronous programming to handle thousands of client connections on the same port. The code itself places virtually no limit on the client connection count; the practical ceiling comes from operating system resources, and a single remote address can open at most about 65,000 connections to one port, because each connection needs a distinct source port. As already said, although the code is able to accept a virtually unlimited number of clients, only a small number of them will be served concurrently, because of how slowly the ThreadPool class increases its size.

The same example written with the older APM will instead stop accepting clients as soon as it reaches the ThreadPool default limits:

static void Main(string[] args)
{
    //creates a listener for TCP inbound connections
    var listener = new TcpListener(IPAddress.Any, 8080);

    //start it
    listener.Start();

    //accept any client
    while (true)
    {
        //start waiting for a client
        var status = listener.BeginAcceptTcpClient(null, null);

        //wait for client connection
        status.AsyncWaitHandle.WaitOne();

        //catch the asynchronously created client
        var client = listener.EndAcceptTcpClient(status);

        //once the connection has happened, start a new thread pool job
        //for handling communication with this new client

        ThreadPool.QueueUserWorkItem(HandleClientConnection, client);

        //run again to accept new clients
    }
}

private static void HandleClientConnection(object arg)
{
    var client = (TcpClient)arg;
    //IMPLEMENTATION OMITTED
}

Task synchronization

Back to the TaskFactory class. Going deeper into nested tasks and how their execution graph is synchronized, we have to distinguish between attached and detached tasks. Any task may attach itself to its parent task, if there is one, although this is not the default behavior. By default, child tasks are detached from their parent tasks. This means that the parent does not care about its child tasks, shown as follows:

var parent = Task.Factory.StartNew(() =>
{
    var child = Task.Factory.StartNew(() =>
    {
        Thread.Sleep(3000);
        Console.WriteLine("child: ending");
    });
    Thread.Sleep(1000);
});

parent.Wait();
Console.WriteLine("parent: ended before waiting for its child");
Console.ReadLine();

Here is the result:

parent: ended before waiting for its child
child: ending

The TaskFactory class lets us specify that a child task must attach to its parent by passing the optional parameter TaskCreationOptions.AttachedToParent. In such a case, the parent cares about its children's status and exceptions and does not complete until they have completed, shown as follows:

var parent = Task.Factory.StartNew(() =>
{
    var child = Task.Factory.StartNew(() =>
    {
        Thread.Sleep(3000);
        Console.WriteLine("child: ending");
    }, TaskCreationOptions.AttachedToParent);
    Thread.Sleep(1000);
});

parent.Wait();
Console.WriteLine("parent: ended after waiting for its child");
Console.ReadLine();

As seen in the preceding code, such a small difference in code produces a big difference in the result, which is exactly the opposite:

child: ending
parent: ended after waiting for its child

Always use the Task.Factory.StartNew method when dealing with child tasks because the Task.Run method prevents the child from attaching itself to the parent. The Task.Run method is only a shortcut for task creation with the default setup, while the Task.Factory.StartNew method gives us the ability to configure our task initialization options.
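The difference can be observed directly. The following sketch starts the same attached child from a parent created either with Task.Run or with Task.Factory.StartNew; the AttachSketch class and its method are hypothetical names, and the 500 ms sleep is an arbitrary value chosen only to make the effect visible.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AttachSketch
{
    // Returns whether the child had already finished
    // by the time parent.Wait() returned.
    public static bool ChildDoneWhenParentDone(bool useTaskRun)
    {
        Task child = null;
        Action body = () =>
        {
            // The child asks to be attached to whatever task is running it.
            child = Task.Factory.StartNew(
                () => Thread.Sleep(500),
                CancellationToken.None,
                TaskCreationOptions.AttachedToParent,
                TaskScheduler.Default);
        };

        // Task.Run creates the parent with DenyChildAttach,
        // so the attach request is silently ignored.
        Task parent = useTaskRun
            ? Task.Run(body)
            : Task.Factory.StartNew(body, CancellationToken.None,
                TaskCreationOptions.None, TaskScheduler.Default);

        parent.Wait();
        return child.IsCompleted;
    }
}
```

With a StartNew parent, the method is expected to return true because the parent completes only after its attached child; with a Task.Run parent, the child is normally still sleeping when the parent ends, so the expected value is false.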

Such synchronization has its costs. Therefore, where attaching children is not really needed, prefer multiple outer tasks with the required synchronization techniques, such as waiting for the right set of tasks.

Regarding task synchronization, it is imperative that you understand the difference between all the available ways of waiting for tasks. Waiting for a task is like waiting for a signaling lock, as already seen in the Multithreading synchronization section in Chapter 3, CLR Internals. The Task class gives us methods such as Wait, WaitAll, and WaitAny to accomplish this, as shown in the following example:

//wait for a task forever
task1.Wait();

//wait for a task with a timeout
if (task1.Wait(1000)) //ms
{
    //completed on time
}
else
{
    //timeout
}

if (task1.Wait(TimeSpan.FromMinutes(1))) { } else { }

//wait for all tasks forever, or with a timeout
Task.WaitAll(task1, task2, task3);
if (Task.WaitAll(new[] { task1, task2, task3 }, 1000)) { }
if (Task.WaitAll(new[] { task1, task2, task3 }, TimeSpan.FromMinutes(1))) { }

//wait for the first one to complete, with or without a timeout
//the others will still complete their jobs
//WaitAny returns the index of the first completed task, or -1 on timeout
Task.WaitAny(task1, task2, task3);
Task.WaitAny(new[] { task1, task2, task3 }, 1000);
Task.WaitAny(new[] { task1, task2, task3 }, TimeSpan.FromMinutes(1));
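The return value of WaitAny deserves a short illustration. The sketch below uses two hypothetical tasks that simply sleep for arbitrary durations; the WaitAnySketch class and its method name are assumptions made for this example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WaitAnySketch
{
    // Starts a slow task (index 0) and a fast task (index 1), then
    // reports which one WaitAny saw completing first within the timeout.
    public static int IndexOfFirstFinished(int timeoutMs)
    {
        var slow = Task.Run(() => Thread.Sleep(1000));
        var fast = Task.Run(() => Thread.Sleep(100));

        // Returns the array index of the first completed task,
        // or -1 if the timeout elapses before any task completes.
        return Task.WaitAny(new[] { slow, fast }, timeoutMs);
    }
}
```

With a generous timeout such as 500 ms, the call is expected to return 1 (the fast task); with a timeout shorter than the fast task, such as 10 ms, it returns -1.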

Task exception handling

Task exception handling differs slightly from the usual exception handling seen in the Exception Handling section in Chapter 3, CLR Internals: when dealing with tasks, a raw exception never bubbles up by itself. Any time an exception happens within a task, it is stored in the task, and any code waiting on that task receives an AggregateException error that acts as a container for all the exceptions that happened within the tasks being waited on. As with work running on external threads, the exception never reaches the main thread on its own; it is routed there only when we join the task by waiting on it (or by reading its Result property). Here's an example:

var task1 = Task.Factory.StartNew(() =>
    {
        throw new ArgumentException("Hi 1");
    });

var task2 = Task.Factory.StartNew(() =>
{
    throw new ArgumentException("Hi 2");
});

try
{
    //the wait joins the two tasks, routing
    //all their unhandled exceptions to the
    //waiting (main) thread
    Task.WaitAll(task1, task2);
}
catch (AggregateException ax)
{
    foreach (var ex in ax.InnerExceptions)
        Console.WriteLine("{0}", ex.Message);
}
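When only some exception types can be dealt with locally, the Flatten and Handle methods of the AggregateException class help. The following is a minimal sketch; the AggregateSketch class and the CollectArgumentErrors method are hypothetical names used only for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AggregateSketch
{
    // Waits for the given tasks, collects the messages of any
    // ArgumentException, and lets every other exception propagate.
    public static List<string> CollectArgumentErrors(params Task[] tasks)
    {
        var messages = new List<string>();
        try
        {
            Task.WaitAll(tasks);
        }
        catch (AggregateException ax)
        {
            // Flatten unwraps nested AggregateExceptions (for example,
            // those coming from attached child tasks). Handle rethrows
            // a new AggregateException with whatever the predicate rejects.
            ax.Flatten().Handle(ex =>
            {
                if (ex is ArgumentException)
                {
                    messages.Add(ex.Message);
                    return true;  // handled here
                }
                return false;     // not handled: rethrown to the caller
            });
        }
        return messages;
    }
}
```

Passing the two failing tasks from the preceding example to CollectArgumentErrors should yield both messages, Hi 1 and Hi 2, without the caller having to iterate the InnerExceptions collection.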

Task cancellation

Another interesting feature when dealing with tasks is the ability to handle task cancellation. This design bears a slight similarity to the Thread.Abort method. The difference is that, for threads, the CLR itself raises an exception that immediately stops the thread's execution, whereas here, although the design may seem the same, the whole implementation is in our hands. We no longer have to worry about an abort landing in the middle of a critical section: because we decide where cancellation is checked, we can do whatever is needed to exit gracefully from any critical code block.

To accomplish state-of-the-art cancellation handling, we must create a CancellationTokenSource object to trigger the job cancellation. This source object exposes a CancellationToken object, which is single use: once cancellation has been triggered, a new source must be created for any further work.

To prevent tasks from starting after a cancellation has already been requested, we must pass this cancellation token to the StartNew method of the TaskFactory class. Besides this optimization, passing the token to the StartNew method informs the Task Parallel Library (TPL) that a System.OperationCanceledException raised from that token within the task body means cancellation: the task ends in the Canceled state, and any code waiting on it receives a TaskCanceledException. Here's a complete example:

static CancellationToken cancellationToken;
static void Main(string[] args)
{
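    //let us configure a minimal
    //threadpool size to slow down task execution
    //(SetMaxThreads returns false, changing nothing, when asked
    //for fewer threads than the number of processors)
    ThreadPool.SetMinThreads(2, 2);
    ThreadPool.SetMaxThreads(2, 2);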

    //the cancellation token source able to trigger cancellation
    var cancellationTokenSource = new CancellationTokenSource();
    //the cancellation token able to give a feedback on cancellation status
    cancellationToken = cancellationTokenSource.Token;

    //let's create some tasks
    //the cancellationToken is assigned to each task here; this links the two objects,
    //preventing a new task from starting if the token has already been triggered
    //in addition, passing the token here lets the TPL treat the OperationCanceledException
    //thrown by the ThrowIfCancellationRequested method as a cancellation, surfaced to
    //waiters as a TaskCanceledException, marking the task as gracefully cancelled
    var tasks = Enumerable.Range(0, 10).Select(i => Task.Factory.StartNew(OnAnotherThread, cancellationToken)).ToArray();

    Console.WriteLine("All tasks queued for running");
    Console.WriteLine("RETURN TO BEGIN CANCEL TASKS");
    Console.ReadLine();
    cancellationTokenSource.Cancel();

    Console.WriteLine("Cancel requested!");

    try
    {
        //join back all tasks
        Task.WaitAll(tasks);
    }
    catch (AggregateException ax)
    {
        //cancelled tasks surface as TaskCanceledException; report only other errors
        foreach (var ex in ax.InnerExceptions)
            if (!(ex is TaskCanceledException))
                Console.WriteLine("Task exception: {0}", ex.Message);
    }

    foreach (var t in tasks)
        Console.WriteLine("Task status ID {0}: {1}", t.Id, t.Status);

    Console.WriteLine("END");
    Console.ReadLine();
}

[DebuggerHidden] //avoid visual studio from catching token exceptions
private static void OnAnotherThread()
{
    Console.WriteLine("Task {0} starting...", Task.CurrentId);

    //do some CPU intensive job
    for (int i = 0; i < 100; i++)
    {
        //prevent a cancelled task continuing doing
        //useless job
        cancellationToken.ThrowIfCancellationRequested();

        //CPU job
        Thread.Sleep(500);
    }

    Console.WriteLine("Task {0} ending...", Task.CurrentId);
}

The following is the console output:

All tasks queued for running
RETURN TO BEGIN CANCEL TASKS
Task 2 starting...
Task 1 starting...

Cancel requested!
Task status ID 1: Canceled
Task status ID 2: Canceled
Task status ID 3: Canceled
Task status ID 4: Canceled
Task status ID 5: Canceled
Task status ID 6: Canceled
Task status ID 7: Canceled
Task status ID 8: Canceled
Task status ID 9: Canceled
Task status ID 10: Canceled
END
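The role played by passing the token to StartNew can be isolated in a smaller sketch. The CancelStatusSketch class and its FinalStatus method are hypothetical names; the body polls the token every 10 ms, an arbitrary interval.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancelStatusSketch
{
    // Runs a cancellable loop and returns the final task status,
    // with or without handing the token to StartNew.
    public static TaskStatus FinalStatus(bool passTokenToStartNew)
    {
        var source = new CancellationTokenSource();
        var token = source.Token;
        var started = new ManualResetEventSlim(false);

        Action body = () =>
        {
            started.Set();
            while (true)
            {
                // Throws OperationCanceledException once Cancel is called.
                token.ThrowIfCancellationRequested();
                Thread.Sleep(10);
            }
        };

        Task task = Task.Factory.StartNew(
            body,
            passTokenToStartNew ? token : CancellationToken.None,
            TaskCreationOptions.None,
            TaskScheduler.Default);

        started.Wait();   // make sure the body is running before cancelling
        source.Cancel();

        try { task.Wait(); }
        catch (AggregateException) { /* expected in both cases */ }

        return task.Status;
    }
}
```

When the token is passed to StartNew, the task ends as Canceled. When it is not, the TPL cannot tell the exception apart from any other failure, and the task ends as Faulted.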

Task continuation

Another useful feature of any task is the ability to attach a continuation to it (with the ContinueWith method), that is, another task that starts when the first one completes.

When dealing with task continuation, the Status property of the preceding task, and eventually its Result property with the return value, may be evaluated to apply the right logic for each outcome.

For such operations, task continuation helps us by providing a convenient enumeration (TaskContinuationOptions) used to select the status for which the continuation must occur:

//a task
var task1 = Task.Factory.StartNew(() =>
    {
        Thread.Sleep(1000);
        //uncomment here for testing the error
        //throw new Exception("Hi");
        return 10;
    });

//a continuation is attached to task1
task1.ContinueWith(task =>
    {
        //this continuation will occur only when previous task will run without errors
        Console.WriteLine("OK: {0}", task.Result);
    }, TaskContinuationOptions.OnlyOnRanToCompletion);

task1.ContinueWith(task =>
    {
        //this continuation will occur only if task1 throws
        //(Exception is null for a cancelled task, hence OnlyOnFaulted)
        Console.WriteLine("ERR: {0}", task.Exception.InnerException); //the first inner exception
    }, TaskContinuationOptions.OnlyOnFaulted);

Because the two continuations cover both success and failure, the task-creating code neither has to wait for task1 to complete nor has to handle its eventual exceptions.
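A continuation can also return a value of its own, which makes it possible to fold both outcomes into a single resulting task. The following is a sketch under that assumption; the ContinuationSketch class and the DescribeAsync method are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ContinuationSketch
{
    // Runs the job and maps its outcome (value or error) to a string
    // inside a single continuation, without blocking the caller.
    public static Task<string> DescribeAsync(Func<int> job)
    {
        return Task.Factory
            .StartNew(job, CancellationToken.None,
                TaskCreationOptions.None, TaskScheduler.Default)
            .ContinueWith(
                t => t.IsFaulted
                    // Reading Exception also marks the error as observed.
                    ? "ERR: " + t.Exception.InnerException.Message
                    : "OK: " + t.Result,
                TaskScheduler.Default);
    }
}
```

The caller receives a Task<string> that never faults because of the job itself, so it can be awaited or chained further without a try/catch block around it.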

With continuation and synchronization techniques used together, complex scenarios of asynchronous programming are available to programmers without having to face difficulties such as manual synchronization with signaling locks or by handling parent-child thread synchronization.

Task factories

As seen previously, the Task and TaskFactory classes give us the ability to start tasks with special options. Although this is an interesting feature, so far we have always used the default factory, available through the Task.Factory property.

A TaskFactory class can also be instantiated with custom options that will work as the starting configuration for any task made with this factory. This is particularly useful when multiple instances of the same kind of task are going to be created. Here is an example:

static void Main(string[] args)
{
    var cancellation = new CancellationTokenSource();

    //a factory for creating int-returning tasks
    //all tasks created by this factory will share this default configuration:
    //all tasks will support cancellation through the same token
    //all tasks will be created with the PreferFairness hint
    //all continuations created by the factory will attach to their parent, if any
    //the default TaskScheduler will be used
    var factory = new TaskFactory<int>(cancellation.Token,
        TaskCreationOptions.PreferFairness,
        TaskContinuationOptions.AttachedToParent,
        TaskScheduler.Default);

    //10 tasks
    var tasks = Enumerable.Range(1, 10)
        .Select(i => factory.StartNew(CreateRandomInt)
            //each continuation is skipped (ends as Canceled) when its task faults
            .ContinueWith(HandleRandomInt, TaskContinuationOptions.NotOnFaulted))
        //always materialize the query;
        //otherwise, each enumeration would create a new set of tasks
        .ToArray();

    bool canContinue = false;

    //a single asynchronous continuation task
    //that starts when all the other tasks have ended
    //such usage avoids calling WaitAll
    factory.ContinueWhenAll(tasks, allTasks =>
        {
            canContinue = true;
            return 0;
        });

    do
    {
        Console.Clear();

        foreach (var task in tasks)
            Console.WriteLine("Task n. {0}: {1}", task.Id, task.Status);

        Thread.Sleep(1000);
    } while (!canContinue);

    Console.WriteLine("END");
    Console.ReadLine();
}

//this method executes only if the previous task completed successfully
private static void HandleRandomInt(Task<int> task)
{
    Console.WriteLine("Handling value: {0}", task.Result);
}

static Random random = new Random();

[DebuggerHidden] //this attribute disables exception debugging
public static int CreateRandomInt()
{
    //wait 1~10 seconds
    Thread.Sleep(random.Next(1000, 10000));

    //throw exception sometime
    if (random.Next(1, 100) % 10 == 0)
        throw new ArgumentException("Unable to produce a valid integer value!");

    return random.Next(1, 100);
}

This example launches multiple tasks that retrieve integers through some complex calculation (simulated here by the random sleep time). Then, only for the integers generated successfully, a continuation task handles the new values. Usually, such an application would use iteration (for/foreach) and, with multiple tasks, a rejoin of all the asynchronous executions to collect all the results. With continuations, everything is easier because no such iteration is needed.

Another interesting feature visible in the example is the ability to have a continuation task for the whole group of tasks instead of a task-by-task basis. Such usage avoids the WaitAll invocation. Remember that WaitAll works like a Thread.Join method, which opens the door to exception bubbling of all joined tasks in the caller thread.
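A group continuation can also aggregate the results of its antecedents. The following sketch assumes int-returning jobs; the WhenAllSketch class and the SumAsync method are hypothetical names chosen for this example.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAllSketch
{
    // Starts every job through one factory and returns a task whose
    // result is the sum of all the jobs' results.
    public static Task<int> SumAsync(params Func<int>[] jobs)
    {
        var factory = new TaskFactory<int>(TaskScheduler.Default);

        // Materialize the query so that the tasks are created only once.
        Task<int>[] tasks = jobs.Select(job => factory.StartNew(job)).ToArray();

        // The continuation runs once, after all the tasks have completed.
        // Reading Result here does not block, but it would rethrow
        // if one of the jobs had failed.
        return factory.ContinueWhenAll<int>(
            tasks,
            done => done.Sum(t => t.Result));
    }
}
```

The caller gets a single task to observe instead of an array to wait on, so no WaitAll call is needed on the calling thread.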
