Data parallelism with TPL

As already said, data parallelism with TPL happens only when we deal with a dataset (not the DataSet class). Within .NET, the easiest way to achieve it is with the Parallel.For and Parallel.ForEach methods of the Parallel class. The following example shows the basic usage of the Parallel class, comparing a classic loop with its parallel counterpart:

//a classic sequential loop
for (int i = 0; i < 10; i++)
{
    //do something
}

//the same logic executed in parallel on multiple threads
Parallel.For(0, 10, i =>
{
    //do something
});

The first thing that catches our eye is the singularity of the logic. While in task parallelism we deal with multiple kinds of logic, here there is always only one kind of logic; we simply execute it on multiple CPUs.

This example obviously does not fit the set theory discussed earlier, because there is not even a simple collection of objects. In other words, Parallel.For is an iterative construct, just like the normal for statement.

To parallelize some logic against a simple collection, the Parallel class gives us the ForEach method:

var invoiceIdList = new[] { 1, 2, 3, 4, 5 };

Parallel.ForEach(invoiceIdList, invoiceId =>
    {
        //do something
    });

This Parallel.ForEach alternative goes beyond the very simple result achieved with Parallel.For, because it gives us the chance to work against a collection, which is close to a set.

Tip

Although a collection in .NET is not actually a set, it is quite similar. The only missing requirement is uniqueness: a collection does not guarantee that its items are unique.

A collection of any size may be enumerated by the Parallel.ForEach method. Obviously, the biggest performance improvement is achieved with large collections, because the more items there are, the more effectively the TPL engine can spread them across multiple threads.

Parallelism in .NET runs on top of the TPL framework. This means that threads from the ThreadPool are used to execute parallel jobs. The limitations and configuration of this behavior were covered in the Task-based Asynchronous Pattern section in Chapter 4, Asynchronous Programming.

An added feature available in parallelism is the throttling configuration offered by the ParallelOptions class. The Parallel.Invoke/For/ForEach methods accept an instance of this class, which lets us specify the maximum number of parallel executions. Here's an example:

//our throttling configuration
var throttling = new ParallelOptions
{
    MaxDegreeOfParallelism = 2
};

//let's process the data in a parallel way
Parallel.ForEach(invoiceIdList, throttling, i =>
{
    //do something
});

ThreadPool tuning

Please bear in mind that TPL uses threads from the ThreadPool to execute a task's code. This is why tuning the ThreadPool is so important when using parallel programming extensively. In addition to what we saw in the Task-based Asynchronous Pattern section in Chapter 4, Asynchronous Programming, here we will show what happens if we increase the thread pool size to an extreme, for example:

ThreadPool.SetMinThreads(256, 256);
ThreadPool.SetMaxThreads(4096, 4096);

The configuration shown asks the thread pool to keep a minimum of 256 threads and to grow up to a maximum of 4096 (losing the dynamic size management provided by the default configuration). Increasing the thread pool size to such high values costs CPU time and memory, because the operating system needs those resources to create the threads.
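
Before changing these values, it can be useful to read the current configuration back; here is a small sketch (the actual numbers depend on the machine and the runtime version):

int minWorkers, minIOCP, maxWorkers, maxIOCP;

//read the current (or default) thread pool configuration
ThreadPool.GetMinThreads(out minWorkers, out minIOCP);
ThreadPool.GetMaxThreads(out maxWorkers, out maxIOCP);

Console.WriteLine("Min: {0}/{1} - Max: {2}/{3}", minWorkers, minIOCP, maxWorkers, maxIOCP);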

Obviously, such high thread availability gives TPL the ability to parallelize hundreds of tasks (at least 256, as set earlier). Increase global thread availability so drastically only with great care, because of the increased possibility of cross-thread issues that we will need to handle with locks and signals, as seen in the Multithreading Synchronization section in Chapter 3, CLR Internals. In fact, with such an extreme concurrency level, using locks to serialize specific code blocks produces a huge overhead in terms of CPU time, because all the concurrent threads contend for the same lock flag.

Parallel execution abortion

Within a parallel iteration, we cannot use the break keyword as we would in a classic for/foreach statement. If we need a similar behavior, we can use an overload of the ForEach method that executes the inner parallel code through an Action<T, ParallelLoopState> delegate; in addition to the iterated item, it also injects a ParallelLoopState object that is available to the inner code. This state object provides information about the overall parallel state and lets us request a premature stop of the whole parallel execution. Here's a complete example:

static void Main(string[] args)
{
    //a big dataset
    var invoiceIdList = Enumerable.Range(1, 1000);

    int c = 0;

    Parallel.ForEach(invoiceIdList, (id, state) =>
        {
            //stop the whole ForEach execution if anything goes wrong
            try
            {
                //execute some logic
                ExecuteSomeCode(id);

                //within the lambda/method we can know about stop signalling
                if (state.IsStopped)
                    Console.WriteLine("Executed # {0} when IsStopped was true", c++);
                else
                    Console.WriteLine("Executed # {0}", c++);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Error: {0}", ex.Message);
                //stop all parallel processing
                state.Stop();
                Console.WriteLine("Requested a parallel state break!");
            }
        });

    Console.WriteLine("END");
    Console.ReadLine();
}

private static readonly Random random = new Random();
private static void ExecuteSomeCode(int id)
{
    //a random execution time
    Thread.Sleep(random.Next(1000, 2000));

    //an unpredictable failure
    if (DateTime.Now.Millisecond >= 800)
        throw new Exception("Something gone wrong!");
}

In this example, we used the Stop method, which requests a stop of all subsequent parallel iterations and, at the same time, signals the already running iterations that a stop has been requested through the IsStopped flag. This is the output of the example (the results can vary a lot):

Executed # 0
Executed # 1
Executed # 2
Executed # 3
Executed # 5
Executed # 4
Executed # 6
Executed # 7
Executed # 8
Error: Something gone wrong!
Requested a parallel state break!
Executed # 9 when IsStopped was true
Executed # 10 when IsStopped was true
Error: Something gone wrong!
Requested a parallel state break!
Error: Something gone wrong!
Requested a parallel state break!
Error: Something gone wrong!
Requested a parallel state break!
Executed # 11 when IsStopped was true
Executed # 12 when IsStopped was true
Executed # 13 when IsStopped was true
Executed # 14 when IsStopped was true
END

As shown, after the initial normal execution of the parallel statements (from #0 to #8), an error occurred; this invoked the Stop method of the ParallelLoopState class, which is available through the state parameter within the lambda code. This prevented new iterations of the Parallel.ForEach method from starting. Within the already executing iterations, the IsStopped flag is set to true, so (eventually) proper logic may be applied.

Similar to the Stop method, the Break method can also stop the execution of a parallel statement, but it stops only the items that follow the calling item in the underlying collection order.

If we have a collection of integers from 1 to 100, and we call the Break method while processing the 14th item, only the items from 15 to 100 are affected: they either observe the break request or do not run at all.
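
To make the difference concrete, here is a minimal sketch (the collection and the triggering value are arbitrary) that calls the Break method and then inspects the ParallelLoopResult returned by Parallel.ForEach:

var numbers = Enumerable.Range(1, 100);

ParallelLoopResult result = Parallel.ForEach(numbers, (n, state) =>
{
    //complete the items before this one and skip the following ones
    if (n == 14)
        state.Break();
});

//LowestBreakIteration reports the lowest iteration on which Break was called
Console.WriteLine("Completed: {0} - Lowest break iteration: {1}",
    result.IsCompleted, result.LowestBreakIteration);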

Partitions

Any time we deal with data parallelism, TPL prepares the data to flow into the different tasks in small groups. Such groups are called partitions.

Two default partitioning strategies are available within the .NET Framework. Range partitioning happens against any finite collection: it divides the collection among all the available threads, and each partition is then executed on its related thread. The following shows an example of the Parallel.For method, which produces a finite indexed collection of values:

Parallel.For(1, 1000, item =>
    {
        Console.WriteLine("Item {0} Task {1} Thread {2}", item, Task.CurrentId, Thread.CurrentThread.ManagedThreadId);
        Thread.Sleep(2000);
    });

This code produces the following output:

Item 1 Task 1 Thread 8
Item 125 Task 2 Thread 9
Item 249 Task 3 Thread 11
Item 373 Task 4 Thread 10
Item 497 Task 5 Thread 12
Item 621 Task 6 Thread 16
Item 745 Task 7 Thread 14
Item 869 Task 8 Thread 15
Item 993 Task 9 Thread 13

As visible, the index collection has been divided into 9 partitions: 8 is the initial ThreadPool size, plus one new thread that the ThreadPool created because of the huge amount of pending work. The same range partitioning logic is applied when using the AsParallel() method against any other finite collection, such as an Array, an ArrayList, a List<T>, and so on.
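
As a quick illustration of the same idea through PLINQ (the projection used here is arbitrary), AsParallel() splits a finite collection into ranges and processes them on the available threads:

//a minimal PLINQ sketch against a finite collection (range partitioning)
var data = Enumerable.Range(1, 1000).ToArray();

var squares = data.AsParallel()
    .Select(n => n * n)
    .ToArray();

Console.WriteLine("Processed {0} items", squares.Length);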

Another built-in partitioning strategy is chunk partitioning, which takes place whenever we use the Parallel.ForEach method or the AsParallel() method against any enumerable without a finite length. This partitioning is based on enumerator logic: it asks for some new items (usually as many as the number of CPU cores), creates a new task for this item group, puts the execution on an available thread, and then waits for a thread to become available again before repeating the process. In chunk partitioning, the chunk size is decided at the start and is handled entirely by the TPL's inner logic.

Chunk partitioning has better load-balancing capabilities than range partitioning, because a chunk is usually smaller than a range partition.
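
If we want chunk-style load balancing even on an indexable collection, which would otherwise be range partitioned, we can wrap it with the Partitioner.Create factory method; the following is a minimal sketch (the list content is arbitrary):

//a load-balancing (chunk-style) partitioner over a list
var invoiceIdList = Enumerable.Range(1, 1000).ToList();
var partitioner = Partitioner.Create(invoiceIdList, loadBalance: true);

Parallel.ForEach(partitioner, invoiceId =>
{
    //do something with the invoice ID
});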

If the built-in partitioning logic is not enough for our needs, we can create a custom partitioner by inheriting from the Partitioner<T> class. Custom partitioning logic can avoid using locks, greatly improve overall resource usage, and lead to better energy efficiency within the whole solution. A complete guide is available on the MSDN website: https://msdn.microsoft.com/en-us/library/dd997411(v=vs.110).aspx.

Although chunk partitioning supports different chunk sizes, the size is invariant during a single enumeration. If we need fully dynamic partitioning, we need to create a custom partitioner. An example is shown on the MSDN website:

https://msdn.microsoft.com/en-us/library/dd997416(v=vs.110).aspx
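
Short of writing a full custom partitioner, the Partitioner.Create factory can also hand items to the worker threads one at a time, with no chunk buffering, through the EnumerablePartitionerOptions.NoBuffering option (available since .NET 4.5). Here is a minimal sketch under that assumption:

//let items flow to worker threads one at a time, without chunk buffering
var source = Enumerable.Range(1, 1000);
var noBuffering = Partitioner.Create(source, EnumerablePartitionerOptions.NoBuffering);

Parallel.ForEach(noBuffering, item =>
{
    //do something with the item
});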

Further details about partitioning are explained in the Partitioning optimization section later in this chapter.

Sliding parallel programming

An interesting behavior takes place when we combine sliding programming (such as using a cursor over a stream or an enumerable) with parallel programming. In this scenario, we get high computation speed together with a very low memory footprint, because only the few pieces of data currently loaded by each thread use any memory. Here is an example:

static void Main(string[] args)
{
    var enumerable = GetEnumerableData();

    Parallel.ForEach(enumerable, new ParallelOptions
    {
        MaxDegreeOfParallelism = 2,
    }, i =>
        {
            //process the data
            Console.WriteLine("Processing {0}...", i);
            Thread.Sleep(2000);
        });

    Console.WriteLine("END");
    Console.ReadLine();
}

private static IEnumerable<int> GetEnumerableData()
{
    //let's produce an enumerable data source
    //it could possibly come from an underlying stream
    for (int i = 0; i < 10; i++)
    {
        Console.WriteLine("Yielding {0}...", i);
        yield return i;
    }
}

This scenario gives tremendous computational power without having to keep all the data in memory at once; only the objects actually being processed reside in memory.

The previous example shows a single method using the yield keyword to manually enumerate values. The example may be improved by implementing multiple methods that use the yield keyword and invoke one another. The resulting architecture can handle extremely complex logic without ever having to keep more data in memory than needed.
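
As a minimal sketch (the method names and the transformation are hypothetical), two chained iterator methods can feed Parallel.ForEach while still materializing one item at a time:

private static IEnumerable<int> GetRawData()
{
    //first stage: produce values one at a time
    for (int i = 0; i < 10; i++)
        yield return i;
}

private static IEnumerable<int> TransformData(IEnumerable<int> source)
{
    //second stage: transform each value as it flows through
    foreach (var item in source)
        yield return item * 10;
}

//only the items currently being processed are materialized in memory
Parallel.ForEach(TransformData(GetRawData()), i =>
{
    Console.WriteLine("Processing {0}...", i);
});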
