Working with actors and the TPL Dataflow library

With Visual Studio 2010 and .NET 4.0, we were given the Task Parallel Library (TPL), which allowed us to process a known set of data or operations over multiple threads using constructs such as the Parallel.For loop.

Coinciding with the release of Visual Studio 2012, Microsoft provided the ability to take the data you have and process it in chunks through a series of steps, where each step can run independently of the others. This library is called the TPL Dataflow Library.

An interesting thing to note about this library is that it was originally included as part of the .NET Framework in the pre-release versions, but the team moved it to a NuGet distribution model so that changes and updates to the package could be made outside the normal .NET release cycle. A similar approach has been taken with the Managed Extensibility Framework (MEF) for web and Windows 8.x apps. This change to the distribution model shows Microsoft's willingness to change its practices so that it can be more responsive to developer needs.

From a terminology perspective, the processing steps (which the library itself calls blocks) are called actors, because they "act" on the data they are presented with, and the series of steps performed is typically referred to as a pipeline.

A fairly common example of this is image processing, where a set of images needs to be converted in some way, such as adding sepia tones, ensuring all images are in portrait mode, or performing facial recognition. Another scenario might be taking streaming data, such as sensor feeds, and processing it to determine which actions to take.

This recipe will show you how the library works. To do this, we will take some keyboard input and display it back on the screen after converting it to upper case and Base64-encoding it. If you would like to explore further after completing this recipe, you will find references to more information later in this recipe.
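To make the transformation concrete before any Dataflow code appears, here is the per-character work on its own: upper-case the character, take its ASCII bytes, and Base64-encode them. This is a minimal sketch; KeyEncoder and Encode are hypothetical names, and the method body mirrors what the recipe later puts inside its TransformBlock.

```csharp
using System;
using System.Text;

// Hypothetical helper: the recipe performs this same work inside a TransformBlock.
public static class KeyEncoder
{
    // Upper-cases one character, then Base64-encodes its ASCII bytes.
    // Characters outside ASCII are replaced with '?' by Encoding.ASCII.
    public static string Encode(char c)
    {
        string upper = c.ToString().ToUpperInvariant();
        byte[] bytes = Encoding.ASCII.GetBytes(upper);
        return Convert.ToBase64String(bytes);
    }

    public static void Main()
    {
        Console.WriteLine("a -> " + Encode('a'));   // prints a -> QQ==
        Console.WriteLine("b -> " + Encode('b'));   // prints b -> Qg==
        Console.WriteLine("z -> " + Encode('z'));   // prints z -> Wg==
    }
}
```

A single ASCII byte always encodes to two Base64 characters plus == padding, so every string the pipeline displays is four characters long.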

In order to do this, we will use an ActionBlock object and a TransformBlock object. An ActionBlock object is a target block that calls a user-provided delegate when it receives data, while a TransformBlock object is both a source and a target. In this recipe, you will use a TransformBlock object to convert characters to upper case and encode them, before passing them to an ActionBlock object that displays them on screen.
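The source and target roles map onto interfaces in the library: a TransformBlock implements both ITargetBlock<TInput> and ISourceBlock<TOutput>, while an ActionBlock implements only ITargetBlock<TInput>. The minimal sketch below (BlockRoles and RoundTrip are hypothetical names) posts a character to a TransformBlock through its target interface and reads the result back through its source interface with Receive(). It assumes the TPL Dataflow package is referenced.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

public static class BlockRoles
{
    public static string RoundTrip(char c)
    {
        var transformer = new TransformBlock<char, string>(ch => ch.ToString().ToUpperInvariant());

        // As a target, the block accepts input...
        ITargetBlock<char> asTarget = transformer;
        asTarget.Post(c);

        // ...and as a source, it offers the delegate's output; Receive() waits for it.
        ISourceBlock<string> asSource = transformer;
        return asSource.Receive();
    }

    public static void Main()
    {
        Console.WriteLine(RoundTrip('q'));   // prints Q

        // An ActionBlock is only a target: it can end a pipeline but cannot feed another block.
        ITargetBlock<string> printer = new ActionBlock<string>(s => Console.WriteLine(s));
        printer.Post("done");
        printer.Complete();
        printer.Completion.Wait();
    }
}
```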

Getting ready

Simply start a premium edition of VS2015 or use Visual Studio Community 2015, and you're ready to go.

How to do it...

Create a Dataflow-powered application using the following steps:

  1. Create a new application targeting .NET Framework 4.5 by navigating to Visual C# | Console Application and name it DataFlow.
  2. Using NuGet, add the TPL Dataflow Library to the project. The package name to use for installation when using the Package Manager Console is Microsoft.Tpl.Dataflow; otherwise, search for TPL on the Manage NuGet Packages for Solution dialog. (Refer to the Managing packages with NuGet recipe in Chapter 3, Web Development, if you need a refresher on how to do this.)
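  3. Open Program.cs and make sure the following using statements are at the top of the file (System and System.Text are needed for Console, Convert, and the ASCII encoding):
    using System;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;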
  4. In the Main() method of Program.cs, add the following code to define the ActionBlock object. Its delegate displays a string on the console and calls Thread.Sleep to simulate long-running work. This gives you a way to slow down processing and force data to be queued between the steps in the pipeline:
    var slowDisplay = new ActionBlock<string>(async s =>
      {
        // Simulate slow work: one second per message
        await Task.Run(() => Thread.Sleep(1000));
        Console.WriteLine(s);
      },
      // Allow up to four messages to be displayed at the same time
      new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }
    );
  5. In the Main() method again, continue by adding the code for the TransformBlock object. It takes a char as input and returns that character, upper-cased and Base64-encoded, as a string. The TransformBlock object is also linked to the ActionBlock object to create a two-step pipeline:
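    var transformer = new TransformBlock<char, string>(c =>
    {
      // Upper-case the character, then Base64-encode its ASCII bytes
      var upper = c.ToString().ToUpperInvariant();
      var bytes = Encoding.ASCII.GetBytes(upper);
      var output = Convert.ToBase64String(bytes);
      return output;
    });
    // Send everything the transformer produces to the display block
    transformer.LinkTo(slowDisplay);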
  6. Now add code to take input from the console, and pass it to the first step of the pipeline (the TransformBlock object in this case). You also need to close and flush the pipeline when you hit Enter so that you can exit the program:
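    while (true)
    {
      // Read one key press (ReadKey also echoes it to the console)
      var key = Console.ReadKey();
      if (key.Key == ConsoleKey.Enter)
      {
        // No more input: complete each block in turn and wait for it to drain
        transformer.Complete();
        Console.WriteLine("waiting for the queue to flush");
        transformer.Completion.Wait();
        slowDisplay.Complete();
        slowDisplay.Completion.Wait();
        Console.WriteLine("press any key");
        Console.ReadKey();
        break;
      }
      // Hand the character to the first step of the pipeline
      transformer.Post(key.KeyChar);
    }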
  7. Run the program. When the console window appears, press some random characters, and hit Enter when you are done. You should see output similar to the following screenshot. The encoded strings typically appear in batches of one to four, because MaxDegreeOfParallelism is set to 4; exactly how they group will depend on the number of CPU cores in your machine:
    [Screenshot: sample console output of the DataFlow program]
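Because the recipe reads key presses, it is awkward to run unattended. The following sketch assembles the same two-step pipeline as a non-interactive program, assuming a fixed input string in place of console input. It also swaps the Task.Run/Thread.Sleep pair for Task.Delay, so the simulated wait does not occupy a thread-pool thread. DataflowPipeline and Run are hypothetical names, and the TPL Dataflow package is assumed to be referenced.

```csharp
using System;
using System.Collections.Concurrent;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowPipeline
{
    // Runs the recipe's two-step pipeline over a fixed string and returns what the display step received.
    public static string[] Run(string input, int delayMs)
    {
        var displayed = new ConcurrentQueue<string>();

        var slowDisplay = new ActionBlock<string>(async s =>
            {
                await Task.Delay(delayMs);   // non-blocking stand-in for Thread.Sleep
                Console.WriteLine(s);
                displayed.Enqueue(s);
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        var transformer = new TransformBlock<char, string>(ch =>
            Convert.ToBase64String(Encoding.ASCII.GetBytes(ch.ToString().ToUpperInvariant())));

        transformer.LinkTo(slowDisplay);

        foreach (char c in input)
        {
            transformer.Post(c);
        }

        // Same shutdown order as the recipe: drain the first block, then the second.
        transformer.Complete();
        transformer.Completion.Wait();
        slowDisplay.Complete();
        slowDisplay.Completion.Wait();

        return displayed.ToArray();
    }

    public static void Main()
    {
        string[] results = Run("hello", 100);
        Console.WriteLine(results.Length + " strings displayed");
    }
}
```

Because up to four display calls run at once, the output order is not guaranteed to match the input order; for "hello" you should see SA==, RQ==, TA== (twice), and Tw== in some order.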

How it works...

Let's look at what just happened. First, you defined two actors, the first being the ActionBlock object that takes a string and displays it on screen, and the second, the TransformBlock object, which takes a character as input and returns an encoded string as output. You then linked the TransformBlock object to the ActionBlock object to create the pipeline for the data to flow through.

Next, you took data that was streaming to you (the console key presses) and passed each key press to the pipeline as soon as it arrived. This continued until the user hit Enter, at which point the Complete() method was called on each block to tell it to expect no more data, and Completion.Wait() waited for it to finish processing what it had already received. Once the queues were flushed, the user was prompted to press a key to close the program. (If you don't flush the queues, any data still in them is lost when the program exits, which is never a good thing.)
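The recipe completes and waits on each block by hand. The library can also forward completion along a link: with DataflowLinkOptions.PropagateCompletion set, completing the first block completes the next one once the first has handed over all its output, so you only wait on the last block. A minimal sketch, assuming the TPL Dataflow package is referenced (CompletionDemo and Run are hypothetical names):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks.Dataflow;

public static class CompletionDemo
{
    // Returns how many messages reached the final block.
    public static int Run(string input)
    {
        var seen = new ConcurrentQueue<string>();
        var display = new ActionBlock<string>(s => seen.Enqueue(s));
        var transformer = new TransformBlock<char, string>(ch => ch.ToString().ToUpperInvariant());

        // PropagateCompletion forwards Complete() (and faults) from transformer to display.
        transformer.LinkTo(display, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (char c in input)
        {
            transformer.Post(c);
        }

        transformer.Complete();      // no more input for the head of the pipeline
        display.Completion.Wait();   // returns once both blocks have drained
        return seen.Count;
    }

    public static void Main()
    {
        Console.WriteLine(Run("abc"));   // prints 3
    }
}
```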

Tip

You can watch the queue being flushed by typing a bunch of characters and then immediately pressing Enter. Depending on the speed of your machine, you will see the "waiting for the queue to flush" message scroll past, followed by the remaining encoded strings.

When you ran the program, the TransformBlock object did its work very quickly and passed its output to the ActionBlock object. The interesting thing to note is that even though data was queuing up to be processed by the ActionBlock object, you did not have to write any code to manage that queue. The TPL Dataflow Library takes care of the difficult plumbing: buffering messages between actors, scheduling their work on thread-pool threads, and running as many invocations at once as the options you set (such as MaxDegreeOfParallelism) allow.
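One way to see the concurrency limit being enforced is to count how many invocations of an ActionBlock's delegate are in flight at the same time. The sketch below is an illustration under stated assumptions (ParallelismDemo and PeakConcurrency are hypothetical names, the 50 ms delay is arbitrary, and the TPL Dataflow package is referenced). With an async delegate, an item counts toward MaxDegreeOfParallelism until its returned task completes, so the observed peak should never exceed the configured value.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ParallelismDemo
{
    // Returns the largest number of delegate invocations observed in flight at once.
    public static int PeakConcurrency(int items, int maxDegree)
    {
        int current = 0;
        int peak = 0;

        var block = new ActionBlock<int>(async item =>
            {
                int now = Interlocked.Increment(ref current);

                // Raise 'peak' to 'now' if it is higher, using a compare-and-swap loop.
                int seen;
                while (now > (seen = Volatile.Read(ref peak)) &&
                       Interlocked.CompareExchange(ref peak, now, seen) != seen)
                {
                }

                await Task.Delay(50);   // keep this invocation in flight for a while
                Interlocked.Decrement(ref current);
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDegree });

        for (int i = 0; i < items; i++)
        {
            block.Post(i);
        }

        block.Complete();
        block.Completion.Wait();
        return peak;
    }

    public static void Main()
    {
        Console.WriteLine("Peak concurrency: " + PeakConcurrency(20, 4));
    }
}
```

The exact peak depends on timing, but it is bounded by the maxDegree argument.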

There's more…

You may also be wondering what happens in less straightforward scenarios, such as when you want to pass data or messages to the next actor conditionally. Fortunately, the TPL Dataflow Library is quite powerful, and this recipe is just an introduction to what it offers. For example, an overload of the LinkTo() method accepts a predicate that filters messages, so you can decide which actor handles which messages.
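Here is a minimal routing sketch using the LinkTo(target, linkOptions, predicate) overload: letters go to one ActionBlock and everything else to another. It uses a BufferBlock as the source for brevity, and RoutingDemo and Route are hypothetical names. A source offers each message to its links in the order they were created, and the first target that accepts it wins. The unfiltered catch-all link matters: a message that no link accepts stays in the source and holds up the messages behind it.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks.Dataflow;

public static class RoutingDemo
{
    public static void Route(string input, ConcurrentQueue<char> letters, ConcurrentQueue<char> others)
    {
        var source = new BufferBlock<char>();
        var letterBlock = new ActionBlock<char>(ch => letters.Enqueue(ch));
        var otherBlock = new ActionBlock<char>(ch => others.Enqueue(ch));
        var options = new DataflowLinkOptions { PropagateCompletion = true };

        // Only letters pass this link's predicate.
        source.LinkTo(letterBlock, options, ch => char.IsLetter(ch));
        // Catch-all link for everything the first link declines.
        source.LinkTo(otherBlock, options);

        foreach (char c in input)
        {
            source.Post(c);
        }

        source.Complete();   // propagates to both targets
        letterBlock.Completion.Wait();
        otherBlock.Completion.Wait();
    }

    public static void Main()
    {
        var letters = new ConcurrentQueue<char>();
        var others = new ConcurrentQueue<char>();
        Route("ab1 c", letters, others);
        Console.WriteLine("letters: " + new string(letters.ToArray()) + ", others: " + others.Count);
    }
}
```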

You could also batch data for processing in the later steps by using a BatchBlock object, which groups incoming items into arrays of a fixed size and only passes each array on to subsequent pipeline steps once it is full. There are lots of possibilities, so feel free to go and explore what the library has to offer.
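A minimal batching sketch, assuming a batch size of three and the hypothetical names BatchingDemo and Batch: a BatchBlock<char> emits char[] arrays, which an ActionBlock<char[]> turns back into strings. When the BatchBlock is completed, any leftover items are emitted as a final, smaller batch, so no input is lost.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks.Dataflow;

public static class BatchingDemo
{
    public static string[] Batch(string input, int size)
    {
        var batches = new ConcurrentQueue<string>();
        var batcher = new BatchBlock<char>(size);
        var display = new ActionBlock<char[]>(chunk => batches.Enqueue(new string(chunk)));

        batcher.LinkTo(display, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (char c in input)
        {
            batcher.Post(c);
        }

        // Completing the batcher flushes the remaining items as a final, partial batch.
        batcher.Complete();
        display.Completion.Wait();
        return batches.ToArray();
    }

    public static void Main()
    {
        foreach (string b in Batch("abcdefg", 3))
        {
            Console.WriteLine(b);   // prints abc, def, then g
        }
    }
}
```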

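The eagle-eyed among you may also have noticed that the lambda function used by the ActionBlock object features the async keyword. This makes the compiler pick the ActionBlock constructor that takes a Func<string, Task>, so the block treats each message as processed only when the returned task completes, and MaxDegreeOfParallelism limits how many of those tasks are in flight at once. It is not what keeps the program responsive, however: Post() only queues the message and returns immediately, and the block runs its delegate on thread-pool threads, so the input loop would keep accepting key presses even with a synchronous lambda. Also note that awaiting Task.Run(() => Thread.Sleep(1000)) still ties up a thread-pool thread for each delay; await Task.Delay(1000) simulates the same wait without doing so.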
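To check when Post() returns, the sketch below makes every delegate wait on a gate (a TaskCompletionSource) that is only opened after all the Post() calls. If Post() ran the delegate on the calling thread, the loop could never get past the first call; instead, no item has finished when the loop ends, with either an async or a synchronous delegate. PostDemo and Run are hypothetical names, and the TPL Dataflow package is assumed to be referenced.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PostDemo
{
    // Returns { items finished when all Post() calls had returned, items finished at the end }.
    public static int[] Run(bool useAsyncDelegate)
    {
        var gate = new TaskCompletionSource<bool>();   // stays closed until SetResult below
        int processed = 0;

        ActionBlock<int> block = useAsyncDelegate
            ? new ActionBlock<int>(async item =>
              {
                  await gate.Task;                      // asynchronous wait: no thread is held
                  Interlocked.Increment(ref processed);
              })
            : new ActionBlock<int>(item =>
              {
                  gate.Task.Wait();                     // blocks a thread-pool thread, not the caller
                  Interlocked.Increment(ref processed);
              });

        for (int i = 0; i < 3; i++)
        {
            block.Post(i);                            // only queues the message
        }
        int finishedAfterPost = Volatile.Read(ref processed);

        gate.SetResult(true);                         // open the gate so the delegates can finish
        block.Complete();
        block.Completion.Wait();

        return new[] { finishedAfterPost, Volatile.Read(ref processed) };
    }

    public static void Main()
    {
        foreach (bool useAsync in new[] { true, false })
        {
            int[] r = Run(useAsync);
            Console.WriteLine("async=" + useAsync + ": " + r[0] + " done after Post(), " + r[1] + " done at the end");
        }
    }
}
```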

See also
