Data parallelism with PLINQ

PLINQ is the LINQ provider that executes queries on top of the TPL parallel framework. In .NET, adding parallelism to any LINQ-to-Objects query is straightforward: we simply append the AsParallel method at the root of the query to switch execution from the sequential LINQ engine to PLINQ, with full TPL support.

The following example executes two different Where conditions against an enumerable using PLINQ:

static void Main(string[] args)
{
    //a dataset
    var items = Enumerable.Range(1, 100);

    //multi-level in-memory where executed as data parallelism
    var processedInParallel = items.AsParallel()
        .Where(x => CheckIfAllowed1(x))
        .Where(x => CheckIfAllowed2(x))
        .ToArray();

    Console.ReadLine();
}

private static bool CheckIfAllowed1(int x)
{
    Console.WriteLine("Step 1 -> Checking {0}", x);
    //simulate some running time
    Thread.Sleep(2000);
    return x % 2 == 0;
}

private static bool CheckIfAllowed2(int x)
{
    Console.WriteLine("Step 2 -> Checking {0}", x);
    //simulate some running time
    Thread.Sleep(1000);
    return x % 3 == 0;
}

This is a partial result:

Step 1 -> Checking 1 //first chunk starts
Step 1 -> Checking 6
Step 1 -> Checking 3
Step 1 -> Checking 2
Step 1 -> Checking 4
Step 1 -> Checking 7
Step 1 -> Checking 8
Step 1 -> Checking 5
Step 1 -> Checking 9
Step 1 -> Checking 10
Step 2 -> Checking 4 //second chunk starts
Step 2 -> Checking 8
Step 2 -> Checking 2
Step 2 -> Checking 6
Step 1 -> Checking 12
Step 1 -> Checking 11
Step 1 -> Checking 13

Running the preceding example clearly shows how PLINQ applies chunk partitioning. The first chunk of items (10 items in this run) goes through Step 1 first. Shortly afterwards, a second chunk is executed: it contains the items from the first chunk that passed Step 1 and are now ready for Step 2, together with new items entering Step 1.

The AsParallel method can also be used without returning values, through the ForAll method, as shown in the following code example:

items.AsParallel().ForAll(i =>
{
    //do something
});
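
Because ForAll executes the final action directly on the worker threads, it also skips the merge step that result-producing operators such as ToArray require. As a minimal sketch (with a hypothetical even-number filter), here is ForAll used for side-effect-only work:

//side-effect-only processing: no result set is merged back to the caller
items.AsParallel()
    .Where(x => x % 2 == 0)
    .ForAll(x => Console.WriteLine("Processed {0}", x));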

After an AsParallel method invocation, the type of the enumerable changes to ParallelQuery<T>. This new type adds configuration options for parallelism, such as setting the degree of parallel concurrency or forcing parallelism itself, which matters because the TPL heuristics may decide not to parallelize the query at all.

Forcing parallelism (with the WithExecutionMode method) is useful when the engine does not recognize that parallelism could reduce execution time (latency). Whenever we use the AsParallel method, the engine makes a prediction of the expected reduction in execution time, and if the prediction is not favorable, it can decide not to use parallelism at all. Here is an example:

//multi-level in-memory where executed as data parallelism
var processedInParallel = items.AsParallel()
    .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
    .Where(x => CheckIfAllowed1(x))
    .Where(x => CheckIfAllowed2(x))
    .ToArray();

We can configure parallel concurrency by setting the maximum degree with the WithDegreeOfParallelism method. This method is useful both for limiting (throttling) the concurrency level and for raising it above the value chosen by the TPL heuristics. The maximum accepted value is 64 in .NET 4 and 512 in .NET 4.5+, while the default is the CPU core count. Here is an example:

var processedInParallel = items.AsParallel()
    .WithDegreeOfParallelism(100)
    .Where(x => CheckIfAllowed1(x))
    .Where(x => CheckIfAllowed2(x))
    .ToArray();

Another useful method of the ParallelQuery<T> class is WithMergeOptions, which lets us configure how the parallel engine buffers (or does not buffer) data from the parallel partitions before collecting the result. The ability to disable buffering entirely is particularly interesting: results become available to any enumerator consuming the parallel query as soon as they are produced, without waiting for all query items to be processed. The following shows an example of the parallel merge options:

items.AsParallel().WithMergeOptions(ParallelMergeOptions.NotBuffered)
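
As a minimal sketch of this streaming behavior (reusing the CheckIfAllowed1 filter from the earlier example), a foreach over a NotBuffered query starts receiving matches while other partitions are still running:

//with NotBuffered, results flow to the consumer as soon as they are produced
var streamed = items.AsParallel()
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
    .Where(x => CheckIfAllowed1(x));

foreach (var item in streamed)
    Console.WriteLine("Received {0}", item); //printed before the whole query completes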

Partitioning optimization

The framework gives us the ability to force a specific partitioning logic when the default one is not optimal for our needs. The default partitioning logic is chosen automatically according to the type of the data collection given as input through the AsParallel method. Here is an example:

//a dataset
var items = Enumerable.Range(1, 1000).ToArray();

//a customized partitioning logic
//range partitioning
var partitioner = Partitioner.Create<int>(items, false);

partitioner.AsParallel().ForAll(item =>
    {
        Console.WriteLine("Item {0} Task {1} Thread {2}", item, Task.CurrentId, Thread.CurrentThread.ManagedThreadId);
        Thread.Sleep(2000);
    });

The preceding example shows range partitioning within the AsParallel execution of PLINQ. Here, the output makes the partition size visible: in this run, the 1,000 items are split into eight contiguous ranges of 125 items each, so the partitions start at items 1, 126, 251, 376, 501, 626, 751, and 876:

Item 1 Task 2 Thread 6
Item 751 Task 8 Thread 13
Item 501 Task 6 Thread 16
Item 251 Task 4 Thread 12
Item 376 Task 5 Thread 15
Item 626 Task 7 Thread 14
Item 876 Task 9 Thread 9
Item 126 Task 3 Thread 10
Item 377 Task 5 Thread 15
Item 877 Task 9 Thread 9

Instead, the following example shows a load-balancing logic, obtained by passing true as the second parameter of the Partitioner.Create method:

//a dataset
var items = Enumerable.Range(1, 1000).ToArray();

//a customized partitioning logic
//a load-balancing logic
var partitioner = Partitioner.Create<int>(items, true);

partitioner.AsParallel().ForAll(item =>
    {
        Console.WriteLine("Item {0} Task {1} Thread {2}", item, Task.CurrentId, Thread.CurrentThread.ManagedThreadId);
        Thread.Sleep(2000);
    });

The following is the output:

Item 2 Task 2 Thread 10
Item 7 Task 6 Thread 11
Item 5 Task 5 Thread 15
Item 8 Task 8 Thread 16
Item 6 Task 7 Thread 13
Item 3 Task 4 Thread 12
Item 4 Task 3 Thread 17
Item 1 Task 9 Thread 9
Item 10 Task 7 Thread 13
Item 15 Task 5 Thread 15

When no built-in partitioning logic fits your needs, the only remaining choice is writing your own partitioner by extending the Partitioner<T> or OrderablePartitioner<T> class:

    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    class Program
    {
        static void Main(string[] args)
        {
            //a dataset
            var items = Enumerable.Range(1, 1000).ToArray();

            //my partitioner
            var partitioner = new MyChunkPartitioner(items);

            partitioner.AsParallel().ForAll(item =>
                {
                    Console.WriteLine("Item {0} Task {1} Thread {2}", item, Task.CurrentId, Thread.CurrentThread.ManagedThreadId);
                    Thread.Sleep(2000);
                });

            Console.ReadLine();
        }
    }

    //use for testing purposes only
    public class MyChunkPartitioner : Partitioner<int>
    {
        //underlying data collection
        public IEnumerable<int> Items { get; private set; }
        public MyChunkPartitioner(IEnumerable<int> items)
        {
            Items = items;
        }

        //creates one enumerator per requested partition
        public override IList<IEnumerator<int>> GetPartitions(int partitionCount)
        {
            var result = new List<IEnumerator<int>>();

            //compute the page size, rounding up so that no trailing
            //items are lost when the count does not divide evenly
            var pageSize = (Items.Count() + partitionCount - 1) / partitionCount;

            for (int page = 0; page < partitionCount; page++)
                result.Add(Items.Skip(page * pageSize).Take(pageSize).GetEnumerator());

            return result;
        }
    }

Keep in mind that with custom partitioning logic, we get to define how items are distributed into partitions (the partition size), but not the partition count, because that count is passed in from outside as the partitionCount parameter.
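
As a minimal sketch (reusing the MyChunkPartitioner class defined above), the partition count can still be influenced indirectly, because PLINQ derives it from the degree of parallelism of the query:

//WithDegreeOfParallelism(4) makes PLINQ request four partitions,
//so GetPartitions is invoked with partitionCount = 4
new MyChunkPartitioner(items).AsParallel()
    .WithDegreeOfParallelism(4)
    .ForAll(item => Console.WriteLine(item));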
