20. Programming the Task-Based Asynchronous Pattern

Begin 4.0
Begin 5.0

As we saw in Chapter 19, tasks provide an abstraction for the manipulation of asynchronous work. Tasks are automatically scheduled to the right number of threads, and large tasks can be composed by chaining together small tasks, just as large programs can be composed from multiple small methods.

However, there are some drawbacks to tasks. The principal difficulty with tasks is that they turn your program logic “inside out.” To illustrate this, we begin the chapter with a synchronous method that is blocked on an I/O-bound, high-latency operation—a web request. We then revise this method by leveraging the C# 5.0–introduced async/await contextual keywords, demonstrating a significant simplification in authoring and readability of asynchronous code.

A mind map depicts the topics discussed under programming the task-based asynchronous pattern. This includes introducing Asnc/Await, high-latency operations with TPL, async method return types, asynchronous streams, IAsyncDisposable, Async LINQ, and Async return types.

We finish the chapter with a look at asynchronous streams—a C# 8.0–introduced feature for defining and leveraging asynchronous iterators.

Synchronously Invoking a High-Latency Operation

In Listing 20.1, the code uses a WebClient to download a web page and search for the number of times some text appears. Output 20.1 shows the results.

5.0
4.0

Listing 20.1: A Synchronous Web Request

using System;
using System.IO;
using System.Net;

public class Program
{
  public const string DefaultUrl =
      "https://IntelliTect.com";

  public static void Main(string[] args)
  {

      if (args.Length == 0)
      {

          Console.WriteLine("ERROR: No findText argument specified.");
          return;
      }

      string findText = args[0];

      string url = DefaultUrl;
      if (args.Length > 1)
      {

          url = args[1];
          // Ignore additional parameters

      }
      Console.Write(
          $"Searching for '{findText}' at URL '{url}'.");

      Console.WriteLine("Downloading....");
      using WebClient webClient = new WebClient();
      byte[] downloadData =
          webClient.DownloadData(url);

      Console.WriteLine("Searching....");
      int textOccurrenceCount = CountOccurrences(
          downloadData, findText);

      Console.WriteLine(
          @$"{Environment.NewLine}'{findText}' appears {
              textOccurrenceCount} times at URL '{url}'.");
  }

  private static int CountOccurrences(byte[] downloadData, string findText)
  {
      int textOccurrenceCount = 0;

      using MemoryStream stream = new MemoryStream(downloadData);

      using StreamReader reader = new StreamReader(stream);
      int findIndex = 0;
      int length = 0;
      do
      {

          char[] data = new char[reader.BaseStream.Length];
          length = reader.Read(data);
          for (int i = 0; i < length; i++)
          {

              if (findText[findIndex] == data[i])
              {

                  findIndex++;
                  if (findIndex == findText.Length)
                  {

                      // Text was found
                      textOccurrenceCount++;
                      findIndex = 0;

                  }
              }
              else
              {
                  findIndex = 0;

              }
          }
      }
      while (length != 0);

      return textOccurrenceCount;
  }
}
5.0
4.0

Output 20.1

Searching for 'IntelliTect'...
http://www.IntelliTect.com
Downloading...
Searching...
'IntelliTect' appears 35 times at URL 'http://www.IntelliTect.com'.

The logic in Listing 20.1 is relatively straightforward—using common C# idioms. After determining the url and findText values, Main() invokes CountOccurrences(), instantiates a WebClient, and invokes the synchronous method DownloadData() to download the content. Given the downloaded data, it passes this data to CountOccurrences(), which loads it into a MemoryStream and leverages a StreamReader’s Read() method to retrieve a block of data and search it for the findText value. (We use the DownloadData() method rather than the simpler DownloadString() method so that we can demonstrate an additional asynchronous invocation when reading from a stream in Listings 20.2 and 20.3.)

The problem with this approach is, of course, that the calling thread is blocked until the I/O operation completes; this is wasting a thread that could be doing useful work while the operation executes. For this reason, we cannot, for example, execute any other code, such as code that asynchronously indicates progress. In other words, “Download…” and “Searching…” are invoked before the corresponding operation, not during the operation. While doing so would be irrelevant here, imagine that we wanted to concurrently execute additional work or, at a minimum, provide an animated busy indicator.

5.0

Asynchronously Invoking a High-Latency Operation Using the TPL

To address this problem of executing other work in parallel, Listing 20.2 takes a similar approach but instead uses task-based asynchrony with the TPL.

4.0

Listing 20.2: An Asynchronous Web Request

using System;
using System.IO;
using System.Net;
using System.Threading.Tasks;
using System.Runtime.ExceptionServices;

public class Program
{
  public const string DefaultUrl =
      "https://IntelliTect.com";

  public static void Main(string[] args)
  {
      if (args.Length == 0)
      {

          Console.WriteLine("ERROR: No findText argument specified.");
          return;
      }
      string findText = args[0];

      string url = DefaultUrl;
      if (args.Length > 1)
      {
          url = args[1];
          // Ignore additional parameters
      }
      Console.Write(
          $"Searching for '{findText}' at URL '{url}'.");

      using WebClient webClient = new WebClient();
      Console.Write("
Downloading...");
      Task task = webClient.DownloadDataTaskAsync(url)
          .ContinueWith(antecedent =>
          {
              byte[] downloadData = antecedent.Result;
              Console.Write("
Searching...");
              return CountOccurrencesAsync(
                  downloadData, findText);
          })
          .Unwrap()
          .ContinueWith(antecedent =>
          {
              int textOccurrenceCount = antecedent.Result;
              Console.WriteLine(
                  @$"{Environment.NewLine}'{findText}' appears {
                      textOccurrenceCount} times at URL '{url}'.");
          });
      try
      {
          while(!task.Wait(100))
          {
              Console.Write(".");
          }
      }
      catch(AggregateException exception)
      {
          exception = exception.Flatten();
          try
          {
              exception.Handle(innerException =>
              {
                  // Rethrowing rather than using
                  // if condition on the type
                  ExceptionDispatchInfo.Capture(
                      innerException)
                      .Throw();
                  return true;
              });
          }
          catch(WebException)
          {
              // ...
          }
          catch(IOException )
          {
              // ...
          }
          catch(NotSupportedException )
          {
              // ...
          }
      }
  }

  private static Task<int> CountOccurrencesAsync(
        byte[] downloadData, string findText)
  {
      // ...
  }
}
5.0
4.0

The associated output is essentially the same as Output 20.1, except that additional periods are expected following “Downloading…” and “Searching…”, depending on how long those operations take to execute.

5.0

When Listing 20.2 executes, it prints “Downloading…” with additional periods to the console while the page is downloading; much the same occurs with “Searching…”. The result is that instead of simply printing four periods (....) to the console, Listing 20.2 is able to continuously print periods for as long as it takes to download the file and search its text.

Unfortunately, this asynchrony comes at the cost of complexity. Interspersed throughout the code is TPL-related code that interrupts the flow. Rather than simply following the WebClient.DownloadDataTaskAsync(url) call with statements that count the occurrences (invocation of an asynchronous version of CountOccurrences()), the asynchronous version of the code requires ContinueWith() statements, Unwrap() invocations for simplicity, and a complicated try/catch handling system. The details are outlined in Advanced Topic: The Complexity of Asynchronous Requests with TPL. Suffice it to say, you will be grateful for C# 5.0’s introduction of the task-based asynchronous pattern with async/await.

4.0
5.0
4.0
Begin 7.0

The Task-Based Asynchronous Pattern with async and await

To address the complexity problem, Listing 20.3 also provides asynchronous execution but instead uses task-based asynchrony, which leverages an async/await feature introduced in C# 5.0. With async/await, the compiler takes care of the complexity, allowing the developer to focus on the business logic. Rather than worrying about chaining ContinueWith() statements, retrieving antecedents.Results, Unwrap() invocations, complex error handling, and the like, async/await allows you to just decorate the code with simple syntax that informs the compiler that it should handle the complexity. In addition, when a task completes and additional code needs to execute, the compiler automatically takes care of invoking the remaining code on the appropriate thread. You cannot have two different threads interacting with a single threaded UI platform, for example, and async/await addresses this issue. (See the section on the use of the async/await feature with the Windows UI later in the chapter.)

In other words, the async/await syntax tells the compiler to reorganize the code at compile time, even though it is written relatively simply, and to address the myriad of complexities that developers would otherwise have to consider explicitly—see Listing 20.3.

7.0
5.0
4.0

Listing 20.3: Asynchronous High-Latency Invocation with the Task-Based Asynchronous Pattern

using System;
using System.IO;
using System.Net;
using System.Threading.Tasks;

public class Program
{
  public const string DefaultUrl =
      "https://IntelliTect.com";

  public static async Task Main(string[] args)
  {
      if (args.Length == 0)
      {
          Console.WriteLine("ERROR: No findText argument specified.");
          return;
      }
      string findText = args[0];
      string url = DefaultUrl;
      if (args.Length > 1)
      {
          url = args[1];
          // Ignore additional parameters
      }
      Console.Write(
          $"Searching for '{findText}' at URL '{url}'.");

      using WebClient webClient = new WebClient();
      Task<byte[]> taskDownload =
          webClient.DownloadDataTaskAsync(url);

      Console.WriteLine("Downloading...");
      byte[] downloadData = await taskDownload;
      Task<int> taskSearch = CountOccurrencesAsync(
          downloadData, findText);

      Console.WriteLine("Searching...");
      int textOccurrenceCount = await taskSearch;

      Console.WriteLine(
          @$"{Environment.NewLine}'{findText}' appears {
              textOccurrenceCount} times at URL '{url}'.");
  }

  private static async Task<int> CountOccurrencesAsync(
      byte[] downloadData, string findText)
  {
      int textOccurrenceCount = 0;

      using MemoryStream stream = new MemoryStream(downloadData);
      using StreamReader reader = new StreamReader(stream);

      int findIndex = 0;
      int length = 0;
      do
      {

          char[] data = new char[reader.BaseStream.Length];
          length = await reader.ReadAsync(data);
          for (int i = 0; i < length; i++)
          {

              if (findText[findIndex] == data[i])
              {
                  findIndex++;
                  if (findIndex == findText.Length)
                  {
                      // Text was found
                      textOccurrenceCount++;
                      findIndex = 0;

                  }
              }
              else
              {
                  findIndex = 0;
              }
          }
      }
      while (length != 0);
      return textOccurrenceCount;
  }
}
7.0
5.0
4.0

Notice there are relatively small differences between Listing 20.1 and Listing 20.3. Furthermore, while the number of periods will vary between executions, the output should match that from Listing 20.2. This is one of the key points about the async/await pattern: It lets you write code with only minor modifications from the way you would write it synchronously.

Begin 8.0

To understand the pattern, focus first on the CountOccurrencesAsync() method and the differences from Listing 20.1. First, we change the signature of the CountOccurrences() method declaration by adding the new contextual async keyword as a modifier on the signature. Next, we return a Task<int> rather than just int. Any method decorated with the async keyword must return a valid async return type. This can be a void, Task, Task<T>, ValueTask<T> (as of C# 7.0), or IAsyncEnumerable<T>/IAsyncEnumerator<T> (as of C# 8.0).1 In this case, since the body of the method does not return any data but we still want the ability to return information about the asynchronous activity to the caller, CountOccurrencesAsync() returns Task. By returning a task, the caller has access to the state of the asynchronous invocation and the result (the int) when it completes. Next, the method name is suffixed with “Async” by convention, to indicate that it can be invoked with an await method. Finally, everywhere that we need to asynchronously wait for a task of an asynchronous method within CountOccurrencesAsync(), we include the await operator. In this case, this occurs only for the invocation of reader.ReadAsync(). Like CountOccurrencesAsync(), StreamReader.ReadAsync() is an async method with the same characteristics.

7.0

1. Technically, you can also return any type that implements a GetAwaiter() method. See Advanced Topic: Valid Async Return Types Expanded later in the chapter.

End 8.0
5.0

Back in Main(), the same sorts of differences occur. When we invoke the new CountOccurencesAsync() method, we do so with the await contextual keyword. In so doing, we can write code that is neutered of the explicit task complexities. When we add the await method, we can assign the result to an int (rather than a Task<int>) or assume there is no return if the invoked method returns a Task.

4.0

For emphasis, remember that the signature for CountOccurrencesAsync() is as follows:

private static async Task<int> CountOccurrencesAsync(
  byte[] downloadData, string findText)

Even though it returns a Task<int>, the await invocation of CountOccurrencesAsync() would return an int:

int textOccurrenceCount = await CountOccurrencesAsync(
  downloadData, findText);

This example shows some of the magic of the await feature: It unwraps the result from task and returns it directly.

If you wish to execute code while the asynchronous operation is executing, you can postpone the await invocation until the parallel work (writing to the console) has completed. The code prior to invoking CountOccurrencesAsync() invokes webClient.DownloadDataTaskAsync(url) similarly. Rather than assigning a byte[] and leveraging the await operator, the await invocation is also postponed while periods are written to the console in parallel.

using WebClient webClient = new WebClient();
Task<byte[]> taskDownload =
  webClient.DownloadDataTaskAsync(url);
while (!taskSearch.Wait(100)){Console.Write(".");}
byte[] downloadData = await taskDownload;

In addition to unwrapping the Task, the await operator will tell the C# compiler to generate code for ensuring the code following the await invocation will execute on the appropriate thread. This is a critical benefit that avoids what can be challenging defects to find.

End 7.0

To better explain the control flow, Figure 20.1 shows each task in a separate column along with the execution that occurs on each task.

There are a couple of important misconceptions that Figure 20.1 helps to dismiss:

5.0
4.0
  • Misconception #1: A method decorated with the async keyword is automatically executed on a worker thread when called. This is absolutely not true; the method is executed normally, on the calling thread, and if the implementation doesn’t await any incomplete awaitable tasks, it will complete synchronously on the same thread. The method’s implementation is responsible for starting any asynchronous work. Just using the async keyword does not change on which thread the method’s code executes. Also, there is nothing unusual about a call to an async method from the caller’s perspective; it is a method typed as returning one of the valid async return types and it is called normally. In Main(), for example, the return from CountOccurrencesAsync() is assigned a Task<int>, just like it would for a non-async method. We then await the task.

  • Misconception #2: The await keyword causes the current thread to block until the awaited task is completed. That is also absolutely not true. If you have no other choice except for the current thread to block until the task completes, call the Wait() method (while being careful to avoid a deadlock), as we have described in Chapter 19. The await keyword evaluates the expression that follows it, a valid async return type such as Task, Task<T>, or ValueTask<T>; adds a continuation to the resultant task; and then immediately returns control to the caller. The creation of the task has started asynchronous work; the await keyword means that the developer wishes the caller of this method to continue executing its work on this thread while the asynchronous work is processed. At some point after that asynchronous work is complete, execution will resume at the point of control following the await expression.

A control flow diagram involving main method, download data task async task, and read async tasks is shown.

Figure 20.1: Control flow within each task

5.0
4.0

In fact, the principal reasons why the async keyword exists in the first place are twofold. First, it makes it crystal clear to the reader of the code that the compiler will automatically rewrite the method that follows. Second, it informs the compiler that usages of the await contextual keyword in the method are to be treated as asynchronous control flow and not as an ordinary identifier.

5.0
Begin 7.1

Starting with C# 7.1, it is possible to have an async Main method. As a result, Listing 20.3’s Main signature is public static async Task Main(string[] args). This allows the user of the await operator to invoke the asynchronous methods. Without an async Main() method, we would have to rely on working with valid async return types explicitly, along with explicitly waiting for the task completion before exiting the program, to avoid unexpected behavior.

Parenthetically, both C# 5.0 and 6.0 included a restriction that await operators couldn’t appear within exception handling catch or finally statements. However, this restriction was removed starting with C# 7.0. This is a helpful improvement when you consider that you likely might want to log the exception from the outermost exception handler in the call stack—logging is a relatively expensive operation, such that doing so with an asynchronous await is desirable.

4.0

Introducing Asynchronous Return of ValueTask<T>

We use asynchronous methods for long-running, high-latency operations. And (obviously), since Task/Task<T> is the return, we always need to obtain an instance of one of these objects to return. The alternative, to return null, would force callers to always check for null before accessing the Task—an unreasonable and frustrating API from a usability perspective. Generally, the cost to create a Task/Task<T> is insignificant in comparison to the long-running, high-latency operation.

7.1

What happens, though, if the operation can be short-circuited and a result returned immediately? Consider, for example, compressing a buffer. If the amount of data is significant, performing the operation asynchronously makes sense. If, however, data is zero-length, then the operation can return immediately, and obtaining a (cached or new instance of) Task/Task<T> is pointless because there is no need for a task when the operation completes immediately. What is needed is a task-like object that can manage the asynchrony, but not require the expense of a full Task/Task<T> when it isn’t needed. While there was no such alternative when async/await was introduced in C# 5.0, C# 7.0 added support for additional valid async return types—that is, types that support a GetAwaiter() method, as detailed in Advanced Topic: Valid Async Return Types Expanded.

5.0

For example, C# 7.0–related .NET frameworks include ValueTask<T>, a value type that scales down to support lightweight instantiation when a long-running operation can be short-circuited or that can be converted to a full Task otherwise. Listing 20.4 provides an example of file compression but escaping via ValueTask<T> if the compression can be short-circuited.

4.0

Listing 20.4: Returning ValueTask<T> from an async Method

using System.IO;
using System.Text;
using System.Threading.Tasks;

public static class Program
{
      private static async ValueTask<byte[]> CompressAsync(byte[] buffer)
      {
          if (buffer.Length == 0)
          {
              return buffer;
          }
          using MemoryStream memoryStream = new MemoryStream();
          using System.IO.Compression.GZipStream gZipStream =
              new System.IO.Compression.GZipStream(
                  memoryStream,
                     System.IO.Compression.CompressionMode.Compress);
          await gZipStream.WriteAsync(buffer, 0, buffer.Length);
          return memoryStream.ToArray();
      }
  // ...
}
7.1

Notice that even though an asynchronous method, such as GZipStream.WriteAsync(), might return Task<T>, the await implementation still works within a ValueTask<T> returning method. In Listing 20.4, for example, changing the return from ValueTask<T> to Task<T> involves no other code changes.

5.0

The availability of ValueTask<T> raises the question of when to use it versus Task/Task<T>. If your operation doesn’t return a value, just use Task (there is no nongeneric ValueTask<T> because it has no benefit). Likewise, if your operation is likely to complete asynchronously, or if it’s not possible to cache tasks for common result values, Task<T> is preferred. For example, there’s generally no benefit to returning ValueTask<bool> instead of Task<bool>, because you can easily cache a Task<bool> for both true and false values—and in fact, the async infrastructure does this automatically. In other words, when returning an asynchronous Task<bool> method that completes synchronously, a cached result Task<bool> will return regardless. If, however, the operation is likely to complete synchronously and you can’t reasonably cache all common return values, ValueTask<T> might be appropriate.

4.0
5.0

Asynchronous Streams

C# 8.0 introduced the ability to program async streams, essentially the ability to leverage the async pattern with iterators. As discussed in Chapter 15, collections in C# are all built on the IEnumerable<T> and IEnumerator<T>, the former with a single GetEnumerator<T>() function that returns an IEnumerator<T> over which you can iterate. And, when building an iterator with yield return, the method needs to return IEnumerable<T> or IEnumerator<T>. In contrast, valid async return types must support a GetAwaiter() method,2 just as Task, Task<T>, and ValueTask<T> do. The conflict, therefore, is that you can’t have both an async method and an iterator. When invoking an async method while iterating over a collection, for example, you can’t yield the results to a calling function prior to the completion of all expected iterations.

2. Or void.

4.0

To address these problems, the C# team added asynchronous streams (async streams) support in C# 8.0. This feature is specifically designed to enable asynchronous iteration and the building of asynchronous collections and enumerable type methods using yield return.

For example, imagine encrypting content with an async method, EncryptFilesAsync(), given a directory (defaulting to the current directory). Listing 20.6 provides the code.

Listing 20.6: Async Streams

using System.IO;
using System.Linq;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Threading;
using System.Runtime.CompilerServices;
using AddisonWesley.Michaelis.EssentialCSharp.Shared;

public static class Program
{
  public static async void Main(params string[] args)
  {
      string directoryPath = Directory.GetCurrentDirectory();
      const string searchPattern = "*";

  // ...

      using Cryptographer cryptographer = new Cryptographer();

      IEnumerable<string> files = Directory.EnumerateFiles(
          directoryPath, searchPattern);

      // Create a cancellation token source to cancel
      // if the operation takes more than a minute.
      using CancellationTokenSource cancellationTokenSource =
          new CancellationTokenSource(1000*60);

      await foreach ((string fileName, string encryptedFileName)               
          in EncryptFilesAsync(files, cryptographer)                          
          .Zip(files.ToAsyncEnumerable())                                     
          .WithCancellation(cancellationTokenSource.Token))                   
      {
          Console.WriteLine($"{fileName}=>{encryptedFileName}");
      }

  }

  public static async IAsyncEnumerable<string> EncryptFilesAsync(               
      IEnumerable<string> files, Cryptographer cryptographer,                   
      [EnumeratorCancellation] CancellationToken cancellationToken = default)  
  {
      foreach (string fileName in files)
      {
          yield return await EncryptFileAsync(fileName, cryptographer);        
          cancellationToken.ThrowIfCancellationRequested();
      }
  }
  private static async Task<string> EncryptFileAsync(                     
      string fileName, Cryptographer cryptographer)                              

  {

      string encryptedFileName = $"{fileName}.encrypt";
      await using FileStream outputFileStream =                                    
          new FileStream(encryptedFileName, FileMode.Create);                      

      string data = await File.ReadAllTextAsync(fileName);                         
      await cryptographer.EncryptAsync(data, outputFileStream);                    
      return encryptedFileName;
  }
}
8.0
5.0
4.0
8.0

Listing 20.6 begins with a Main() method, inside of which there is a C# 8.0–introduced async foreach statement iterating over an asynchronous method, EncryptFilesAsync(). (We will address the WithCancellation() invocation shortly.) The EncryptFilesAsync() method iterates over each of the specified files with a foreach loop. Inside the foreach loop are two async method invocations. The first is a call to File.ReadAllTextAsync(), which reads in all the content of the file. Once the content is available in memory, the code invokes the EncryptAsync() method to encrypt it before returning the encrypted filename via a yield return statement. Thus, the method provides an example of the need to provide an asynchronous iterator to the caller. The key to making this possible is the EncryptFilesAsync()’s decoration with async and its return of IAsyncEnumerable<T> (where T is a string in this case).

5.0

Given a method returning IAsyncEnumerable<T>, you can consume it using an await foreach statement as demonstrated in the Main method of Listing 20.6. Thus, this listing both producing and consuming an async stream.

The signature for GetAsyncEnumerator() includes a CancellationToken parameter. Because the await foreach loop generates the code that calls GetAsyncEnumerator(), the way to inject a cancellation token and provide cancellation is via the WithCancellation() extension method (as Figure 20.2 shows, there’s no WithCancellation() method on IAsyncEnumerable<T> directly). To support cancellation in an async stream method, add an optional CancellationToken with an EnumeratorCancellationAttribute as demonstrated by the EncryptFilesAsunc method declaration:

4.0
static public async IAsyncEnumerable<string>
    EncryptFilesAsync(
        string directoryPath = null,
        string searchPattern = "*",
        [EnumeratorCancellation] CancellationToken
            cancellationToken = default)
{ ... }
The IAsync Enumerable class diagram is depicted.

Figure 20.2: IAsycnEnumerable<T> and related interfaces

8.0
5.0

In Listing 20.6, you provide an async stream method that returns the IAsyncEnumerable<T> interface. As with the non-async iterators, however, you can also implement the IAsyncEnumerable<T> interface with its GetAsyncEnumerator() method. Of course, any class implementing the interface can then be iterated over with an await foreach statement, as shown in Listing 20.7.

4.0

Listing 20.7: Async Streams Invocation with await foreach

class AsyncEncryptionCollection : IAsyncEnumerable<string?>
{
  public async IAsyncEnumerator<string> GetAsyncEnumerator(
      CancellationToken cancellationToken = default)
  {
      // ...
  }

  static public async void Main()
  {
      AsyncEncryptionCollection collection =
          new AsyncEncryptionCollection();
      // ...

      await foreach (string fileName in collection)
      {
          Console.WriteLine(fileName);
      }
  }
}
8.0

One point of caution: Remember that declaring an async method doesn’t automatically cause the execution to run in parallel. Just because the EncryptFilesAsync() method is asynchronous, that doesn’t mean iterating over each file and invoking File.ReadAllTextAsync() and Cryptographer.EncryptAsync() will happen in parallel. To guarantee that behavior, you need to leverage a Task invocation or something like a System.Threading.Tasks.Parallel.ForEach() (see Chapter 21).

5.0

The IAsyncEnumerable<T> interface, along with its partner IAsyncEnumerator<T>, are C# 8.0 additions shown in Figure 20.2 that match their synchronous equivalents. Note that both the IAsyncDisposable.DisposeAsync() and IAsyncEnumerator<T>.MoveNextAsync() methods are asynchronous versions of IEnumerators<T> equivalent methods. The Current property isn’t asynchronous. (Also, there’s no Reset() method in the asynchronous implementations.)

4.0

IAsyncDisposable and the await using Declaration and Statement

IAsyncDisposable is the asynchronous equivalent of IDisposable, so it can be invoked using C# 8.0’s new await using statement or await using declaration. In Listing 20.6, we use the latter when declaring outputFileStream because, like IAsyncEnumerable<T>, FileStream also implements IAsyncDisposable. As with the using declarative, you can’t reassign a variable declared with async using.

Not surprisingly, the await using statement follows the same syntax as the common using statement:

await using FileStream outputFileStream =
    new FileStream(encryptedFileName, FileMode.Create);
{ ... }

Both can be used anytime the type implements IAsyncDisposable or simply has a DisposeAsync() method. The result is that the C# compiler injects a try/finally block around the declaration and before the variable goes out of scope, and then invokes await DisposeAsync() within the finally block.3 This approach ensures that all resources are cleaned up.

3. Using await inside a finally block was added in C# 6.0.

8.0

Note that IAsyncDisposable and IDisposable are not related to each other via inheritance. In consequence, their implementations are not dependent, either: One can be implemented without the other.

Using LINQ with IAsyncEnumerable

In the await foreach statement of Listing 20.6, we invoke the LINQ AsyncEnumberable.Zip() method to pair the original filename with the encrypted filename.

await foreach (
    (string fileName, string encryptedFileName) in
        EncryptFilesAsync(files)
            .Zip(files.ToAsyncEnumerable()))
{
    Console.WriteLine($"{fileName}=>{encryptedFileName}");
}
5.0

AsyncEnumerable provides the LINQ functionality for IAsyncEnumerable<T>, as you might expect. However, the library is not available in the BCL. Instead,4 to access the asynchronous LINQ capabilities, you need to add a reference to the System.Linq.Async NuGet package.

4.0

4. At least at the time of this writing—per .NET Core 3.0 and 3.1.

AsyncEnumerable is defined in System.Linq (not a different unique namespace with async functionality). Not surprisingly, it includes asynchronous versions of the standard LINQ operators such as Where(), Select(), and the Zip() method used in the aforementioned listing. They are considered “asynchronous versions” because they are extension methods on IAsyncEnumerable rather than IEnumerable<T>. In addition, AsyncEnumerable includes a series of *Async(), *AwaitAsync(), and *AwaitWithCancellationAsync() methods. The Select*() versions of each of these methods are shown in Listing 20.8.

Listing 20.8: Signatures of AsyncEnumerable Select*()5 Methods

5. Excluding the SelectMany() category of methods.

namespace System.Linq
{
  public static class AsyncEnumerable
  {
      // ...
      public static IAsyncEnumerable<TResult> Select<TSource?, TResult?>(
          this IAsyncEnumerable<TSource> source,
          Func<TSource, TResult> selector);
      public static IAsyncEnumerable<TResult> SelectAwait<TSource?, TResult?>(
          this IAsyncEnumerable<TSource> source,
          Func<TSource, ValueTask<TResult>>? selector);
      public static
          IAsyncEnumerable<TResult> SelectAwaitWithCancellation<
              TSource?, TResult?>(
                  this IAsyncEnumerable<TSource> source,
                      Func<TSource, CancellationToken,
                          ValueTask<TResult>> selector);
      // ...
  }
}
8.0

The method name that matches the Enumerable equivalent—Select() in this case—has a similar “instance” signature, but the TResult and TSource are different. Both signatures with “Await” in the name take asynchronous signatures that include a selector that returns a ValueTask<T>. For example, you could invoke Listing 20.6’s EncryptFileAsync() method from SelectAwait() as follows:

5.0
IAsyncEnumerable<string> items = files.ToAsyncEnumerable();
items = items.SelectAwait(
  (text, id) => EncryptFileAsync(text));
4.0

The important thing to note is that EncyryptFileAsync() method returns a ValueTask<T>, which is what both *Await() and *AwaitWithCancellationAsync() require. The latter, of course, also allows for specification of a cancellation token.

Another asynchronous LINQ method worthy of mention is the ToAsyncEnumerable() method used in Listing 20.6. Since asynchronous LINQ methods work with IAsyncEnumerable<T> interfaces, ToAsyncEnumerable() takes care of converting IEnumerable<T> to IAsyncEnumerable<T>. Similarly, a ToEnumerable() method makes the opposite conversion. (Admittedly, using files.ToAsyncEnumerable() in the snippet is a contrived example for retrieving an IAsyncEnumerable<string>.)

The scalar versions of the asynchronous LINQ methods similarly match the IEnumerable<T>—with a *Await(), *AwaitAsync(), and *AwaitWithCancellation() set of members. The key difference is that they all return a ValueTask<T>. The following snippet provides an example of using the AverageAsync() method:

double average = await AsyncEnumerable.Range(
  0, 999).AverageAsync();

As such, we can use await to treat the return as a double rather than a ValueTask<double>.

End 8.0

Returning void from an Asynchronous Method

We've already looked at several valid async return types, including Task, Task<T>, ValueTask<T>, and now IAsyncEnumerable<T>, all of which support a GetAwaiter() method. There is one allowable return type (or non-type) that does not support the GetAwaiter(). This return option, which is available for an async method, is void—a method henceforth referred to as an async void method. In most cases, async void methods should be avoided and might more accurately be considered a non-option. Unlike when returning a GetAwaiter() supporting type, when there is a void return it is indeterminate when a method completes executing. If an exception occurs, returning void means there is no such container to report an exception. In such a case, any exception thrown on an async void method is likely to end up on the UI SynchronizationContext—effectively becoming an unhandled exception (see Advanced Topic: Dealing with Unhandled Exceptions on a Thread in Chapter 19).

5.0

If async void methods should be generally avoided, why are they allowed in the first place? It’s because async void methods can be used to enable async event handlers. As discussed in Chapter 14, an event should be declared as an EventHandler<T>, where EventHandler<T> has a signature of the following form:

4.0
void EventHandler<TEventArgs>(object sender, TEventArgs e)

Thus, to fit the convention of an event matching the EventHandler<T> signature, an async event needs to return void. One might suggest changing the convention, but (as discussed in Chapter 14) there could be multiple subscribers, and retrieving the return from multiple subscribers is nonintuitive and cumbersome. For this reason, the guideline is to avoid async void methods unless they are subscribers to an event handler—in which case they should not throw exceptions. Alternatively, you should provide a synchronization context to receive notifications of synchronization events such as the scheduling of work (e.g., Task.Run()) and, perhaps more important, unhandled exceptions. Listing 20.9 and the accompanying Output 20.2 provide an example of how to do this.

5.0
4.0

Listing 20.9: Catching an Exception from an async void Method

using System;
using System.Threading;
using System.Threading.Tasks;

public class AsyncSynchronizationContext : SynchronizationContext
{
  public Exception? Exception { get; set; }
  public ManualResetEventSlim ResetEvent { get;} = new ManualResetEventSlim();

  public override void Send(SendOrPostCallback callback, object? state)
  {

      try
      {
          Console.WriteLine($@"Send notification invoked...(Thread ID: {
              Thread.CurrentThread.ManagedThreadId})");
          callback(state);
      }
      catch (Exception exception)
      {
          Exception = exception;
#if !WithOutUsingResetEvent
          ResetEvent.Set();
#endif
      }
  }

  public override void Post(SendOrPostCallback callback, object? state)
  {
      try
      {
          Console.WriteLine($@"Post notification invoked...(Thread ID: {
              Thread.CurrentThread.ManagedThreadId})");
          callback(state);
      }
      catch (Exception exception)
      {
          Exception = exception;
#if !WithOutUsingResetEvent
          ResetEvent.Set();
#endif
      }
  }

}
public static class Program
{
  static bool EventTriggered { get; set; }

  public const string ExpectedExceptionMessage = "Expected Exception";
  public static void Main()
  {
      SynchronizationContext? originalSynchronizationContext =
          SynchronizationContext.Current;
      try
      {
          AsyncSynchronizationContext synchronizationContext =
              new AsyncSynchronizationContext();
          SynchronizationContext.SetSynchronizationContext(
              synchronizationContex t);

          await OnEvent(typeof(Program), EventArgs.Empty);

#if WithOutUsingResetEvent
          Task.Delay(1000).Wait();  //
#else
          synchronizationContext.ResetEvent.Wait();
#endif

          if(synchronizationContext.Exception != null)
          {
              Console.WriteLine($@"Throwing expected exception....(Thread
 ID: {
              Thread.CurrentThread.ManagedThreadId})");
              System.Runtime.ExceptionServices.ExceptionDispatchInfo.
Capture(
                  synchronizationContext.Exception).Throw();
          }
      }
      catch(Exception exception)
      {
          Console.WriteLine($@"{exception} thrown as expected.(Thread ID: {
              Thread.CurrentThread.ManagedThreadId})");
      }
      finally
      {
          SynchronizationContext.SetSynchronizationContext(
              originalSynchronizationContext);
      }
  }
  private static async void OnEvent(object sender, EventArgs eventArgs)
  {
      Console.WriteLine($@"Invoking Task.Run...(Thread ID: {
              Thread.CurrentThread.ManagedThreadId})");
      await Task.Run(()=>
      {
            {
          EventTriggered = true;
          Console.WriteLine($@"Running task... (Thread ID: {
              Thread.CurrentThread.ManagedThreadId})");
          throw new Exception(ExpectedExceptionMessage);
      });
  }
}
5.0
4.0

Output 20.2

Invoking Task.Run...(Thread ID: 8)
Running task... (Thread ID: 9)
Post notification invoked...(Thread ID: 8)
Post notification invoked...(Thread ID: 8)
Throwing expected exception....(Thread ID: 8)
System.Exception: Expected Exception
  at AddisonWesley.Michaelis.EssentialCSharp.Chapter20.Listing20_09.
Program.Main() in
...Listing20.09.AsyncVoidReturn.cs:line 80 thrown as expected.(Thread ID: 8)
5.0

The code in Listing 20.9 executes procedurally up until the await Task.Run() invocation within OnEvent() starts. Following its completion, control is passed to the Post() method within AsyncSynchronizationContext. After the execution and completion of the Post() invocation, the Console.WriteLine("throw Exception...") method executes, and then an exception is thrown. This exception is captured by the AsyncSynchronizationContext.Post() method and passed back into Main().

4.0

In this example, we use a Task.Delay() call to ensure the program doesn’t end before the Task.Run() invocation. In the real world, as shown in Chapter 22, a ManualResetEventSlim would be the preferred approach.

Begin 8.0
End 8.0
5.0

Asynchronous Lambdas and Local Functions

Just as a lambda expression converted to a delegate can be used as a concise syntax for declaring a normal method, so C# 5.0 (and later) also allows lambdas containing await expressions to be converted to delegates. To do so, just precede the lambda expression with the async keyword. In Listing 20.10, we first assign an async lambda to a Func<string, Task> writeWebRequestSizeAsync variable. We then use the await operator to invoke it.

4.0

Listing 20.10: An Asynchronous Client-Server Interaction as a Lambda Expression

using System;
using System.IO;
using System.Net;
using System.Linq;
using System.Threading.Tasks;

public class Program
{

  public static void Main(string[] args)
  {
      string url = "http://www.IntelliTect.com";
      if(args.Length > 0)
      {
          url = args[0];
      }

      Console.Write(url);

      Func<string, Task> writeWebRequestSizeAsync =
          async (string webRequestUrl) =>
          {
              // Error handling omitted for
              // elucidation
              WebRequest webRequest =
                 WebRequest.Create(url);

              WebResponse response =
                  await webRequest.GetResponseAsync();

              // Explicitly counting rather than invoking
              // webRequest.ContentLength to demonstrate
              // multiple await operators
              using(StreamReader reader =
                  new StreamReader(
                      response.GetResponseStream()))
              {
                  string text =
                      (await reader.ReadToEndAsync());
                  Console.WriteLine(
                      FormatBytes(text.Length));
              }
          };

      Task task = writeWebRequestSizeAsync(url);                      

      while (!task.Wait(100))
      {
          Console.Write(".");
      }
  }

  // ...

}
5.0
4.0
Begin 7.0

Similarly, the same effect can be achieved in C# 7.0 or later with a local function. For example, in Listing 20.10, you could change the lambda expression header (everything up to and including the => operator) to

async Task WriteWebRequestSizeAsync(string webRequestUrl)

leaving everything in the body, including the curly braces, unchanged.

Note that an async lambda expression has the same restrictions as the named async method:

  • An async lambda expression must be converted to a delegate whose return type is a valid async return type.

  • The lambda is rewritten so that return statements become signals that the task returned by the lambda has completed with the given result.

  • Execution within the lambda expression occurs synchronously until the first await on an incomplete awaitable is executed.

  • All instructions following the await will execute as continuations on the return from the invoked asynchronous method (or, if the awaitable is already complete, will be simply executed synchronously rather than as continuations).

  • An async lambda expression can be invoked with an await operator (not shown in Listing 20.10).

End 7.0
5.0
5.0
4.0

Wrapping your head around precisely what is happening in an async method can be difficult, but it is far less difficult than trying to figure out what asynchronous code written with explicit continuations in lambdas is doing. The key points to remember are as follows:

  • When control reaches an await keyword, the expression that follows it produces a task.6 Control then returns to the caller so that it can continue to do work while the task completes asynchronously (assuming it hadn’t already completed).

    6. Technically, it is an awaitable type, as described in the Advanced Topic: Valid Async Return Types Expanded.”

  • Sometime after the task completes, control resumes at the point following the await. If the awaited task produces a result, that result is then obtained. If it faulted, the exception is thrown.

  • A return statement in an async method causes the task associated with the method invocation to become completed; if the return statement has a value, the value returned becomes the result of the task.

5.0

Task Schedulers and the Synchronization Context

On occasion, this chapter has mentioned the task scheduler and its role in determining how to assign work to threads efficiently. Programmatically, the task scheduler is an instance of the System.Threading.Tasks.TaskScheduler. This class, by default, uses the thread pool to schedule tasks appropriately, determining how to safely and efficiently execute them—when to reuse them, dispose them, or create additional ones.

It is possible to create your own task scheduler that makes different choices about how to schedule tasks by deriving a new type from the TaskScheduler class. You can obtain a TaskScheduler that will schedule a task to the current thread (or, more precisely, to the synchronization context associated with the current thread), rather than to a different worker thread, by using the static FromCurrentSynchronizationContext() method.7

4.0

7. For an example, see Listing C.8 in Multithreading Patterns Prior to C# 5.0, available at https://IntelliTect.com/EssentialCSharp5.

The synchronization context under which a task executes and, in turn, the continuation task(s) execute(s), is important because the awaiting task consults the synchronization context (assuming there is one) so that a task can execute efficiently and safely. Listing 20.13 (along with Output 20.3) is similar to Listing 19.3, except that it also prints out the thread ID when it displays the message.

5.0

Listing 20.13: Calling Task.ContinueWith()

using System;
using System.Threading;
using System.Threading.Tasks;

public class Program
{
  public static void Main()
  {
      DisplayStatus("Before");
      Task taskA =
          Task.Run(() =>
               DisplayStatus("Starting..."))
          .ContinueWith( antecedent =>
               DisplayStatus("Continuing A..."));
      Task taskB = taskA.ContinueWith( antecedent =>
          DisplayStatus("Continuing B..."));
      Task taskC = taskA.ContinueWith( antecedent =>
          DisplayStatus("Continuing C..."));
      Task.WaitAll(taskB, taskC);
      DisplayStatus("Finished!");
  }

  private static void DisplayStatus(string message)
  {
      string text = string.Format(
              $@"{ Thread.CurrentThread.ManagedThreadId
                  }: { message }");
      Console.WriteLine(text);
  }
}
4.0

Output 20.3

1: Before
3: Starting...
4: Continuing A...
3: Continuing C...
4: Continuing B...
1: Finished!

What is noteworthy about this output is that the thread ID changes sometimes and gets repeated at other times. In this kind of plain console application, the synchronization context (accessible from SynchronizationContext.Current) is null—the default synchronization context causes the thread pool to handle thread allocation instead. This explains why the thread ID changes between tasks: Sometimes the thread pool determines that it is more efficient to use a new thread, and sometimes it decides that the best course of action is to reuse an existing thread.

Fortunately, the synchronization context gets set automatically for types of applications where that is critical. For example, if the code creating tasks is running in a thread created by ASP.NET, the thread will have a synchronization context of type AspNetSynchronizationContext associated with it. In contrast, if your code is running in a thread created in a Windows UI application—namely, Windows Presentation Foundation (WPF) or Windows Forms—the thread will have an instance of DispatcherSynchronizationContext or WindowsFormsSynchronizationContext, respectively. (For console applications and Windows Services, the thread will have an instance of the default SynchronizationContext.) Since the TPL consults the synchronization context, and that synchronization context varies depending on the circumstances of the execution, the TPL is able to schedule continuations executing in contexts that are both efficient and safe.

5.0

To modify the code so that the synchronization context is leveraged instead, you must (1) set the synchronization context and (2) use async/await to ensure that the synchronization context is consulted.8

8. For a simple example of how to set the synchronization context of a thread and how to use a task scheduler to schedule a task to that thread, see Listing C.8 in Multithreading Patterns Prior to C# 5.0, available at https://IntelliTect.com/EssentialCSharp.

4.0

It is possible to define custom synchronization contexts and to work with existing synchronization contexts to improve their performance in some specific scenarios. However, describing how to do so is beyond the scope of this text.

async/await with the Windows UI

One place where synchronization is especially important is in the UI context. With the Windows UI, for example, a message pump processes messages such as mouse click and move events. Furthermore, the UI is single-threaded, so that interaction with any UI components (e.g., a text box) must always occur from the single UI thread. One of the key advantages of the async/await pattern is that it leverages the synchronization context to ensure that continuation work—work that appears after the await statement—will always execute on the same synchronization task that invoked the await statement. This approach is of significant value because it eliminates the need to explicitly switch back to the UI thread to update a control.

To better appreciate this benefit, consider the example of a UI event for a button click in WPF, as shown in Listing 20.14.

5.0

Listing 20.14: Synchronous High-Latency Invocation in WPF

using System;
private void PingButton_Click(
  object sender, RoutedEventArgs e)
{
  StatusLabel.Content = "Pinging...";
  UpdateLayout();
  Ping ping = new Ping();
  PingReply pingReply =
      ping.Send("www.IntelliTect.com");
  StatusLabel.Text = pingReply.Status.ToString();
}
4.0

Given that StatusLabel is a WPF System.Windows.Controls.TextBlock control and we have updated the Content property twice within the PingButton_Click() event subscriber, it would be a reasonable assumption that first “Pinging…” would be displayed until Ping.Send() returned, and then the label would be updated with the status of the Send() reply. As those experienced with Windows UI frameworks well know, this is not, in fact, what happens. Rather, a message is posted to the Windows message pump to update the content with “Pinging…,” but because the UI thread is busy executing the PingButton_Click() method, the Windows message pump is not processed. By the time the UI thread is freed and can look at the Windows message pump, a second Text property update request has been queued and the only message that the user is able to observe is the final status.

To fix this problem using TAP, we change the code highlighted in Listing 20.15.

Listing 20.15: Synchronous High-Latency Invocation in WPF Using await

using System;
async private void PingButton_Click(
  object sender, RoutedEventArgs e)
{
  StatusLabel.Content = "Pinging...";
  UpdateLayout();
  Ping ping = new Ping();
  PingReply pingReply =
      await ping.SendPingAsync("www.IntelliTect.com");                            
  StatusLabel.Text = pingReply.Status.ToString();
}
5.0

This change offers two advantages. First, the asynchronous nature of the ping call frees up the caller thread to return to the Windows message pump caller’s synchronization context, and it processes the update to StatusLabel.Content so that “Pinging…” appears to the user. Second, when awaiting the completion of ping.SendTaskAsync(), it will always execute on the same synchronization context as the caller. Also, because the synchronization context is specifically appropriate for the Windows UI, it is single-threaded; thus, the return will always be to the same thread—the UI thread. In other words, rather than immediately executing the continuation task, the TPL consults the synchronization context, which instead posts a message regarding the continuation work to the message pump. Next, because the UI thread monitors the message pump, upon picking up the continuation work message, it invokes the code following the await call. (As a result, the invocation of the continuation code is on the same thread as the caller that processed the message pump.)

4.0

A key code readability feature is built into the TAP language pattern. Notice in Listing 20.15 that the call to return pingReply.Status appears to flow naturally after the await statement, providing a clear indication that it will execute immediately following the previous line. However, writing what really happens from scratch would be far less understandable for multiple reasons.

4.0
Begin 8.0

Summary

The bulk of this chapter focused on the C# 5.0–introduced task-based asynchronous pattern, which features the async/await syntax. It provided detailed examples demonstrating how much simpler it is to leverage TAP rather than to solely rely on TPL, especially when transforming code from a synchronous to an asynchronous implementation. In so doing, it detailed the requirements for the return type of an async method. In summary, the async/await feature makes programming complex workflows with Task objects much easier by automatically rewriting your programs to manage the continuation “wiring” that composes larger tasks out of smaller tasks.

Next, the chapter focused on async streams and the C# 8.0–introduced IAsyncEnumerable<T> data type. It considered how to leverage these capabilities to create asynchronous iterators and how to consume them with async foreach statements.

End 8.0
End 5.0
End 4.0

At this point, readers should have a firm foundation regarding how to write asynchronous code, except for parallel iterations (the topic of Chapter 21) and thread synchronization (which is covered in Chapter 22).

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.145.175.243