As already mentioned, data parallelism with TPL happens only when we deal with a dataset (not the DataSet class). Within .NET, the easiest way of doing this is with the Parallel.For and Parallel.ForEach methods of the Parallel class. The following example shows the basic usage of the Parallel class:

for (int i = 0; i < 10; i++)
{
    //do something
}

Parallel.For(0, 10, i =>
{
    //do something
});
The first thing that catches the eye is the singularity of the logic. While task parallelism deals with multiple blocks of logic, here there is always only one kind of logic; we simply execute it on multiple CPUs.
This example is obviously incompatible with the set theory exposed previously, because there is no collection of objects at all. In other words, the parallel For is an iterative construct, just like the normal for statement.
To parallelize some logic against a simple collection, the Parallel class gives us the ForEach method:

var invoiceIdList = new[] { 1, 2, 3, 4, 5 };

Parallel.ForEach(invoiceIdList, invoiceId =>
{
    //do something
});
This alternative, based on the Parallel.ForEach method, outclasses the very simple result achieved with Parallel.For, giving us the chance to work against a collection, that is, a set.
A collection of any size may be enumerated by the Parallel.ForEach method. Obviously, the best performance improvement is achieved with big collections, because the more items there are, the better the TPL engine can split those items across multiple threads.
Parallelism in .NET executes on the TPL framework. This means that threads from the ThreadPool are used to execute parallel jobs. The limitations and configuration of this behavior were exposed in the Task-based Asynchronous Pattern section in Chapter 4, Asynchronous Programming.
An added feature available in parallelism is the throttling configuration offered by the ParallelOptions class. The Parallel.Invoke/For/ForEach methods accept an instance of this class, giving us the ability to specify the maximum number of parallel executions. Here's an example:

//our throttling configuration
var throttling = new ParallelOptions
{
    MaxDegreeOfParallelism = 2,
};

//let's process the data in a parallel way
Parallel.ForEach(invoiceIdList, throttling, i =>
{
    //do something
});
Please bear in mind that TPL uses threads from the ThreadPool to execute a task's code. This is why tuning the ThreadPool is so important when using parallel programming extensively. In addition to what we saw in the Task-based Asynchronous Pattern section in Chapter 4, Asynchronous Programming, here we will show what happens if we increase the thread pool size to an extreme, for example:

ThreadPool.SetMinThreads(256, 256);
ThreadPool.SetMaxThreads(4096, 4096);
The configuration shown asks the thread pool to keep a minimum of 256 threads and allow a maximum of 4096 (losing the dynamic size management that is the default for the maximum size). Increasing the thread pool to such high values costs some CPU usage and memory, because the operating system needs those resources for thread creation.
Obviously, such high thread availability gives TPL the ability to parallelize hundreds of tasks (at least 256, as set earlier). However, increase global thread availability so extremely only with care, because of the increased possibility of cross-thread issues that we would need to handle with locks and signals, as seen in the Multithreading Synchronization section in Chapter 3, CLR Internals. In fact, at such an extreme concurrency level, when locks are used to serialize specific code blocks, a huge overhead in terms of CPU time occurs because of the contention on the lock flag that all concurrent threads try to obtain.
Within a parallel iteration, we cannot use the break keyword as in a classic for/foreach statement. If we need similar behavior, we can use an overload of the ForEach method that executes the inner parallel code through an Action<T, ParallelLoopState> delegate; in addition to the iterated item, a ParallelLoopState object is injected into the inner code. This state object provides information about the overall parallel state and lets us request a premature stop of the whole parallel execution. Here's a complete example:
static void Main(string[] args)
{
    //a big dataset
    var invoiceIdList = Enumerable.Range(1, 1000);
    int c = 0;
    Parallel.ForEach(invoiceIdList, (id, state) =>
    {
        //stop the whole ForEach execution if anything goes wrong
        try
        {
            //execute some logic
            ExecuteSomeCode(id);
            //within the lambda/method we can know about stop signaling
            if (state.IsStopped)
                Console.WriteLine("Executed # {0} when IsStopped was true", c++);
            else
                Console.WriteLine("Executed # {0}", c++);
        }
        catch (Exception ex)
        {
            Console.WriteLine("Error: {0}", ex.Message);
            //stop all parallel processing
            state.Stop();
            Console.WriteLine("Requested a parallel state break!");
        }
    });
    Console.WriteLine("END");
    Console.ReadLine();
}

private static readonly Random random = new Random();

private static void ExecuteSomeCode(int id)
{
    //a random execution time
    Thread.Sleep(random.Next(1000, 2000));
    //an unpredictable failure
    if (DateTime.Now.Millisecond >= 800)
        throw new Exception("Something gone wrong!");
}
In this example, we used the Stop method, which actually requests a stop of all subsequent parallel iterations and, at the same time, signals the running iterations that a stop has been requested through the IsStopped flag. This is the output of such an example (the results can vary a lot):
Executed # 0
Executed # 1
Executed # 2
Executed # 3
Executed # 5
Executed # 4
Executed # 6
Executed # 7
Executed # 8
Error: Something gone wrong!
Requested a parallel state break!
Executed # 9 when IsStopped was true
Executed # 10 when IsStopped was true
Error: Something gone wrong!
Requested a parallel state break!
Error: Something gone wrong!
Requested a parallel state break!
Error: Something gone wrong!
Requested a parallel state break!
Executed # 11 when IsStopped was true
Executed # 12 when IsStopped was true
Executed # 13 when IsStopped was true
Executed # 14 when IsStopped was true
END
As shown, after the initial normal execution of the parallel iterations (from #0 to #8), an error occurred; this invoked the Stop method of the ParallelLoopState class, which is available through the state parameter within the lambda code. This prevented new iterations of the Parallel.ForEach method from starting. Within the already executing iterations, the IsStopped flag was given a value of true, so (eventually) proper logic may be applied.
Similar to the Stop method, the Break method can also stop the execution of a parallel statement, but it stops only the items that follow the calling item in the underlying collection order. If we have a collection of integers from 1 to 100, and we call the Break method while processing the 14th item, only the items from 15 to 100 will be signaled to stop or will not run at all.
Any time we deal with data parallelism, TPL prepares the data to flow into different tasks in small groups. Such groups are called partitions.
Two default partitioning strategies are available within the .NET Framework. Range partitioning happens with any finite collection. It divides the collection among all available threads, and each partition is then executed on its related thread. The following shows an example of the Parallel.For method, which produces a finite indexed collection of values:

Parallel.For(1, 1000, item =>
{
    Console.WriteLine("Item {0} Task {1} Thread {2}", item,
        Task.CurrentId, Thread.CurrentThread.ManagedThreadId);
    Thread.Sleep(2000);
});
This code produces the following output:
Item 1 Task 1 Thread 8
Item 125 Task 2 Thread 9
Item 249 Task 3 Thread 11
Item 373 Task 4 Thread 10
Item 497 Task 5 Thread 12
Item 621 Task 6 Thread 16
Item 745 Task 7 Thread 14
Item 869 Task 8 Thread 15
Item 993 Task 9 Thread 13
As visible, the index value collection has been divided into 9 partitions: 8 is the initial ThreadPool size, plus one new thread that the ThreadPool created because of the huge amount of pending work. The same range partitioning logic is applied when using the AsParallel() method against any other finite collection such as an Array, ArrayList, List<T>, and so on.
Another built-in partitioning strategy is chunk partitioning, which takes place whenever we use the Parallel.ForEach method or the AsParallel() method against any enumerable without a finite length. This partitioning is based on enumerator logic: TPL simply asks for a few new items (usually as many as the number of CPU cores), creates a new task for this item group, puts the execution on an available thread, and then waits for a thread to become available again before starting over. In chunk partitioning, the chunk size is known at the start and handled entirely by TPL's inner logic.
Chunk partitioning has a better balancing capability than range partitioning because a chunk is often smaller than a partition.
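A minimal sketch of when chunk partitioning kicks in (the item count is arbitrary): because the source below is a plain IEnumerable<T> whose length TPL cannot know in advance, the workers fall back to pulling small chunks from the single shared enumerator instead of computing ranges up front:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class ChunkExample
{
    //a source whose length TPL cannot know in advance
    static IEnumerable<int> Produce()
    {
        for (int i = 0; i < 20; i++)
            yield return i;
    }

    public static void Main()
    {
        int count = 0;
        //chunk partitioning: worker tasks repeatedly grab small
        //groups of items from the single underlying enumerator
        Parallel.ForEach(Produce(), item =>
        {
            Console.WriteLine("Item {0} on thread {1}",
                item, Thread.CurrentThread.ManagedThreadId);
            Interlocked.Increment(ref count);
        });
        Console.WriteLine("Processed {0} items", count);  //always 20
    }
}
```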
If the built-in partitioning logic is not enough for our needs, we can create a custom partitioner by inheriting from the Partitioner<T> class. Custom partitioning logic can avoid using locks, greatly improve overall resource usage, and lead to better energy efficiency within the whole solution. A complete guide is available on the MSDN website: https://msdn.microsoft.com/en-us/library/dd997411(v=vs.110).aspx.
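Before subclassing Partitioner<T>, it is worth noting the built-in Partitioner.Create range factory, which already illustrates the lock-avoiding idea: each task receives a whole range of indices, so synchronization happens once per range instead of once per item. A minimal sketch (the range bounds and the sum are illustrative):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class RangePartitionerExample
{
    public static void Main()
    {
        long total = 0;
        //Partitioner.Create(from, to) yields Tuple<long, long> ranges;
        //each task sums its own range with a plain local loop
        Parallel.ForEach(Partitioner.Create(0L, 1000000L), range =>
        {
            long localSum = 0;
            for (long i = range.Item1; i < range.Item2; i++)
                localSum += i;
            //a single atomic add per range, not one lock per item
            Interlocked.Add(ref total, localSum);
        });
        Console.WriteLine("Sum: {0}", total);  //always 499999500000
    }
}
```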
Although chunk partitioning supports dynamic chunk sizes, the size is invariant during a single enumeration. If we need fully dynamic partitioning, we need to create a custom partitioner. An example is shown on the MSDN website:
https://msdn.microsoft.com/en-us/library/dd997416%28v=vs.110%29.aspx
Further details about partitioning are explained in the Partitioning optimization section later in this chapter.
An interesting behavior takes place when we combine sliding programming (such as using a cursor over a stream, or an enumerable) with parallel programming. In this scenario, we get high computation speed together with a very low memory footprint, because only the few pieces of data currently loaded in each thread use any memory. Here is an example:
static void Main(string[] args)
{
    var enumerable = GetEnumerableData();
    Parallel.ForEach(enumerable, new ParallelOptions
    {
        MaxDegreeOfParallelism = 2,
    }, i =>
    {
        //process the data
        Console.WriteLine("Processing {0}...", i);
        Thread.Sleep(2000);
    });
    Console.WriteLine("END");
    Console.ReadLine();
}

private static IEnumerable<int> GetEnumerableData()
{
    //let's produce an enumerable data source
    //eventually using an underlying stream
    for (int i = 0; i < 10; i++)
    {
        Console.WriteLine("Yielding {0}...", i);
        yield return i;
    }
}
This scenario gives tremendous computational power without having to keep all the data in memory at once; only the objects actually being processed reside in memory.
The previous example shows a single method using the yield keyword to enumerate values manually. The example may be improved by implementing multiple methods that use the yield keyword and invoke one another. The resulting architecture will be able to handle extremely complex logic without ever having to keep more data in memory than needed.
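A hedged sketch of such a chain (the stage names and the transformation are invented for illustration): each method pulls one item at a time from the previous one, so no stage ever buffers the whole dataset:

```csharp
using System;
using System.Collections.Generic;

public class ChainedIteratorsExample
{
    //first stage: produce raw values lazily (could wrap a stream)
    static IEnumerable<int> Read()
    {
        for (int i = 0; i < 10; i++)
            yield return i;
    }

    //second stage: transform, still lazily, one item at a time
    static IEnumerable<int> Square(IEnumerable<int> source)
    {
        foreach (var i in source)
            yield return i * i;
    }

    //third stage: filter; no stage ever buffers the whole dataset
    static IEnumerable<int> OnlyEven(IEnumerable<int> source)
    {
        foreach (var i in source)
            if (i % 2 == 0)
                yield return i;
    }

    public static void Main()
    {
        foreach (var v in OnlyEven(Square(Read())))
            Console.WriteLine(v);  //prints 0, 4, 16, 36, 64
    }
}
```

Feeding such a chain into Parallel.ForEach, as in the previous example, keeps the same low memory footprint while adding parallel processing.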