21. Iterating in Parallel

In Chapter 19, we mentioned how the cost of computation is falling, resulting in computers with faster CPUs, more CPUs, and more cores per CPU. Collectively, these trends make it increasingly economical to parallelize execution to take advantage of the increased computing power. In this chapter, we look at executing loops in parallel, which is one of the easiest ways to take advantage of this capability. Much of the chapter consists of Beginner and Advanced Topic blocks.

A mind map depicts the concepts of iterating in parallel covered in this chapter: (1) parallel loops, (2) exception handling, (3) iteration cancellation, and (4) Parallel LINQ.
Begin 4.0

Executing Loop Iterations in Parallel

Consider the for loop statement and associated code shown in Listing 21.1 and the corresponding results in Output 21.1. The listing calls a method for calculating a section of the decimal expansion of pi, where the parameters are the number of digits and the digit to start with. The actual calculation is not germane to the discussion. What is interesting about this calculation is that it is embarrassingly parallelizable; that is, it is remarkably easy to split up a large task—say, computing 1 million decimal digits of pi—into any desired number of smaller tasks that can all be run in parallel. These types of computations are the easiest ones to speed up by adding parallelism.

Listing 21.1: A for Loop Synchronously Calculating Pi in Sections

using System;
using AddisonWesley.Michaelis.EssentialCSharp.Shared;

class Program
{
  const int TotalDigits = 100;
  const int BatchSize = 10;

  static void Main()
  {
      string pi = "";
      const int iterations = TotalDigits / BatchSize;
      for(int i = 0; i < iterations; i++)
      {
          pi += PiCalculator.Calculate(
              BatchSize, i * BatchSize);
      }

      Console.WriteLine(pi);
  }
}
using System;

class PiCalculator
{
  public static string Calculate(
      int digits, int startingAt)
  {
     // ...
  }

   // ...
}

Output 21.1

>3.14159265358979323846264338327950288419716939937510582097494459230781640
62862089986280348253421170679821480865132823066470938446095505822317253594
08128481117450284102701938521105559644622948954930381964428810975665933446
12847564823378678316527120190914564856692346034861045432664821339360726024
91412737245870066063155881748815209209628292540917153643678925903600113305
30548820466521384146951941511609433057270365759591953092186117381932611793
10511854807446237996274956735188575272489122793818301194912

The for loop executes each iteration synchronously and sequentially. However, because the algorithm splits the pi calculation into independent pieces, it is not necessary to compute the pieces sequentially, provided the results are appended in the right order. Imagine what would happen if all the iterations of this loop could run concurrently: Each processor could take a single iteration and execute it in parallel with other processors executing other iterations. Given the simultaneous execution of iterations, we could decrease the execution time in proportion to the number of processors.

The Task Parallel Library (TPL) provides a convenient method, Parallel.For(), which does precisely that. Listing 21.2 shows how to modify the sequential, single-threaded program in Listing 21.1 to use the helper method.

Listing 21.2: For Loop Calculating Pi in Sections in Parallel

using System;
using System.Threading.Tasks;                                                       
using AddisonWesley.Michaelis.EssentialCSharp.Shared;

// ...

class Program
{
  static void Main()
  {
        string pi = "";
        const int iterations = TotalDigits / BatchSize;
        string[] sections = new string[iterations];
        Parallel.For(0, iterations, i =>                                              
        {                                                                             
            sections[i] = PiCalculator.Calculate(                                     
                BatchSize, i * BatchSize);                                            
        });                                                                           
        pi = string.Join("", sections);
        Console.WriteLine(pi);
    }
}

The output for Listing 21.2 is identical to Output 21.1; however, the execution time is significantly faster if you have multiple CPUs (and possibly slower if you do not). The Parallel.For() API is designed to look similar to a standard for loop. The first parameter is the fromInclusive value, the second is the toExclusive value, and the last is the Action<int> to perform as the loop body. When using an expression lambda for the action, the code looks similar to a for loop statement except that now each iteration may execute in parallel. As with the for loop, the call to Parallel.For() will not complete until all iterations are complete. In other words, by the time execution reaches the string.Join() statement, all sections of pi will have been calculated.

Note that the code for combining the various sections of pi no longer occurs inside the iteration (action) in Listing 21.2. Because sections of the pi calculation will very likely not complete sequentially, appending a section whenever an iteration completes would likely append them out of order. Even if sequencing were not a problem, there is still a potential race condition because the += operator in Listing 21.1 is not atomic. To address both problems, each section of pi is stored in an array, and no two iterations access the same element within the array. Only once all sections of pi are calculated does string.Join() combine them. In other words, we postpone concatenating the sections until after the Parallel.For() loop has completed. This avoids any race condition caused by sections not yet calculated or sections concatenating out of order.
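The hazard and its fix can be sketched in a few lines (a hypothetical minimal example, not part of the chapter's pi calculator; the shared-variable version loses updates because the increment is a non-atomic read-modify-write):

```csharp
using System;
using System.Threading.Tasks;

public class RaceConditionDemo
{
    public static void Main()
    {
        const int Iterations = 1_000_000;

        // Unsafe: all iterations mutate the same variable, so
        // concurrent read-modify-write operations can overwrite
        // one another and updates are lost.
        int sharedCount = 0;
        Parallel.For(0, Iterations, i => { sharedCount++; });

        // Safe: each iteration writes only to its own array slot;
        // no two iterations touch the same memory location.
        int[] slots = new int[Iterations];
        Parallel.For(0, Iterations, i => { slots[i] = 1; });

        int safeCount = 0;
        foreach (int slot in slots) safeCount += slot;

        Console.WriteLine($"shared: {sharedCount}, slots: {safeCount}");
        // safeCount is always 1,000,000; sharedCount usually is not.
    }
}
```

The slot-per-iteration pattern is the same one Listing 21.2 uses with sections[i].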

The TPL uses the same sorts of thread pooling techniques that it uses for task scheduling to ensure good performance of the parallel loop: For example, it will try to ensure that CPUs are not oversubscribed with more threads than they can efficiently run.
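If you do need to influence that scheduling—for example, to throttle a loop so it leaves CPUs free for other work—you can cap the concurrency with ParallelOptions (a sketch; the limit of 2 is an arbitrary illustrative value):

```csharp
using System;
using System.Threading.Tasks;

public class ThrottledLoopDemo
{
    public static void Main()
    {
        ParallelOptions options = new ParallelOptions
        {
            // Allow at most two iterations to run concurrently.
            MaxDegreeOfParallelism = 2
        };

        Parallel.For(0, 10, options, i =>
        {
            Console.WriteLine(
                $"Iteration {i} on thread "
                + $"{Environment.CurrentManagedThreadId}");
        });
    }
}
```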


The TPL also provides a similar parallel version of the foreach statement, as shown in Listing 21.3.

Listing 21.3: Parallel Execution of a foreach Loop

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;                                                     

class Program
{
  // ...
  static void EncryptFiles(
      string directoryPath, string searchPattern)
  {
      IEnumerable<string> files = Directory.EnumerateFiles(
          directoryPath, searchPattern,
          SearchOption.AllDirectories);

      Parallel.ForEach(files, fileName =>                                       
      {                                                                         
          Encrypt(fileName);                                                    
      });                                                                       
  }
  // ...
}

In this example, we call a method that encrypts each file within the files collection. It does so in parallel, using as many threads as the TPL determines to be efficient.
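Parallel.ForEach() also has overloads whose body receives a ParallelLoopState (you will see this parameter again in Listing 21.5), which allows an iteration to end the loop early. A minimal sketch with hypothetical data:

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public class EarlyExitDemo
{
    public static void Main()
    {
        int[] numbers = Enumerable.Range(0, 1000).ToArray();

        ParallelLoopResult result = Parallel.ForEach(
            numbers, (number, loopState) =>
            {
                if (number == 42)
                {
                    // Break() lets iterations with lower indices
                    // finish but stops scheduling higher ones.
                    loopState.Break();
                }
            });

        // IsCompleted is false because the loop was broken;
        // LowestBreakIteration reports the index where Break() ran.
        Console.WriteLine(
            $"Completed: {result.IsCompleted}, "
            + $"broke at: {result.LowestBreakIteration}");
    }
}
```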


Canceling a Parallel Loop

Unlike a task, which requires an explicit call if it is to block until it completes, a parallel loop executes iterations in parallel but does not itself return until the entire parallel loop completes. Canceling a parallel loop, therefore, generally involves the invocation of the cancellation request from a thread other than the one executing the parallel loop. In Listing 21.5, we invoke Parallel.ForEach<T>() using Task.Run(). In this manner, not only does the loop execute in parallel, but it also executes asynchronously, allowing the code to prompt the user to “Press any key to exit.”


Listing 21.5: Canceling a Parallel Loop

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public class Program
{
  // ...

  static void EncryptFiles(
      string directoryPath, string searchPattern)
  {

      string stars =
          "*".PadRight(Console.WindowWidth-1, '*');

      IEnumerable<string> files = Directory.GetFiles(
         directoryPath, searchPattern,
         SearchOption.AllDirectories);

      CancellationTokenSource cts =                                                   
          new CancellationTokenSource();                                              
      ParallelOptions parallelOptions =                                               
          new ParallelOptions                                                         
              { CancellationToken = cts.Token };                                      
      cts.Token.Register(                                                             
          () => Console.WriteLine("Canceling..."));                                   

      Console.WriteLine("Press any key to exit.");

      Task task = Task.Run(() =>
          {
              try
              {
                  Parallel.ForEach(
                      files, parallelOptions,
                      (fileName, loopState) =>
                          {
                              Encrypt(fileName);
                          });
              }
              catch(OperationCanceledException){}
          });

      // Wait for the user's input
      Console.Read();
      // Cancel the query
      cts.Cancel();                                                                  
      Console.Write(stars);
      task.Wait();
  }
}

The parallel loops use the same cancellation token pattern that tasks use. The token obtained from a CancellationTokenSource is associated with the parallel loop by calling an overload of the ForEach() method that has a parameter of type ParallelOptions. This object contains the cancellation token.

Note that if you cancel a parallel loop operation, any iterations that have not started yet are prevented from starting; the loop checks the token’s IsCancellationRequested property before dispatching them. Iterations already executing will run to their respective termination points. Furthermore, calling Cancel() even after all iterations have completed will still cause the registered cancel delegate (via cts.Token.Register()) to execute.

The only means by which the ForEach() method can signal that the loop has been canceled is by throwing an OperationCanceledException. Given that cancellation in this example is expected, the exception is caught and ignored, allowing the application to display “Canceling…”, followed by a line of stars, before exiting.


Running LINQ Queries in Parallel

Just as it is possible to execute a loop in parallel using Parallel.For(), so it is also possible to execute LINQ queries in parallel using the Parallel LINQ API (PLINQ, for short). An example of a simple nonparallel LINQ expression is shown in Listing 21.6; in Listing 21.7, we modify it to run in parallel.

Listing 21.6: LINQ Select()

using System.Collections.Generic;
using System.Linq;

class Cryptographer
{
  // ...
  public List<string>
    Encrypt(IEnumerable<string> data)
  {
      return data.Select(
          item => Encrypt(item)).ToList();
  }
  // ...
}

In Listing 21.6, a LINQ query uses the Select() standard query operator to encrypt each string within a sequence of strings and convert the resultant sequence to a list. This seems like an embarrassingly parallel operation; each encryption is likely to be a high-latency processor-bound operation that could be farmed out to a worker thread on another CPU.

Listing 21.7 shows how to modify Listing 21.6 so that the code that encrypts the strings is executed in parallel.


Listing 21.7: Parallel LINQ Select()

using System.Linq;

class Cryptographer
{
  // ...
  public List<string> Encrypt(IEnumerable<string> data)
  {
      return data.AsParallel().Select(
          item => Encrypt(item)).ToList();
  }
  // ...
}

As Listing 21.7 shows, the change to enable parallel support is extremely small! All it involves is the AsParallel() extension method, found on the static class System.Linq.ParallelEnumerable. This simple extension method tells the runtime that it can execute the query in parallel. The result is that on machines with multiple available CPUs, the total time taken to execute the query can be significantly shorter.

System.Linq.ParallelEnumerable, the engine that was introduced in Microsoft .NET Framework 4.0 to enable PLINQ, includes a superset of the query operators available on System.Linq.Enumerable. Thus, it provides the API that enables the possible performance improvements for all of the common query operators, including those used for sorting, filtering (Where()), projecting (Select()), joining, grouping, and aggregating. Listing 21.8 shows how to do a parallel sort.
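One operator worth knowing alongside these: because PLINQ is free to deliver results in any order, ParallelEnumerable also provides AsOrdered() to preserve the source ordering through a query (a sketch with hypothetical data; ordering preservation adds some buffering overhead):

```csharp
using System;
using System.Linq;

public class OrderedPlinqDemo
{
    public static void Main()
    {
        int[] data = Enumerable.Range(0, 100).ToArray();

        // AsOrdered() guarantees the output sequence matches the
        // input sequence even though elements are processed in
        // parallel.
        var squares = data
            .AsParallel()
            .AsOrdered()
            .Select(n => n * n)
            .ToList();

        Console.WriteLine(squares[10]); // 100
    }
}
```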

Listing 21.8: Parallel LINQ with Standard Query Operators

// ...
      OrderedParallelQuery<string> parallelGroups =
          data.AsParallel().OrderBy(item => item);

      // Show the total count of items still
      // matches the original count
      System.Diagnostics.Trace.Assert(
          data.Count == parallelGroups.Sum(
              item => item.Count()));
// ...

As Listing 21.8 shows, invoking the parallel version simply involves a call to the AsParallel() extension method. Notice that the type of the result returned by the parallel standard query operators is either ParallelQuery<T> or OrderedParallelQuery<T>; both inform the compiler that it should continue to use the parallel versions of the standard query operations that are available.

Given that query expressions are simply a syntactic sugar for the method call form of the query used in Listings 21.6 and 21.7, you can just as easily use AsParallel() with the expression form. Listing 21.9 shows an example of executing a grouping operation in parallel using query expression syntax.


Listing 21.9: Parallel LINQ with Query Expressions

// ...
      ParallelQuery<IGrouping<char, string>> parallelGroups;
      parallelGroups =
          from text in data.AsParallel()                                          
            orderby text                                                          
            group text by text[0];                                                

      // Show the total count of items still
      // matches the original count

      System.Diagnostics.Trace.Assert(
          data.Count == parallelGroups.Sum(
              item => item.Count()));
// ...

As you saw in the previous examples, converting a query or iteration loop to execute in parallel is simple. There is one significant caveat, however: As we will discuss in depth in Chapter 22, you must take care not to allow multiple threads to inappropriately access and modify the same memory simultaneously. Doing so will cause a race condition.

As we saw earlier in this chapter, the Parallel.For() and Parallel.ForEach<T> methods will gather up any exceptions thrown during the parallel iterations and then throw one aggregating exception containing all of the original exceptions. PLINQ operations are no different. That is, they also have the potential of returning multiple exceptions for exactly the same reason: When the query logic is run on each element in parallel, the code executing on each element can independently throw an exception. Unsurprisingly, PLINQ deals with this situation in exactly the same way as do parallel loops and the TPL: Exceptions thrown during parallel queries are accessible via the InnerExceptions property of the AggregateException. Therefore, wrapping a PLINQ query in a try/catch block with the exception type of System.AggregateException will successfully handle any exceptions within each iteration that were unhandled.
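A minimal sketch of that pattern (hypothetical data; depending on timing, PLINQ may surface one or several of the iteration failures in InnerExceptions):

```csharp
using System;
using System.Linq;

public class PlinqExceptionDemo
{
    public static void Main()
    {
        int[] data = { 1, 2, 0, 4, 0 };

        try
        {
            // Iterations over the zeros throw
            // DivideByZeroException in parallel.
            var quotients = data
                .AsParallel()
                .Select(n => 10 / n)
                .ToList();
        }
        catch (AggregateException exception)
        {
            // All unhandled iteration failures are collected here.
            foreach (Exception inner in exception.InnerExceptions)
            {
                Console.WriteLine(inner.Message);
            }
        }
    }
}
```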


Canceling a PLINQ Query

As expected, the cancellation request pattern is also available on PLINQ queries. Listing 21.10 (with Output 21.3) provides an example. Like the parallel loops, canceled PLINQ queries will throw a System.OperationCanceledException. Also like the parallel loops, executing a PLINQ query is a synchronous operation on the invoking thread. Thus, a common technique is to wrap the parallel query in a task that runs on another thread so that the current thread can cancel it if necessary—the same solution used in Listing 21.5.

Listing 21.10: Canceling a PLINQ Query

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public static class Program
{
  public static List<string> ParallelEncrypt(
      List<string> data,
      CancellationToken cancellationToken)
  {
      int governor = 0;
      return data.AsParallel().WithCancellation(
          cancellationToken).Select(
              (item) =>
              {
                  // Write a progress dot roughly every 100 items.
                  if (Interlocked.CompareExchange(
                      ref governor, 0, 100) % 100 == 0)
                  {
                      Console.Write('.');
                  }
                  Interlocked.Increment(ref governor);
                  return Encrypt(item);
              }).ToList();
  }

  public static async Task Main()
  {

      ConsoleColor originalColor = Console.ForegroundColor;
      List<string> data = Utility.GetData(100000).ToList();

      using CancellationTokenSource cts =
          new CancellationTokenSource();

      Task task = Task.Run(() =>
      {
          data = ParallelEncrypt(data, cts.Token);
      }, cts.Token);

      Console.WriteLine("Press any key to Exit.");
      Task<int> cancelTask = ConsoleReadAsync(cts.Token);

      try
      {
          Task.WaitAny(task, cancelTask);
          // Cancel whichever task has not finished.
          cts.Cancel();
          await task;

          Console.ForegroundColor = ConsoleColor.Green;
          Console.WriteLine(
              $"{Environment.NewLine}Completed successfully");

      }
      catch (OperationCanceledException taskCanceledException)
      {
          Console.ForegroundColor = ConsoleColor.Red;
          Console.WriteLine(
              $"{Environment.NewLine}Cancelled: "
              + $"{taskCanceledException.Message}");
      }
      finally
      {
          Console.ForegroundColor = originalColor;
      }
  }

  private static async Task<int> ConsoleReadAsync(
      CancellationToken cancellationToken = default)
  {
      return await Task.Run(async () =>
      {
          const int maxDelay = 1025;
          int delay = 0;
          while (!cancellationToken.IsCancellationRequested)
          {
              if (Console.KeyAvailable)
              {
                  return Console.Read();
              }
              else
              {
                  await Task.Delay(delay, cancellationToken);
                  if (delay < maxDelay) delay = delay * 2 + 1;
              }
          }
          cancellationToken.ThrowIfCancellationRequested();
          throw new InvalidOperationException(
              "Previous line should throw, preventing this "
              + "from ever executing");
      }, cancellationToken);
  }

  private static string Encrypt(string item)
  {

      Cryptographer cryptographer = new Cryptographer();
      return System.Text.Encoding.UTF8.GetString(
          cryptographer.Encrypt(item));
  }
}

Output 21.3

Press any key to Exit.
..........................................................................
..........................................................................
.............................................................
...................................
Cancelled: The query has been canceled via the token supplied to
WithCancellation.

As with a parallel loop or task, canceling a PLINQ query requires a CancellationToken, which is available from a CancellationTokenSource. However, rather than overloading every PLINQ query to support the cancellation token, the ParallelQuery<T> object returned by IEnumerable’s AsParallel() method includes a WithCancellation() extension method that simply takes a CancellationToken. As a result, calling Cancel() on the CancellationTokenSource object will request the parallel query to cancel—because it checks the IsCancellationRequested property on the CancellationToken.

As mentioned, canceling a PLINQ query will throw an exception in place of returning the complete result. One common technique for dealing with a possibly canceled PLINQ query is to wrap the query in a try block and catch the OperationCanceledException. A second common technique, used in Listing 21.10, is to pass the CancellationToken both to ParallelEncrypt() and as a second parameter on Task.Run(). With this approach, the await task expression throws a TaskCanceledException; had we used task.Wait() instead, it would throw an AggregateException whose InnerException property is set to a TaskCanceledException. Either exception can then be caught, just as you would catch any other exception from a parallel operation.

End 4.0

Summary

In this chapter, we discussed how to use the TPL Parallel class for both for and foreach type iterations. In addition, via the AsParallel() extension method included in System.Linq, we demonstrated how to execute LINQ queries in parallel. The ease with which this is accomplished should help you recognize that parallel iteration is one of the easiest ways to introduce parallel execution. Although it is still necessary to be aware of race conditions and deadlocks, these issues are far less likely to occur if you do not share data within the iterations of a parallel loop than if you work with tasks directly. All that remains is identifying the CPU-intensive code blocks that can benefit from parallel execution.
