The Task-based Asynchronous Pattern (TAP) is the asynchronous programming model introduced in .NET 4. TAP provides the features of APM and EAP, with an added API that behaves like a signaling lock and offers many interesting new features.
In .NET, such an asynchronous job is called a task, and it is represented by the Task class of the System.Threading.Tasks namespace. Here is a basic example:
var task = Task.Run(() =>
{
    Thread.Sleep(3000);
    //returns a pseudo-random integer
    return DateTime.Now.Millisecond;
});
Console.Write("Starting data computation...");
//wait for completion (IsCompleted also covers faulted and canceled tasks)
while (!task.IsCompleted)
{
    Console.Write(".");
    Thread.Sleep(100);
}
Console.WriteLine(" OK");
Console.WriteLine("END");
Console.ReadLine();
The preceding example is similar to the first APM example. Although a lambda expression is used here to create an anonymous method, the result is the same as using a named method, as we did in the previous example with the ProcessSomething instance.
The Task.Run method starts the asynchronous execution of the method referenced by the given delegate (the lambda syntax actually creates a delegate that refers to an unnamed method). It immediately returns a Task object that we can use to query the execution status and, when needed, to wait for completion, just as with any wait handle:
task.Wait();
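As a minimal sketch of this query-then-wait usage (written as a modern C# console program with top-level statements, which needs a newer compiler than the .NET 4.5 samples in this chapter), the following starts a task, reads its Status while it may still be running, then blocks on Wait before reading Result. The delay and the value 42 are arbitrary.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Start the work on a thread-pool thread; Task.Run returns immediately.
Task<int> task = Task.Run(() =>
{
    Thread.Sleep(500);   // simulate a slow computation
    return 42;           // illustrative result
});

// The returned Task can be queried while the work is still in progress...
Console.WriteLine("Status right after Run: {0}", task.Status);

// ...and waited on, like a wait handle, until it completes.
task.Wait();

Console.WriteLine("Status after Wait: {0}", task.Status);
Console.WriteLine("Result: {0}", task.Result);
```

The first Status line typically shows WaitingToRun or Running, depending on how quickly the pool picks up the work.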
The Task.Run method is available from .NET 4.5, while the Task.Factory.StartNew method, available since .NET 4, offers more configuration options:
var task = Task.Factory.StartNew<int>(OnAnotherThread);
The two methods are nearly equivalent, because Task.Run is a shortcut for calling the StartNew method of the default TaskFactory class with a preset configuration. By invoking StartNew ourselves, however, we can specify custom task creation options. In addition, we can create multiple factories, one for each group of tasks sharing the same configuration, with less effort and improved manageability.
A special feature of the TaskFactory class is the FromAsync method, which wraps an APM Begin/End method pair (such as a delegate's BeginInvoke and EndInvoke) into a task whose result is the value returned by EndInvoke. Multiple overloads of this method offer different options, such as passing a state parameter or not.
Let's look at a complete example:
static void Main(string[] args)
{
    //the usual delegate for APM
    //(delegate BeginInvoke/EndInvoke is supported only on .NET Framework)
    var invoker = new Func<int>(OnAnotherThread);
    //a task wrapping the BeginInvoke/EndInvoke pair
    var fromAsyncTask = Task.Factory.FromAsync<int>(invoker.BeginInvoke, invoker.EndInvoke, null);
    //reading Result internally invokes the Wait method,
    //blocking the execution until a result becomes available
    Console.WriteLine("From async 1: {0}", fromAsyncTask.Result);

    //this second overload wants the whole IAsyncResult
    var status = invoker.BeginInvoke(null, null);
    //this wraps the EndInvoke in a task
    var fromAsyncTask2 = Task.Factory.FromAsync<int>(status, invoker.EndInvoke);
    Console.WriteLine("From async 2: {0}", fromAsyncTask2.Result);
    Console.ReadLine();
}

private static int OnAnotherThread()
{
    Thread.Sleep(500);
    return DateTime.Now.Millisecond; //a pseudo-random int
}
The first form is the more concise one, because the second also needs an IAsyncResult and, therefore, an extra variable.
Visit the following MSDN link to learn more about the FromAsync method:
http://msdn.microsoft.com/en-us/library/dd321469(v=vs.110).aspx
At the end of the page, a remark warns us against using the overload that takes the whole IAsyncResult and suggests using the one that takes the Begin/End method pair instead.
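Delegate BeginInvoke/EndInvoke, used above, is supported only on .NET Framework. As a sketch that also runs on newer runtimes, the following uses a MemoryStream as the APM source (our own choice for illustration), because Stream exposes a real BeginRead/EndRead pair. It wraps one read with the recommended Begin/End overload and a second read with the IAsyncResult overload.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

var stream = new MemoryStream(new byte[] { 1, 2, 3, 4, 5 });

// Recommended overload: pass the Begin/End pair plus the Begin arguments
// (buffer, offset, count, state); FromAsync calls BeginRead for us.
var buffer1 = new byte[3];
Task<int> read1 = Task<int>.Factory.FromAsync(
    stream.BeginRead, stream.EndRead, buffer1, 0, buffer1.Length, null);
int count1 = read1.Result;   // blocks until EndRead has run

// IAsyncResult overload: we call BeginRead ourselves and hand over the result.
var buffer2 = new byte[3];
IAsyncResult status = stream.BeginRead(buffer2, 0, buffer2.Length, null, null);
Task<int> read2 = Task<int>.Factory.FromAsync(status, stream.EndRead);
int count2 = read2.Result;

Console.WriteLine("First read: {0} bytes, second read: {1} bytes", count1, count2);
```

The second read returns fewer bytes only because the five-byte stream runs out; the extra IAsyncResult variable is the price of the second form.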
Another useful option available with the TaskFactory class is the ability to configure how tasks are created, as the following code shows:
var task1 = Task.Factory.StartNew(() =>
{
    //classic task creation with factory defaults
});
var task2 = Task.Factory.StartNew(() =>
{
    //task that asks to be scheduled fairly (queued first, run first)
}, TaskCreationOptions.PreferFairness);
var task3 = Task.Factory.StartNew(() =>
{
    //task that will run for a long time
}, TaskCreationOptions.LongRunning);
The TaskCreationOptions enumeration lets us choose among different startup behaviors. The most interesting member here is the last one, LongRunning. It does not change the task startup time; instead, with the default scheduler, the task runs on a dedicated background thread rather than consuming one of the ThreadPool threads from which the TaskFactory class usually takes the threads for its tasks.
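A quick way to observe this is to ask each task whether it is running on a pool thread. This is a sketch (modern C# with top-level statements) that relies on how the default scheduler implements LongRunning in current runtimes; formally, the option is only a hint to the scheduler.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Default options: the default scheduler queues the task to the ThreadPool.
bool defaultOnPool = Task.Factory.StartNew(
    () => Thread.CurrentThread.IsThreadPoolThread).Result;

// LongRunning: the default scheduler runs the task on a dedicated thread.
bool longRunningOnPool = Task.Factory.StartNew(
    () => Thread.CurrentThread.IsThreadPoolThread,
    TaskCreationOptions.LongRunning).Result;

Console.WriteLine("Default task on a pool thread: {0}", defaultOnPool);
Console.WriteLine("LongRunning task on a pool thread: {0}", longRunningOnPool);
```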
In theory, although TAP can be applied to long-running background work, such work is not really asynchronous programming: it is multithreading. This logic actually lies in the factory's scheduler, an instance of the TaskScheduler class, which is in charge of executing tasks with the best performance. The default task scheduler is tuned primarily for high throughput. This is why we need the PreferFairness creation option to request a low-latency startup instead.
Generally speaking, the default behavior is a good choice, because from .NET 4 onwards the ThreadPool engine has been heavily optimized. It handles a single global FIFO queue of pending user work items for the whole process, without any local or global lock. In addition to this global queue, each thread-pool worker thread has its own local queue: nested tasks, created from within a running task, go to the local queue of the current worker thread, which processes it in LIFO order. This is optimized for fast execution, CPU cache usage, and data locality in CLR memory. The following code shows an example:
var task = Task.Factory.StartNew(() =>
{
    //enqueued on the global ThreadPool queue
    var inner = Task.Factory.StartNew(() =>
    {
        //enqueued on the local queue of the current worker thread
    });
});
An important aspect of using the System.Threading.ThreadPool class is that thread availability is limited by default. There are two limits: a minimum pool size and a maximum pool size. The default minimum is the logical processor count, while the maximum is set dynamically by the CLR itself (in older .NET Framework versions, it was statically set as a multiple of the CPU count).
The ThreadPool class exposes static methods to get (GetMinThreads and GetMaxThreads) and set (SetMinThreads and SetMaxThreads) the minimum and maximum pool size. It also exposes the GetAvailableThreads method, which returns the number of threads still available, that is, the maximum size minus the number of threads currently in use. Here is a code example:
int min, minIO, max, maxIO;
//retrieve min and max ThreadPool size
ThreadPool.GetMinThreads(out min, out minIO);
ThreadPool.GetMaxThreads(out max, out maxIO);
//retrieve the actually available thread count
int remaining, remainingIO;
ThreadPool.GetAvailableThreads(out remaining, out remainingIO);
//set up a new ThreadPool configuration
ThreadPool.SetMinThreads(64, 64);
ThreadPool.SetMaxThreads(2048, 2048);
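To see what these calls report on a given machine, here is a small diagnostic sketch (modern C# with top-level statements). It also shows that the setters validate their input: per the ThreadPool documentation, SetMinThreads returns false and changes nothing when asked for a minimum above the current maximum.

```csharp
using System;
using System.Threading;

ThreadPool.GetMinThreads(out int minWorkers, out int minIo);
ThreadPool.GetMaxThreads(out int maxWorkers, out int maxIo);
ThreadPool.GetAvailableThreads(out int availableWorkers, out int availableIo);

// GetAvailableThreads reports "maximum minus busy", so busy = max - available.
int busyWorkers = maxWorkers - availableWorkers;

Console.WriteLine("Logical processors: {0}", Environment.ProcessorCount);
Console.WriteLine("Worker threads: min {0}, max {1}, busy {2}", minWorkers, maxWorkers, busyWorkers);
Console.WriteLine("I/O threads:    min {0}, max {1}, available {2}", minIo, maxIo, availableIo);

// An invalid request (a minimum above the maximum) is rejected: the method
// returns false and leaves the current configuration unchanged.
bool invalidMinAccepted = ThreadPool.SetMinThreads(maxWorkers + 1, minIo);
Console.WriteLine("Min above max accepted: {0}", invalidMinAccepted);
```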
Please bear in mind that all requests to the ThreadPool class remain queued until a thread becomes available. This is why, if we create tasks endlessly, we quickly exceed the minimum thread pool size, and the pending work items keep piling up in the pool queue until an OutOfMemoryException is thrown. Another important thing to know about the ThreadPool thread lifecycle is that the CLR supplies threads immediately, on demand, up to the specified minimum size. As we keep adding work beyond that, the CLR adds threads to the pool, as we might expect, until the maximum size is reached. The difference is that this growth is very slow: roughly one new thread per second (the exact rate depends on the CLR version). Here is a straightforward example:
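int c = 0;
while (true)
{
    Task.Factory.StartNew(() =>
    {
        //Interlocked makes the shared counter increment thread-safe
        Console.WriteLine(Interlocked.Increment(ref c));
        //block this pool thread forever
        Thread.Sleep(Timeout.Infinite);
    }, TaskCreationOptions.PreferFairness);
}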
This example quickly prints numbers up to the minimum pool size (by default, the number of logical processors on your CPU). After that, about one new thread per second becomes available (and is never released), giving us a rough picture of how the pool grows over time. Use this example with care: as you learned before, because these threads are never released, the pool queue grows quickly until an OutOfMemoryException is thrown.
Let's look at a more complete example:
static void Main(string[] args)
{
    //creates a listener for TCP inbound connections
    var listener = new TcpListener(IPAddress.Any, 8080);
    //start it
    listener.Start();
    //accept any client
    while (true)
    {
        //get a task for the client connection
        var acceptTask = listener.AcceptTcpClientAsync();
        //wait for the client connection
        acceptTask.Wait();
        //once the connection succeeds, start a new task
        //for handling communication with this new client
        //(pass the TcpClient itself, not the accepting task)
        Task.Factory.StartNew(HandleClientConnection, acceptTask.Result, TaskCreationOptions.PreferFairness);
        //loop again to accept new clients
    }
}

private static void HandleClientConnection(object arg)
{
    var client = (TcpClient)arg;
    //do something
}
This example shows how to use asynchronous programming to handle thousands of client connections on the same port efficiently. The code itself places virtually no limit on the number of client connections (only the operating system's own limits apply). Obviously, as already said, although the code can accept a virtually unlimited number of clients, only a small number of them will actually run on our CPU at first, because of how slowly the ThreadPool grows.
The same example written with the old APM, instead, will stop accepting clients as soon as it reaches the ThreadPool default limits:
static void Main(string[] args)
{
    //creates a listener for TCP inbound connections
    var listener = new TcpListener(IPAddress.Any, 8080);
    //start it
    listener.Start();
    //accept any client
    while (true)
    {
        //start waiting for a client
        var status = listener.BeginAcceptTcpClient(null, null);
        //wait for the client connection
        status.AsyncWaitHandle.WaitOne();
        //get the asynchronously created client
        var client = listener.EndAcceptTcpClient(status);
        //once the connection happens, start a new thread pool job
        //for handling communication with this new client
        ThreadPool.QueueUserWorkItem(HandleClientConnection, client);
        //loop again to accept new clients
    }
}

private static void HandleClientConnection(object arg)
{
    var client = (TcpClient)arg;
    //IMPLEMENTATION OMITTED
}
Back to the TaskFactory class: going deeper into nested tasks and how their execution is synchronized as a graph, we have to distinguish between attached and detached tasks. Any task may attach itself to its parent task, if it has one, although this is not the default behavior. By default, child tasks are detached from their parent tasks, which means that the parent does not wait for its children, as shown here:
var parent = Task.Factory.StartNew(() =>
{
    var child = Task.Factory.StartNew(() =>
    {
        Thread.Sleep(3000);
        Console.WriteLine("child: ending");
    });
    Thread.Sleep(1000);
});
parent.Wait();
Console.WriteLine("parent: ended before waiting for its child");
Console.ReadLine();
Here is the result:
parent: ended before waiting for its child
child: ending
The TaskFactory class lets us specify that a child task must attach to its parent by passing the optional TaskCreationOptions.AttachedToParent parameter. In this case, the parent does not complete until its attached children have completed, and it also propagates their exceptions, as shown here:
var parent = Task.Factory.StartNew(() =>
{
    var child = Task.Factory.StartNew(() =>
    {
        Thread.Sleep(3000);
        Console.WriteLine("child: ending");
    }, TaskCreationOptions.AttachedToParent);
    Thread.Sleep(1000);
});
parent.Wait();
Console.WriteLine("parent: ended after waiting for its child");
Console.ReadLine();
As the output shows, such a small difference in the code produces a big difference in the result; in short, the opposite one:
child: ending
parent: ended after waiting for its child
Always use the Task.Factory.StartNew method when dealing with child tasks, because a task started with the Task.Run method uses the DenyChildAttach option, which prevents any child from attaching itself to it. The Task.Run method is only a shortcut for creating a task with the default setup, while the Task.Factory.StartNew method lets us configure the task's initialization options.
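To make this restriction concrete, here is a small sketch (modern C# with top-level statements; the helper name ChildDoneWhenParentDone is ours, purely for illustration). In both cases the child asks to attach to its parent; we then check whether the child had already finished when the parent's Wait returned.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Returns whether the child had finished at the moment parent.Wait() returned.
bool ChildDoneWhenParentDone(bool useTaskRun)
{
    Task? child = null;
    Action parentBody = () =>
    {
        // The child asks to attach to its parent in both cases.
        child = Task.Factory.StartNew(
            () => Thread.Sleep(1000),
            TaskCreationOptions.AttachedToParent);
    };

    // Task.Run implies DenyChildAttach, so the attach request is ignored;
    // StartNew with default options lets the child attach.
    Task parent = useTaskRun
        ? Task.Run(parentBody)
        : Task.Factory.StartNew(parentBody);

    parent.Wait();
    bool childDone = child!.IsCompleted;
    child.Wait(); // let the child finish before returning
    return childDone;
}

bool withRun = ChildDoneWhenParentDone(useTaskRun: true);
bool withStartNew = ChildDoneWhenParentDone(useTaskRun: false);
Console.WriteLine("Task.Run parent waited for its child: {0}", withRun);
Console.WriteLine("StartNew parent waited for its child: {0}", withStartNew);
```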
Such synchronization has a cost. Therefore, although attached child tasks may look convenient, prefer using multiple top-level tasks with the required synchronization techniques, such as waiting for the right set of tasks.
Regarding task synchronization, it is imperative that you understand the differences between the available ways of waiting for tasks. Waiting for a task is like waiting for a signaling lock, as already seen in the Multithreading synchronization section in Chapter 3, CLR Internals. The Task class gives us methods such as Wait, WaitAll, and WaitAny to accomplish this, as shown in the following example:
//wait for a task indefinitely
task1.Wait();
//wait for a task with a timeout
if (task1.Wait(1000)) //ms
{
    //on time
}
else
{
    //timeout
}
if (task1.Wait(TimeSpan.FromMinutes(1))) { } else { }

//wait for all tasks indefinitely, or with a timeout
Task.WaitAll(task1, task2, task3);
if (Task.WaitAll(new[] { task1, task2, task3 }, 1000)) { }
if (Task.WaitAll(new[] { task1, task2, task3 }, TimeSpan.FromMinutes(1))) { }

//wait for the first task to complete, with or without a timeout
//the other tasks keep running until they complete their job
//WaitAny returns the index of the first completed task (-1 on timeout)
Task.WaitAny(task1, task2, task3);
Task.WaitAny(new[] { task1, task2, task3 }, 1000);
Task.WaitAny(new[] { task1, task2, task3 }, TimeSpan.FromMinutes(1));
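The return values are what make the timeout overloads useful. This sketch (modern C# with top-level statements; the 100 ms and 2 s delays are arbitrary) checks the index returned by WaitAny, the false returned by WaitAll on timeout, and the -1 returned by WaitAny when nothing finishes in time.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var slow  = Task.Run(() => Thread.Sleep(2000));
var fast  = Task.Run(() => Thread.Sleep(100));
var tasks = new[] { slow, fast };

// WaitAny blocks until one task finishes and returns its index in the array.
int first = Task.WaitAny(tasks);

// With a timeout, WaitAll returns false if not all tasks finished in time...
bool allDone = Task.WaitAll(tasks, 50);

// ...and WaitAny returns -1 if none of the given tasks finished in time.
var neverEnding = new TaskCompletionSource<bool>().Task;
int none = Task.WaitAny(new[] { neverEnding }, 50);

Console.WriteLine("First finished: index {0}", first);
Console.WriteLine("All done within 50 ms: {0}", allDone);
Console.WriteLine("WaitAny on timeout: {0}", none);

Task.WaitAll(tasks); // let the slow task finish before exiting
```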
Slightly different from our usual exception handling, as already seen in the Exception Handling section in Chapter 3, CLR Internals, exceptions raised within tasks never bubble up raw. Whenever an exception happens within a task, any code waiting for that task receives an AggregateException, which acts as a container for all the exceptions thrown within the tasks being waited on. Conceptually, waiting for a task is like joining an external thread with the Join method, with one important difference: an unhandled exception on a plain thread is never routed to the thread that joins it, while the exceptions of a task are routed to whoever waits for it. Here's an example:
var task1 = Task.Factory.StartNew(() => { throw new ArgumentException("Hi 1"); });
var task2 = Task.Factory.StartNew(() => { throw new ArgumentException("Hi 2"); });
try
{
    //waiting joins the two tasks, routing all their
    //unhandled exceptions to the main thread
    Task.WaitAll(task1, task2);
}
catch (AggregateException ax)
{
    foreach (var ex in ax.InnerExceptions)
        Console.WriteLine("{0}", ex.Message);
}
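When only some exception types are expected, AggregateException.Handle lets us mark those as handled and rethrow the rest. In this sketch (modern C# with top-level statements; the exception types and messages are arbitrary), ArgumentException is treated as expected, while the InvalidOperationException is rethrown inside a new AggregateException.

```csharp
using System;
using System.Threading.Tasks;

var tasks = new[]
{
    Task.Factory.StartNew(() => { throw new ArgumentException("bad argument"); }),
    Task.Factory.StartNew(() => { throw new InvalidOperationException("bad state"); }),
    Task.Factory.StartNew(() => { }) // completes successfully
};

int handled = 0;
AggregateException? remaining = null;
try
{
    Task.WaitAll(tasks);
}
catch (AggregateException ax)
{
    try
    {
        // Handle calls the predicate for each inner exception; the ones for
        // which it returns false are rethrown in a new AggregateException.
        ax.Handle(ex =>
        {
            if (ex is ArgumentException) { handled++; return true; }
            return false;
        });
    }
    catch (AggregateException rest)
    {
        remaining = rest;
    }
}

Console.WriteLine("Handled: {0}, still unhandled: {1}",
    handled, remaining?.InnerExceptions.Count ?? 0);
```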
Another interesting feature of tasks is the ability to handle task cancellation. This design bears a slight similarity to the Thread.Abort method. The difference is that, for threads, the CLR itself raises an exception that immediately stops the thread's execution, while here, although the design may look similar, the whole implementation is in our hands. We no longer need to worry about being interrupted in the middle of a critical section: because cancellation handling is up to us, we can do whatever is needed to exit gracefully from any critical code block. To implement proper cancellation handling, we create a CancellationTokenSource object to trigger the cancellation. This source object exposes, through its Token property, a CancellationToken object that represents a single-use cancellation token: once cancellation has been requested, it cannot be reset, so a new source must be created and used.
To prevent tasks from starting after cancellation has already been requested, we must pass this cancellation token to the StartNew method of the TaskFactory class. In addition, passing the token to the StartNew method tells the Task Parallel Library (TPL) that a System.OperationCanceledException raised from that token within the task body (for example, by its ThrowIfCancellationRequested method) means a cooperative cancellation: the task ends in the Canceled state instead of Faulted, and any code waiting for it finds a TaskCanceledException inside the AggregateException. Here's a complete example:
static CancellationToken cancellationToken;

static void Main(string[] args)
{
    //configure a minimal ThreadPool size to slow down task execution
    //(SetMaxThreads returns false and changes nothing if the value
    //is lower than the processor count)
    ThreadPool.SetMinThreads(2, 2);
    ThreadPool.SetMaxThreads(2, 2);
    //the cancellation token source able to trigger cancellation
    var cancellationTokenSource = new CancellationTokenSource();
    //the cancellation token able to give feedback on the cancellation status
    cancellationToken = cancellationTokenSource.Token;
    //create some tasks
    //the cancellationToken is assigned to each task; this links the two objects,
    //preventing a new task from starting if the token has already been triggered
    //in addition, passing the token here turns the OperationCanceledException thrown
    //by the ThrowIfCancellationRequested method into a Canceled task status
    //(surfaced as a TaskCanceledException), informing the TPL that the task was kindly aborted
    var tasks = Enumerable.Range(0, 10)
        .Select(i => Task.Factory.StartNew(OnAnotherThread, cancellationToken))
        .ToArray();
    Console.WriteLine("All tasks queued for running");
    Console.WriteLine("RETURN TO BEGIN CANCEL TASKS");
    Console.ReadLine();
    cancellationTokenSource.Cancel();
    Console.WriteLine("Cancel requested!");
    try
    {
        //join back all tasks
        Task.WaitAll(tasks);
    }
    catch (AggregateException ax)
    {
        //canceled tasks surface as TaskCanceledException; print anything else
        foreach (var ex in ax.InnerExceptions)
            if (!(ex is TaskCanceledException))
                Console.WriteLine("Task exception: {0}", ex.Message);
    }
    foreach (var t in tasks)
        Console.WriteLine("Task status ID {0}: {1}", t.Id, t.Status);
    Console.WriteLine("END");
    Console.ReadLine();
}

[DebuggerHidden] //keeps Visual Studio from breaking on token exceptions
private static void OnAnotherThread()
{
    Console.WriteLine("Task {0} starting...", Task.CurrentId);
    //do some CPU-intensive job
    for (int i = 0; i < 100; i++)
    {
        //prevent a canceled task from continuing to do useless work
        cancellationToken.ThrowIfCancellationRequested();
        //CPU job
        Thread.Sleep(500);
    }
    Console.WriteLine("Task {0} ending...", Task.CurrentId);
}
The following is the console output:
All tasks queued for running
RETURN TO BEGIN CANCEL TASKS
Task 2 starting...
Task 1 starting...
Cancel requested!
Task status ID 1: Canceled
Task status ID 2: Canceled
Task status ID 3: Canceled
Task status ID 4: Canceled
Task status ID 5: Canceled
Task status ID 6: Canceled
Task status ID 7: Canceled
Task status ID 8: Canceled
Task status ID 9: Canceled
Task status ID 10: Canceled
END
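The role of the token passed to StartNew can be isolated in three small cases. In this sketch (modern C# with top-level statements; the helper FinalStatus is ours, for illustration only), a task whose token is already canceled never runs; a task that throws from the token it was given ends Canceled; the same body without the token passed to StartNew ends Faulted.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Waits for a task, swallowing the AggregateException, and returns its final status.
TaskStatus FinalStatus(Task t)
{
    try { t.Wait(); } catch (AggregateException) { }
    return t.Status;
}

// 1) Token already canceled when passed to StartNew: the body never runs.
var preCanceled = new CancellationTokenSource();
preCanceled.Cancel();
bool bodyRan = false;
var neverStarted = Task.Factory.StartNew(() => { bodyRan = true; }, preCanceled.Token);

// 2) The body throws OperationCanceledException from the token passed to StartNew.
var cts2 = new CancellationTokenSource();
var cooperative = Task.Factory.StartNew(() =>
{
    cts2.Cancel();                               // simulate an external cancel request
    cts2.Token.ThrowIfCancellationRequested();   // throws OperationCanceledException
}, cts2.Token);

// 3) Same body, but the token is NOT passed to StartNew.
var cts3 = new CancellationTokenSource();
var unlinked = Task.Factory.StartNew(() =>
{
    cts3.Cancel();
    cts3.Token.ThrowIfCancellationRequested();
});

TaskStatus s1 = FinalStatus(neverStarted);
TaskStatus s2 = FinalStatus(cooperative);
TaskStatus s3 = FinalStatus(unlinked);
Console.WriteLine("Pre-canceled token: {0} (body ran: {1})", s1, bodyRan);
Console.WriteLine("Token passed to StartNew: {0}", s2);
Console.WriteLine("Token not passed: {0}", s3);
```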
Another useful feature of any task is the ability to attach a continuation to it, that is, another task that starts when the first one completes, by using the ContinueWith method.
Inside a continuation, we can inspect the antecedent task's Status property and, when appropriate, its Result property (holding the task's return value) to apply the right logic for each outcome. The TaskContinuationOptions enumeration makes this convenient, letting us select the antecedent status for which each continuation runs:
//a task
var task1 = Task.Factory.StartNew(() =>
{
    Thread.Sleep(1000);
    //uncomment here for testing the error
    //throw new Exception("Hi");
    return 10;
});
//a continuation is attached to task1
task1.ContinueWith(task =>
{
    //this continuation runs only when the previous task completes without errors
    Console.WriteLine("OK: {0}", task.Result);
}, TaskContinuationOptions.OnlyOnRanToCompletion);
task1.ContinueWith(task =>
{
    //this continuation runs only if something goes wrong
    Console.WriteLine("ERR: {0}", task.Exception.InnerException); //the first inner exception
}, TaskContinuationOptions.NotOnRanToCompletion);
Even though we never wait for task1 to complete, the continuations still run when it does; this removes the need to handle its possible exceptions in the code that created the task.
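A continuation whose condition is not met does not simply disappear: it ends in the Canceled state. This sketch (modern C# with top-level statements) makes the antecedent always fail and, as our own choice, uses OnlyOnFaulted for the error branch so that Exception is never null there.

```csharp
using System;
using System.Threading.Tasks;

// Waits for a task, swallowing the AggregateException raised by canceled or faulted tasks.
void WaitQuietly(Task t)
{
    try { t.Wait(); } catch (AggregateException) { }
}

// An antecedent that always fails, standing in for the commented-out throw above.
var failing = Task.Factory.StartNew<int>(() => { throw new InvalidOperationException("Hi"); });

string errorMessage = "";

// Runs only on success: here its condition is not met, so it ends up Canceled.
var onSuccess = failing.ContinueWith(
    t => Console.WriteLine("OK: {0}", t.Result),
    TaskContinuationOptions.OnlyOnRanToCompletion);

// Runs only on failure: reads the first inner exception of the antecedent.
var onFailure = failing.ContinueWith(
    t => { errorMessage = t.Exception?.InnerException?.Message ?? ""; },
    TaskContinuationOptions.OnlyOnFaulted);

WaitQuietly(onSuccess);
WaitQuietly(onFailure);
Console.WriteLine("Success continuation: {0}", onSuccess.Status);
Console.WriteLine("Failure continuation: {0}, message: {1}", onFailure.Status, errorMessage);
```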
By combining continuations and synchronization techniques, programmers can implement complex asynchronous scenarios without facing difficulties such as manual synchronization with signaling locks or handling parent-child thread synchronization.
As seen previously, the Task and TaskFactory classes give us the ability to start tasks with special options. Although this is an interesting feature, so far we have always used the default factory, available through the Task.Factory property.
A TaskFactory class can also be instantiated with custom options that act as the starting configuration for any task created by that factory. This is particularly useful when multiple instances of the same kind of task are going to be created. Here is an example:
static void Main(string[] args)
{
    var cancellation = new CancellationTokenSource();
    //a factory for creating int-returning tasks
    //all tasks created by this factory share this default configuration:
    //all tasks support cancellation through the same token
    //all tasks are created with the PreferFairness option
    //continuations created by the factory itself (such as ContinueWhenAll)
    //are attached to their parent, if any
    //the default TaskScheduler is used
    var factory = new TaskFactory<int>(cancellation.Token,
        TaskCreationOptions.PreferFairness,
        TaskContinuationOptions.AttachedToParent,
        TaskScheduler.Default);
    //10 tasks
    var tasks = Enumerable.Range(1, 10)
        .Select(i => factory.StartNew(CreateRandomInt)
            //each task gets a continuation that skips faulted results
            .ContinueWith(HandleRandomInt, TaskContinuationOptions.NotOnFaulted))
        //always materialize such a query; otherwise, each foreach
        //over it would create a new set of tasks
        .ToArray();
    bool canContinue = false;
    //a single asynchronous continuation
    //that starts when all the other tasks end
    //this usage avoids calling WaitAll
    factory.ContinueWhenAll(tasks, allTasks => { canContinue = true; return 0; });
    do
    {
        Console.Clear();
        foreach (var task in tasks)
            Console.WriteLine("Task n. {0}: {1}", task.Id, task.Status);
        Thread.Sleep(1000);
    } while (!canContinue);
    Console.WriteLine("END");
    Console.ReadLine();
}

//this method executes only if the previous task completed successfully
private static void HandleRandomInt(Task<int> task)
{
    Console.WriteLine("Handling value: {0}", task.Result);
}

//Random is not thread-safe, so every access is serialized with a lock
static readonly Random random = new Random();
static int NextRandom(int min, int max)
{
    lock (random) return random.Next(min, max);
}

[DebuggerHidden] //keeps the debugger from breaking on exceptions in this method
public static int CreateRandomInt()
{
    //wait 1~10 seconds
    Thread.Sleep(NextRandom(1000, 10000));
    //throw an exception sometimes
    if (NextRandom(1, 100) % 10 == 0)
        throw new ArgumentException("Unable to produce a valid integer value!");
    return NextRandom(1, 100);
}
This example launches multiple tasks that retrieve integers through some complex calculation (simulated by the random sleep time). Then, only for the successfully generated integers, a continuation task handles the new value. Without continuations, this application would have iterated (with for/foreach) over the tasks, which requires re-joining all the asynchronous executions to collect their results. With continuations, everything is easier because no such iteration is needed.
Another interesting feature visible in the example is the ability to attach a single continuation to the whole group of tasks instead of one per task. This usage avoids invoking WaitAll. Remember that WaitAll blocks the caller like the Thread.Join method does and, in addition, rethrows the exceptions of all the joined tasks in the caller thread.
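A group continuation can also aggregate the results. This sketch (modern C# with top-level statements; the ten staggered producers returning 1 to 10 are arbitrary) uses the generic ContinueWhenAll overload of the default factory to sum all values once every producer has finished, so no WaitAll call is made on the producers themselves.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Ten tasks, each producing one value (1..10) after a short, staggered delay.
Task<int>[] producers = Enumerable.Range(1, 10)
    .Select(i => Task.Factory.StartNew(() => { Thread.Sleep(10 * i); return i; }))
    .ToArray();

// One continuation for the whole group: it runs once all producers are done
// and receives the completed tasks, so reading Result here never blocks.
Task<int> total = Task.Factory.ContinueWhenAll(
    producers,
    done => done.Sum(t => t.Result));

Console.WriteLine("Total: {0}", total.Result); // the only blocking call
```

The final Result read is there only so the console program can print the total; in a real application, further work would be chained as another continuation instead.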