CHAPTER 8

Everything a Task

In Chapter 7 you discovered how the async and await keywords simplify composing and consuming Task-based asynchronous logic. In Chapter 3 we mentioned that a Task represents a piece of asynchronous activity. This activity could be compute, but it could just as easily be I/O. An example of a noncompute Task is the one you created when you turned an IAsyncResult into a Task using Task.Factory.FromAsync. If you could represent literally anything as a Task, then more areas of your code could take advantage of the async and await keywords. In this chapter you will discover a very simple API that achieves just this. Using this API, we will show you a series of common use cases, from an efficient version of WhenAny to stubbing out Task-based APIs for the purpose of unit testing.

Finally, it is worth mentioning that this API works on .NET 4 and is extremely useful even without async/await.

TaskCompletionSource<T>

TaskCompletionSource<T> has two responsibilities. One is to produce a Task<T> object; the other is to provide a series of methods that control the outcome of that Task. As mentioned in Chapter 3, a Task can finish in one of three states (RanToCompletion, Canceled, Faulted). Listing 8-1 shows a subset of the TaskCompletionSource<T> class.

Listing 8-1.  TaskCompletionSource<T>

public class TaskCompletionSource<TResult>
{
        . . .
 
        public Task<TResult> Task { get; }
 
        public void SetCanceled();
        public void SetException(Exception exception);
        public void SetResult(TResult result);
}

You have seen a similar API before: CancellationTokenSource, for controlling the cancellation process. Code that wishes to control the outcome of a Task holds on to the TaskCompletionSource and calls its SetXX methods. The TaskCompletionSource exposes the Task via its Task property, and this Task is handed to code wishing to observe the outcome. The type argument supplied to TaskCompletionSource<T> indicates the result type of the Task. If you wish to produce a Task as opposed to a Task<T>, then Microsoft’s advice is to use TaskCompletionSource<object>. Task<object> extends Task, so it can always be treated as just a Task.
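As a minimal sketch of the TaskCompletionSource<object> advice, the class below exposes a plain Task that completes when some one-off event has happened. The StartupGate type and its members are hypothetical names invented for this illustration; they are not part of the framework.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical type for illustration: a one-shot signal exposed as a plain Task.
public sealed class StartupGate
{
    // The result value is never used, so object (completed with null) is sufficient
    private readonly TaskCompletionSource<object> tcs =
        new TaskCompletionSource<object>();

    // Task<object> derives from Task, so it can be handed out as just a Task
    public Task Opened
    {
        get { return tcs.Task; }
    }

    // TrySetResult rather than SetResult, so opening the gate twice is harmless
    public void Open()
    {
        tcs.TrySetResult(null);
    }
}
```

Observers can await gate.Opened or attach a continuation to it, and they never see the TaskCompletionSource itself, so only the owner of the gate can complete the Task.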

Listing 8-2 shows a simple example of producing a Task<int> via a TaskCompletionSource<int>. The Task is not deemed to have completed until Enter is pressed and the Task’s outcome is set by the call to SetResult.

Listing 8-2.  Task<int> via TaskCompletionSource<int>

var tcs = new TaskCompletionSource<int>();
 
Task<int> syntheticTask = tcs.Task;
 
syntheticTask.ContinueWith(t => Console.WriteLine("Result {0}", t.Result));
 
Console.WriteLine("Press enter to complete the Task");
Console.ReadLine();
 
tcs.SetResult(42);
Console.ReadLine();
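Listing 8-2 exercises only SetResult. The following sketch, using a hypothetical CompletionStates helper, drives a TaskCompletionSource<int> through each of the three SetXX methods and reports the status the Task ends up in.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper for illustration only.
public static class CompletionStates
{
    // Completes a fresh TaskCompletionSource<int> using the supplied action
    // and returns the final status of the Task it produced.
    public static TaskStatus Complete(Action<TaskCompletionSource<int>> complete)
    {
        var tcs = new TaskCompletionSource<int>();
        complete(tcs);
        return tcs.Task.Status;
    }

    public static void PrintAll()
    {
        Console.WriteLine(Complete(tcs => tcs.SetResult(42)));
        Console.WriteLine(Complete(tcs => tcs.SetCanceled()));
        Console.WriteLine(Complete(
            tcs => tcs.SetException(new InvalidOperationException("boom"))));
    }
}
```

Calling CompletionStates.PrintAll() should print RanToCompletion, Canceled, and Faulted in that order, matching the three final states described earlier.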

Worked Example: Creating a Foreground Task

There are two types of threads in .NET: foreground and background. For a process to remain alive there must be at least one foreground thread. If there are no foreground threads running, the process will exit even if there are still background threads in operation. A Task that will keep the process alive must be mapped to a foreground thread. Tasks produced by TPL are always mapped onto background threads, so that no running Task will keep the process alive. Since TaskCompletionSource<T> allows the lifetime of anything to be represented as a Task, you could use it to represent the lifetime of a foreground thread. Listing 8-3 shows a method that creates such a Task.
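The difference between the two kinds of thread can be checked directly. The sketch below uses a hypothetical ThreadKinds helper to inspect Thread.IsBackground for a thread pool thread running a Task and for a thread created with new Thread.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration only.
public static class ThreadKinds
{
    // Reports whether the body of a Task.Run task executes on a background thread.
    public static bool TaskRunUsesBackgroundThread()
    {
        return Task.Run(() => Thread.CurrentThread.IsBackground).Result;
    }

    // Reports whether a thread created with new Thread(...) starts out as a
    // background thread. The thread is never started; only the flag is read.
    public static bool NewThreadIsBackgroundByDefault()
    {
        var thread = new Thread(() => { });
        return thread.IsBackground;
    }
}
```

The first method should return true and the second false, which is why a Task backed by an explicitly created thread can keep the process alive while a thread pool Task cannot.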

Listing 8-3.  Foreground Task

   static void Main(string[] args)
   {
      Task fgTask = CreateForegroundTask<int>(() =>
                        {
                            Console.WriteLine("Running..");
                            Thread.Sleep(2000);
                            return 42;
                        })
                        .ContinueWith(t => Console.WriteLine("Result is {0}", t.Result),
                                      TaskContinuationOptions.ExecuteSynchronously);
      Console.WriteLine("Main Completed");
   }
      
   public static Task<T> CreateForegroundTask<T>(Func<T> taskBody)
   {
      return CreateForegroundTask(taskBody, CancellationToken.None);
   }
 
   public static Task<T> CreateForegroundTask<T>(Func<T> taskBody,CancellationToken ct)
   {
     var tcs = new TaskCompletionSource<T>();
     // Create a foreground thread, and start it
     var fgThread = new Thread(() => ExecuteForegroundTaskBody(taskBody, ct, tcs));
     fgThread.Start();
 
     // Return a task that is bound to the life time of the foreground thread body
     return tcs.Task;
   }
 
   private static void ExecuteForegroundTaskBody<T>(Func<T> taskBody, CancellationToken ct,
                                                    TaskCompletionSource<T> tcs)
   {
      try
      {
        T result = taskBody();
        tcs.SetResult(result);
      }
      catch (OperationCanceledException cancelledException)
      {
        // If the Task body ended in an OperationCanceledException
        // and the Cancellation is associated with the cancellation token
        // for this Task, mark the Task as cancelled, otherwise just set the exception
        // on the Task.
        if (ct == cancelledException.CancellationToken)
        {
           tcs.SetCanceled();
        }
        else
        {
           tcs.SetException(cancelledException);
        }
       }
       catch (Exception error)
       {
         // Set the Task status to Faulted; observers see the error wrapped in an AggregateException
         tcs.SetException(error);
       }
   }

This same technique can be used for creating STA Tasks or Tasks bound to high-priority threads. TaskCompletionSource<T> is therefore a very powerful building block, allowing legacy code to be simply wrapped up and exposed as a Task. Once wrapped up as a Task, it can be managed using modern constructs such as async and await.
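One way to generalize the technique is to let the caller configure the thread before it starts. The following is a sketch only: DedicatedThreadTasks and its configure parameter are hypothetical names, and cancellation handling is left out to keep the example short.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration only.
public static class DedicatedThreadTasks
{
    // Runs taskBody on its own thread and exposes the outcome as a Task<T>.
    // The configure callback can adjust the thread before it is started.
    public static Task<T> Run<T>(Func<T> taskBody, Action<Thread> configure)
    {
        var tcs = new TaskCompletionSource<T>();
        var thread = new Thread(() =>
        {
            try
            {
                tcs.SetResult(taskBody());
            }
            catch (Exception error)
            {
                // Simplification: every failure, including cancellation, faults the Task
                tcs.SetException(error);
            }
        });

        configure(thread);
        thread.Start();
        return tcs.Task;
    }
}
```

A high-priority Task could then be requested with DedicatedThreadTasks.Run(() => 42, t => t.Priority = ThreadPriority.AboveNormal), and an STA Task on Windows with t => t.SetApartmentState(ApartmentState.STA).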

Unit Testing and Stubbing Asynchronous Methods

Writing unit tests for methods often requires us to stub out method calls to objects not under test. Consider writing a test for the WebPage class shown in Listing 8-4. The responsibility of this class is to load the document represented by the current Url property, and to make the content available via the Document property.

Listing 8-4.  WebPage

public class WebPage
{
  private readonly Func<string, string> pageLoader;
  public WebPage()
  {
    pageLoader = uri =>
    {
      using (var c = new WebClient())
      {
        return c.DownloadString(uri);
      }
     };
   }
 
   public WebPage(Func<string, string> pageLoader)
   {
     this.pageLoader = pageLoader;
   }
 
   private string url;
   public string Url
   {
     get { return url; }
     set
     {
       // Store the new value first so the loader fetches the new URL, not the previous one
       url = value;
       Document = pageLoader(url);
     }
   }
 
   public string Document { get; private set; }
 
}

Listing 8-5 shows a possible unit test for the Url property. You will have noticed that the WebPage class allows the injection of a method responsible for loading the requested document. When unit testing, you will need to provide a non-web-based implementation of this method (a stub).

Listing 8-5.  Synchronous Unit Test

[TestClass]
public class WebPageTests
{
   [TestMethod]
   public void Url_PropertyIsChanged_ShouldDownloadNewPageContent()
   {
     string expectedPageContent = "<html><i>Dummy content</i></html>";
     var sut = new WebPage(uri => expectedPageContent);
     sut.Url = "http://dummy.com";
     Assert.AreEqual(expectedPageContent, sut.Document);
   }
}

The test works perfectly well, but now say the developer of the WebPage class decides it would be better to load the page asynchronously. Listing 8-6 shows the refactored code.

Listing 8-6.  Asynchronous WebPage

public class WebPage
{
  private readonly Func<string, Task<string>> pageLoader;
 
  public WebPage()
  {
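    pageLoader = uri =>
    {
      // Dispose the client only once the download has finished; a using block
      // would dispose it as soon as the Task was returned
      var c = new WebClient();
      Task<string> download = c.DownloadStringTaskAsync(uri);
      download.ContinueWith(t => c.Dispose(), TaskContinuationOptions.ExecuteSynchronously);
      return download;
    };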
   }
 
   public WebPage(Func<string, Task<string>> pageLoader)
   {
     this.pageLoader = pageLoader;
   }
 
   private string url;
   public string Url
   {
       get { return url; }
       set
       {
         Document = null;
         // Store the new value first so the loader fetches the new URL
         url = value;

         pageLoader(url)
                    .ContinueWith(dt => Document = dt.Result);
        }
     }
 
     public string Document { get; private set; }
    }

To test this version of the class, you now need to supply a stub that returns a Task<string> as opposed to just simply a string. Listing 8-7 shows a possible implementation of a test.

Listing 8-7.  Task.Run-Based Test

[TestClass]
public class WebPageTests
{
  [TestMethod]
  public void Url_PropertyIsChanged_ShouldDownloadNewPageContent()
  {
    string expectedPageContent = "<html><i>Dummy content</i></html>";
    var sut = new WebPage(uri => Task.Run<string>(() => expectedPageContent));
    sut.Url = "http://dummy.com";
          
    Assert.AreEqual(expectedPageContent,sut.Document);
  }
}

The only difference between the two implementations of the test is that in the second, you have wrapped the expected result with a Task.Run. This ensures you comply with the requirements of the page load delegate. Although all the code compiles, the test will fail. Why? Well, hopefully by now you are all Asynchronous Gurus and have spotted the race condition. With the synchronous version, the Document property is set by the time the Url property setter returns, because the setter has already downloaded the content. With the asynchronous version this is not guaranteed. For the test to run successfully, the assertion phase can’t happen until the Document property has been modified. One possible workaround is to add an additional property to the WebPage class to expose the Task responsible for updating the document. The test method could then wait for that Task to complete before performing the assertion. Listing 8-8 shows the modifications required to the WebPage class.

Listing 8-8.  Exposing the DocumentLoadingTask

public Task DocumentLoadingTask { get; private set; }
 
private string url;
public string Url
{
  get { return url; }
  set
  {
    Document = null;
    // Store the new value first so the loader fetches the new URL
    url = value;
    DocumentLoadingTask =
                   pageLoader(url)
                      .ContinueWith(dt => Document = dt.Result);
  }
}

The test code is then modified to wait for the DocumentLoadingTask to complete before performing the assertion. This then allows the test to have deterministic behavior (Listing 8-9).

Listing 8-9.  Waiting for DocumentLoadingTask

public void Url_PropertyIsChanged_ShouldDownloadNewPageContent()
{
  string expectedPageContent = "<html><i>Dummy content</i></html>";
  var sut = new WebPage(uri => Task.Run<string>(() => expectedPageContent));
 
  sut.Url = "http://dummy.com";
  sut.DocumentLoadingTask.Wait();
            
  Assert.AreEqual(expectedPageContent,sut.Document);
}

Rerunning the test now results in it passing. However, the use of Task.Run does seem wrong. As the goal is for unit tests to be as lightweight as possible, scheduling work on a thread pool thread just to return a value immediately seems overkill. A far lighter-weight approach would be to use TaskCompletionSource to generate the asynchronous value. Listing 8-10 contains a helper class that can be used in tests to create asynchronous stubbed results.

Listing 8-10.  AsyncStubs

public static class AsyncStubs
{
   public static Task<T> FromResult<T>(T result)
   {
      var tcs = new TaskCompletionSource<T>();
      tcs.SetResult(result);
      return tcs.Task;
   }
 
   public static Task<T> FromException<T>(Exception e)
   {
      var tcs = new TaskCompletionSource<T>();
      tcs.SetException(e);
      return tcs.Task;
   }
}

Listing 8-11 shows the test method rewritten in terms of AsyncStubs.FromResult.

Listing 8-11.  TaskCompletionSource-Based Stubs

[TestMethod]
public void Url_PropertyIsChanged_ShouldDownloadNewPageContent()
{
  string expectedPageContent = "<html><i>Dummy content</i></html>";
  var sut = new WebPage(uri => AsyncStubs.FromResult(expectedPageContent));
  sut.Url = "http://dummy.com";
  sut.DocumentLoadingTask.Wait();
  Assert.AreEqual(expectedPageContent,sut.Document);
}
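The FromException half of the helper is useful for stubbing failures. As a sketch, the class below repeats the FromException method from Listing 8-10 so that it stands alone, and adds a hypothetical FailingLoader stub that could be passed to the WebPage constructor to simulate a failed download.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical example class; FromException mirrors AsyncStubs.FromException<T>.
public static class FailingStubExample
{
    public static Task<T> FromException<T>(Exception e)
    {
        var tcs = new TaskCompletionSource<T>();
        tcs.SetException(e);
        return tcs.Task;
    }

    // A page loader stub that always fails, without touching the network
    // or the thread pool.
    public static Task<string> FailingLoader(string uri)
    {
        return FromException<string>(
            new InvalidOperationException("No network in unit tests: " + uri));
    }
}
```

The Task returned by FailingLoader is already in the Faulted state, and its Exception property holds an AggregateException whose InnerException is the InvalidOperationException supplied by the stub.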

This technique has now found its way into .NET 4.5 with the introduction of the Task.FromResult<T> method, but unfortunately no FromException (a Task.FromException method did eventually follow in .NET Framework 4.6). In effect, what you have done for testing is to make the asynchronous operation complete almost synchronously. The only bit that is still asynchronous is the ContinueWith call, which sets the Document property. You could make it all complete synchronously if you modify the ContinueWith to execute synchronously, which allows the removal of the property DocumentLoadingTask. Making a ContinueWith execute synchronously results in either

  • The continuation Task running on the same thread that causes the antecedent Task to transition into its final state, or
  • Alternatively, if the antecedent is already complete when the continuation is created, the continuation will run on the thread creating the continuation.
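Both cases can be observed by comparing thread IDs. The sketch below uses a hypothetical SynchronousContinuations class and assumes the default TaskScheduler is in effect; ExecuteSynchronously is a request that the scheduler normally honors rather than an absolute guarantee.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical example class for illustration only.
public static class SynchronousContinuations
{
    // Case 1: the continuation is registered first, and then this thread
    // completes the antecedent by calling SetResult.
    public static bool RunsOnCompletingThread()
    {
        var tcs = new TaskCompletionSource<int>();
        int continuationThread = -1;
        Task continuation = tcs.Task.ContinueWith(
            t => { continuationThread = Thread.CurrentThread.ManagedThreadId; },
            TaskContinuationOptions.ExecuteSynchronously);

        tcs.SetResult(42);
        continuation.Wait();
        return continuationThread == Thread.CurrentThread.ManagedThreadId;
    }

    // Case 2: the antecedent is already complete when the continuation is created.
    public static bool RunsOnCreatingThreadWhenAlreadyComplete()
    {
        var tcs = new TaskCompletionSource<int>();
        tcs.SetResult(42);

        int continuationThread = -1;
        Task continuation = tcs.Task.ContinueWith(
            t => { continuationThread = Thread.CurrentThread.ManagedThreadId; },
            TaskContinuationOptions.ExecuteSynchronously);

        continuation.Wait();
        return continuationThread == Thread.CurrentThread.ManagedThreadId;
    }
}
```

Under the stated assumption, both methods are expected to return true, because in each case the continuation runs inline on the calling thread instead of being queued to the thread pool.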

Since all the continuation does is set the value of a property, it is perfectly acceptable to execute it synchronously. Listing 8-12 shows the modified WebPage class. The sut.DocumentLoadingTask.Wait() call can now be removed from the test method, as the Url set operation now executes synchronously in the context of unit testing, simplifying the test method.

Listing 8-12.  ExecuteSynchronously

public string Url
{
  get { return url; }
  set
  {
     Document = null;
     // Store the new value first so the loader fetches the new URL
     url = value;
     pageLoader(url)
                    .ContinueWith(dt => Document = dt.Result,
                    TaskContinuationOptions.ExecuteSynchronously);
   }
}

Building Task-Based Combinators

In Chapter 7 we demonstrated the use of WhenAny and WhenAll as a means of awaiting on many Tasks. We highlighted the fact that making repetitive calls to WhenAny for a smaller and smaller subset of Tasks was not very efficient, due to the fact that it was repeatedly creating continuations. The out-of-the-box WhenAll also potentially has a failing in that when waiting for many Tasks to complete, it may be desirable for the wait to terminate early if any of the Tasks fail or get canceled. You can address both these issues by making use of TaskCompletionSource<T>.

Improved WhenAny

First, remind yourself of the code you developed in Chapter 7. Listing 8-13 shows repeated calls to Task.WhenAny setting up many continuations for an ever-decreasing set of Tasks. Ideally you would like to set up just one set of continuations.

Listing 8-13.  Out of the Box WhenAny

public static async Task DownloadDocumentsWhenAny(params Uri[] downloads)
{
  List<Task<string>> tasks = new List<Task<string>>();
  foreach (Uri uri in downloads)
  {
    var client = new WebClient();
 
    tasks.Add(client.DownloadStringTaskAsync(uri));
  }
  while (tasks.Count > 0)
  {
    Task<string> download =
                 await Task.WhenAny(tasks);
 
    UpdateUI(download.Result);
    int nDownloadCompleted = tasks.IndexOf(download);
    tasks.RemoveAt(nDownloadCompleted);
  }
 }

Instead of returning a single Task from a Task.WhenAny style method, you could return an IEnumerable<Task<T>>, where each Task in the IEnumerable would complete in order. You could then simply loop over the set of Tasks, waiting on each of them in turn (Listing 8-14).

Listing 8-14.  Simplified WhenAny, WhenNext

public static class TaskCombinators
{
  public static IEnumerable<Task<T>> OrderByCompletion<T>(this IEnumerable<Task<T>> tasks)
  { . . . }
}
 
public static async Task DownloadDocumentsWhenAny(params Uri[] downloads)
{
  List<Task<string>> tasks = new List<Task<string>>();
  foreach (Uri uri in downloads)
  {
    var client = new WebClient();

    tasks.Add(client.DownloadStringTaskAsync(uri));
  }

  foreach (Task<string> downloadTask in tasks.OrderByCompletion())
  {
    // Awaiting a Task<string> yields the string itself
    string document = await downloadTask;

    UpdateUI(document);
  }
}

The problem is that you don’t know in what order the Tasks will complete; hence WhenAny. However, it would suffice to return not the actual Tasks but a series of proxy Tasks: one that represents the result of the first Task to complete, one for the second Task to complete, and so on. To achieve this you will create as many TaskCompletionSources as there are Tasks. Each real Task then has a single continuation registered. The responsibility of the first continuation to actually run is to set the outcome of the first TaskCompletionSource to the outcome of its antecedent Task; each subsequent continuation sets the outcome of the next TaskCompletionSource. Last, the method returns the Tasks for each of the TaskCompletionSources, to be consumed by the caller using a foreach. Listing 8-15 shows the implementation.

Listing 8-15.  OrderByCompletion

public static class TaskCombinators
{
  public static IEnumerable<Task<T>> OrderByCompletion<T>(this IEnumerable<Task<T>> tasks)
  {
   if (tasks == null) throw new ArgumentNullException("tasks");
 
   List<Task<T>> allTasks = tasks.ToList();
   if ( allTasks.Count == 0 ) throw new ArgumentException("Must have at least one task");
 
   var taskCompletionsSources = new TaskCompletionSource<T>[allTasks.Count];
 
   int nextCompletedTask = -1;
   for (int nTask = 0; nTask < allTasks.Count; nTask++)
   {
     taskCompletionsSources[nTask] = new TaskCompletionSource<T>();
     allTasks[nTask].ContinueWith(t =>
       {
         int taskToComplete = Interlocked.Increment(ref nextCompletedTask);
         switch (t.Status)
         {
            case TaskStatus.RanToCompletion:
              taskCompletionsSources[taskToComplete].SetResult(t.Result);
              break;
 
            case TaskStatus.Faulted:
              taskCompletionsSources[taskToComplete]
                   .SetException(t.Exception.InnerExceptions);
              break;
 
            case TaskStatus.Canceled:
              taskCompletionsSources[taskToComplete].SetCanceled();
              break;
          }
       } , TaskContinuationOptions.ExecuteSynchronously);
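    }

    // Hand back the proxy Tasks: the first completes first, the second next, and so on
    return taskCompletionsSources.Select(tcs => tcs.Task).ToArray();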
  }
}

There are a couple of things to point out in the implementation. Notice it is using a form of SetException that takes an IEnumerable<Exception>. You could have simply called SetException(t.Exception), but this would have resulted in the final AggregateException wrapping the AggregateException returned from the antecedent Task. This way you have reduced the levels of nesting. Last, for efficiency you have ensured the continuation runs on the same thread that completed the antecedent Task.
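The difference in nesting can be seen in isolation. The following sketch, built around a hypothetical ExceptionFlattening class, forwards the exception of a faulted antecedent in both ways and returns the first inner exception that an observer of the proxy Task would see.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical example class for illustration only.
public static class ExceptionFlattening
{
    // Produces an antecedent Task that has faulted with an InvalidOperationException
    private static Task<int> Faulted()
    {
        var tcs = new TaskCompletionSource<int>();
        tcs.SetException(new InvalidOperationException("boom"));
        return tcs.Task;
    }

    // Forwards the antecedent's AggregateException as a single exception,
    // so the proxy Task ends up with an AggregateException inside another.
    public static Exception ForwardWhole()
    {
        Task<int> antecedent = Faulted();
        var proxy = new TaskCompletionSource<int>();
        proxy.SetException(antecedent.Exception);
        return proxy.Task.Exception.InnerException;
    }

    // Forwards only the inner exceptions, which avoids the extra level.
    public static Exception ForwardInner()
    {
        Task<int> antecedent = Faulted();
        var proxy = new TaskCompletionSource<int>();
        proxy.SetException(antecedent.Exception.InnerExceptions);
        return proxy.Task.Exception.InnerException;
    }
}
```

ForwardWhole returns an AggregateException, whereas ForwardInner returns the original InvalidOperationException, which is the shape most callers expect to find.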

Alternative WhenAll, WhenAllOrFail

The method Task.WhenAll returns a Task that is deemed to have completed when all the supplied Tasks have completed. As mentioned previously, “completed” could mean Faulted, Canceled, or RanToCompletion. What if you want to stop waiting if any of the Tasks fail? The Task.WhenAll method will keep you unaware of the outcome of all the Tasks until the final one has completed. Listing 8-16 shows an implementation of WhenAllOrFail; the Task returned from this method will be signaled as complete when all the Tasks have reached the RanToCompletion state or any of them ends in a Faulted or Canceled state.

Listing 8-16.  WhenAllOrFail

public static Task<T[]> WhenAllOrFail<T>(IEnumerable<Task<T>> tasks)
{
   List<Task<T>> allTasks = tasks.ToList();
   if ( allTasks.Count == 0) throw new ArgumentException("No tasks to wait on");
 
   var tcs = new TaskCompletionSource<T[]>();
 
   int tasksCompletedCount = 0;
   Action<Task<T>> completedAction = t =>
   {
    if (t.IsFaulted)
    {
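      // Pass the inner exceptions to avoid nesting one AggregateException inside another
      tcs.TrySetException(t.Exception.InnerExceptions);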
      return;
    }
    if (t.IsCanceled)
    {
      tcs.TrySetCanceled();
      return;
    }
    if (Interlocked.Increment(ref tasksCompletedCount) == allTasks.Count)
    {
      tcs.SetResult(allTasks.Select(ct => ct.Result).ToArray());
    }
   };
 
   allTasks.ForEach(t => t.ContinueWith(completedAction));
 
   return tcs.Task;
}

You may have noticed the use of TrySetXX as opposed to just SetXX. The outcome of a Task can only be set once; if you attempt to call SetXX more than once against a TaskCompletionSource, the second call will result in an InvalidOperationException. When calling SetResult you can be confident that no SetXX method has previously been called, by virtue of the tasksCompletedCount variable. The same cannot be said when wishing to complete early due to Task exception or cancellation. Rather than use further synchronization, you can simply use the TrySetXX methods, which will silently ignore the set if the Task is already in a completed state.
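The set-once rule is easy to confirm with a small experiment. The SetOnce class below is a hypothetical name used purely for this sketch.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical example class for illustration only.
public static class SetOnce
{
    // Returns true if a second SetResult call throws InvalidOperationException.
    public static bool SecondSetThrows()
    {
        var tcs = new TaskCompletionSource<int>();
        tcs.SetResult(1);
        try
        {
            tcs.SetResult(2);
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    // Returns what TrySetResult reports when the Task has already completed.
    public static bool SecondTrySetSucceeds()
    {
        var tcs = new TaskCompletionSource<int>();
        tcs.SetResult(1);
        return tcs.TrySetResult(2);
    }
}
```

SecondSetThrows returns true, and SecondTrySetSucceeds returns false: the TrySetXX methods report the lost race through their return value instead of throwing, and the Task keeps its original result.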

Summary

In this chapter, you have seen that virtually anything can be represented as a Task. TaskCompletionSource<T> lets you build adapters for old-style asynchronous operations, allowing old-style APIs to be integrated easily into the new TPL-based programming model. In addition to adapting conventional asynchronous operations to TPL, you have seen how to create synchronization primitives as tasks, dispensing with the need for threads to block and instead simply yielding the thread to perform other, more useful Tasks. We have just touched on a couple of examples of TaskCompletionSource in this chapter; we hope it has given you an appetite to create some of your own.
