CHAPTER 12

Task Scheduling

You saw in Chapter 6 how, when creating a continuation, you can pass a scheduler on which to execute the task. The example in that chapter used the out-of-the-box SynchronizationContextTaskScheduler to push task execution onto the UI thread. It turns out, however, that there is nothing special about the SynchronizationContextTaskScheduler; the task scheduler is a pluggable component. .NET 4.5 introduced another specialized scheduler, and beyond that you can write task schedulers yourself. This chapter looks at the new scheduler introduced in .NET 4.5 and at how to write a custom task scheduler. Writing custom task schedulers can be fairly straightforward, but there are some issues that you need to be aware of.

ConcurrentExclusiveSchedulerPair

.NET 4.5 introduced a new scheduler—a pair of schedulers, in fact, that work together to simplify concurrency over shared mutable state. This pair of schedulers is called the ConcurrentExclusiveSchedulerPair. The idea is that one scheduler in the pair allows multiple read-based tasks to run concurrently while the other ensures that only a single read/write-based task can run at any one time. If you recall Chapter 4, you will see that the semantics are the same as a reader/writer lock. Consider the code in Listing 12-1, which models a small business. This implementation is not thread safe, as one thread can calculate NetWorth while another is processing a payment. If the payment thread has updated the income field but has not yet decremented the receivables, then an incorrect NetWorth is produced.

Listing 12-1.  Non-Thread-Safe SmallBusiness Class

public class SmallBusiness
{
  private decimal income;
  private decimal receivables;
 
  public virtual decimal NetWorth
  {
   get { return income + receivables; }
  }
  public virtual void RaisedInvoiceFor(decimal amount)
   {
      receivables += amount;
   }
 
   public virtual void ReceivePayments(decimal payment)
   {
     income += payment;
     receivables -= payment;
   }
 }

This thread safety problem can be fixed using a monitor or a reader/writer lock. Assuming NetWorth is called very often, it is more efficient to use a reader/writer lock. Listing 12-2 shows a class, derived from SmallBusiness, that implements the necessary thread safety measures using ReaderWriterLockSlim. The SmallBusinessRWLock class is now able to have multiple threads reading its NetWorth, but only one thread has access when an update is taking place by calling either RaisedInvoiceFor or ReceivePayments.

Listing 12-2.  Reader/Writer Lock, Thread-Safe SmallBusinessRWLock Class

public class SmallBusinessRWLock : SmallBusiness
{
   private ReaderWriterLockSlim rwLock = new ReaderWriterLockSlim();
 
   public override decimal NetWorth
   {
     get
     {
       rwLock.EnterReadLock();
       try{return base.NetWorth;}
       finally { rwLock.ExitReadLock(); }
     }
    }
    public override void RaisedInvoiceFor(decimal amount)
    {
      rwLock.EnterWriteLock();
      try { base.RaisedInvoiceFor(amount);}
      finally{ rwLock.ExitWriteLock();}
    }
    public override void ReceivePayments(decimal payment)
    {
      rwLock.EnterWriteLock();
      try { base.ReceivePayments(payment);}
      finally{rwLock.ExitWriteLock();}
    }
 }

However, rather than blocking, another way to achieve thread safety would be to schedule the update or read operation to occur in the future when it is safe to do so—in other words, asynchronously. A scheduler could safely allow many read-style operations to occur concurrently, and when a write operation is requested, it could ensure it is only executed when it is the only operation running. The code shown in Listing 12-3 provides such an implementation. Each instance of the SmallBusinessAsync class has its own scheduler, an instance of ConcurrentExclusiveSchedulerPair. This class, as the name suggests, provides two schedulers: one for exclusive accesses, the other for concurrent. When a request is made against the SmallBusinessAsync class, the operation is executed asynchronously using the appropriate scheduler. If the operation is only reading state it will use the concurrent scheduler, and if it is updating state it will use the exclusive scheduler. The concurrent scheduler will run its tasks concurrently, but only if the exclusive scheduler is not running a task. The exclusive scheduler ensures that it only runs one task at a time and that the concurrent scheduler is not running tasks.

Listing 12-3.  Thread-Safe Asynchronous SmallBusinessAsync Class

public class SmallBusinessAsync
{
  private decimal income;
  private decimal receivables;
        
  private ConcurrentExclusiveSchedulerPair rwScheduler =
                        new ConcurrentExclusiveSchedulerPair();
  public Task<decimal> NetWorthAsync
  {
     get
      {
         return Task.Factory.StartNew<decimal>( () => income + receivables,
               CancellationToken.None,
               TaskCreationOptions.None,
                rwScheduler.ConcurrentScheduler);
            }
       }
 
       public Task RaisedInvoiceForAsync(decimal amount)
       {
            return Task.Factory.StartNew(() => receivables += amount,
                CancellationToken.None,
                TaskCreationOptions.None,
                rwScheduler.ExclusiveScheduler);
        }
 
        public Task ReceivePaymentsAsync(decimal payment)
        {
            return Task.Factory.StartNew(() =>
            {
                income += payment;
                receivables -= payment;
            }, CancellationToken.None,
              TaskCreationOptions.None,
              rwScheduler.ExclusiveScheduler);
        }
    }

Using the ConcurrentExclusiveSchedulerPair scheduler gives you an alternative mechanism to blocking synchronization primitives, which for asynchronous operations provides a more natural and efficient thread safety model.
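The difference between the two schedulers in the pair can be observed directly. The following is a minimal sketch, not part of the SmallBusinessAsync class: the SchedulerPairDemo class and its MeasureMaxConcurrency helper are hypothetical names introduced for illustration. The helper runs a batch of tasks on a given scheduler and records how many were ever executing at the same moment.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SchedulerPairDemo
{
    // Hypothetical helper: runs taskCount tasks on the given scheduler and
    // returns the highest number observed executing at the same moment.
    public static int MeasureMaxConcurrency(TaskScheduler scheduler, int taskCount)
    {
        int current = 0;
        int max = 0;

        Task[] work = Enumerable.Range(0, taskCount).Select(_ =>
            Task.Factory.StartNew(() =>
            {
                int now = Interlocked.Increment(ref current);

                // Record a new maximum if this task saw more overlap than before.
                int seen;
                while (now > (seen = Volatile.Read(ref max)))
                {
                    Interlocked.CompareExchange(ref max, now, seen);
                }

                Thread.Sleep(20); // simulate some work
                Interlocked.Decrement(ref current);
            },
            CancellationToken.None,
            TaskCreationOptions.None,
            scheduler)).ToArray();

        Task.WaitAll(work);
        return max;
    }

    public static void Main()
    {
        var pair = new ConcurrentExclusiveSchedulerPair();

        Console.WriteLine("Exclusive max concurrency: " +
            MeasureMaxConcurrency(pair.ExclusiveScheduler, 8));
        Console.WriteLine("Concurrent max concurrency: " +
            MeasureMaxConcurrency(pair.ConcurrentScheduler, 8));
    }
}
```

The exclusive scheduler should never report more than one task running at a time. The figure for the concurrent scheduler depends on the machine and on how many thread pool threads happen to be available, so treat it as indicative only.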

Why Write a Task Scheduler?

As you have seen, the .NET Framework ships with three task schedulers: ThreadPoolTaskScheduler (the default), SynchronizationContextTaskScheduler, and ConcurrentExclusiveSchedulerPair. Therefore, the first question you need to answer is, “Why would I want to write a task scheduler?” After all, as you have seen over the last few chapters, the ones that come out of the box seem to be very flexible. The task scheduler provides an execution context for the task, allowing you to take control over which thread or threads are used for task execution. Some reasons to build a scheduler are:

  • You have a set of components with high thread affinity that need to be run asynchronously but all on the same thread.
  • You have a set of legacy components written in VB6 or MFC and need to execute these asynchronously. To be able to execute concurrently, these components need to be executed on threads with an ApartmentState of STA, which is not the case with normal thread pool threads.
  • You have a set of work that should always be scheduled on low-priority threads as it shouldn’t interrupt the normal processing on the machine.

The second of these is the scenario we will use to show you an example of a custom task scheduler.

The TaskScheduler Abstraction

The out-of-the-box schedulers all inherit from the TaskScheduler abstract class. If you want to be able to use your own task scheduler for Task execution, then you also need to create a class that derives, directly or indirectly, from TaskScheduler.

The minimum requirement to implement a scheduler is twofold:

  1. Provide an implementation of the three abstract methods of TaskScheduler: QueueTask, GetScheduledTasks, and TryExecuteTaskInline.
  2. Have some mechanism of actually executing the tasks.

Let’s look at what these requirements involve.

Implementing QueueTask

QueueTask is one of the two core pieces of functionality in implementing a task scheduler (the other being actually executing the tasks, as you shall see hereafter). This is its signature:

protected abstract void QueueTask(Task task);

As its name suggests, it is called when a new task has Start called on it, either explicitly or implicitly by Task.Factory.StartNew or Task.Run. The scheduler must make sure that it does not lose the fact that it must execute the task; therefore, unless it is going to execute it directly, it should store the task in a collection. Notice that this member is protected, so you cannot call it directly in unit tests. Instead, you will have to exercise the task infrastructure to cause it to be executed.

Implementing GetScheduledTasks

GetScheduledTasks is not called during normal task scheduling. It is there for debugger support so the debugger can show all of the scheduled tasks. The signature of GetScheduledTasks is as follows:

protected abstract IEnumerable<Task> GetScheduledTasks();

Assuming you have been storing the tasks in a collection, you can simply return the items in the collection. Again, because the method is protected you cannot unit test it directly.

Implementing TryExecuteTaskInline

When the task infrastructure wants to try to execute a task directly, it calls TryExecuteTaskInline. Here is the signature for the method:

protected abstract bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued);

When you implement TryExecuteTaskInline you need to assess whether it is appropriate to execute the task in the context of the calling thread and, if so, you need to run the task. As you shall see, the task may or may not have already been queued via QueueTask, and TryExecuteTaskInline is informed of this via the taskWasPreviouslyQueued parameter. The return value for the method is whether or not the task was executed.

TryExecuteTaskInline is called in two situations:

  1. When RunSynchronously is called on a task, the calling code is saying that it wants to execute the task on its thread. TryExecuteTaskInline is called on the scheduler to attempt this execution. In this case Start is never called, so the task will not have been queued.
  2. Wait is called on a task with no timeout or cancellation token. If the task has not yet started executing, then TryExecuteTaskInline is called in an attempt to allow Wait to return as quickly as possible. In this case Start will have been called, so the task will already have been queued.
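The two situations can be sketched with a scheduler that records each inline request. InlineOnlyScheduler and its InlineRequests property are hypothetical names used only for this illustration. The scheduler deliberately never runs queued tasks on its own, so a task can complete only by being inlined.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical scheduler for illustration: it stores queued tasks but never
// runs them itself, so a task can only execute by being inlined.
public class InlineOnlyScheduler : TaskScheduler
{
    private readonly ConcurrentQueue<Task> queued = new ConcurrentQueue<Task>();
    private readonly List<bool> inlineRequests = new List<bool>();

    // The taskWasPreviouslyQueued value seen on each inline request, in order.
    public IList<bool> InlineRequests { get { return inlineRequests; } }

    protected override void QueueTask(Task task)
    {
        queued.Enqueue(task);
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return queued.ToArray();
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        inlineRequests.Add(taskWasPreviouslyQueued);
        return TryExecuteTask(task);
    }
}

public static class InlineDemo
{
    public static void Main()
    {
        var scheduler = new InlineOnlyScheduler();

        var direct = new Task(() => { });
        direct.RunSynchronously(scheduler);   // Start is never called

        var started = new Task(() => { });
        started.Start(scheduler);             // queued via QueueTask
        started.Wait();                       // no timeout, no cancellation token

        foreach (bool wasQueued in scheduler.InlineRequests)
        {
            Console.WriteLine("taskWasPreviouslyQueued = " + wasQueued);
        }
    }
}
```

In line with the two cases described above, the RunSynchronously call should report false and the Wait call should report true. A production scheduler would also have to cope with the inlined task still sitting in its queue; here that is harmless because the queue is never drained.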

Executing Tasks

The core responsibility of a task scheduler is to execute tasks. Fortunately, the TaskScheduler base class already wraps up the mechanics and you, as the scheduler implementer, just need to decide how and when to trigger that execution. To execute a task from a scheduler you call the base class method TryExecuteTask, which has the following signature:

protected bool TryExecuteTask(Task task);

Pass the task you want to execute to TryExecuteTask, and it returns a Boolean indicating whether or not the task was executed. TryExecuteTask will also return true if the task has already been cancelled.

Implementing a Custom Scheduler

As stated earlier, we’ll examine building an STA thread scheduler to illustrate the requirements and some of the complexities of implementing a custom scheduler.

COM AND THREADING

Why might COM interop work require a custom scheduler? This comes down to how COM components interact with threads. COM threading was always one of the more complex aspects of COM, which results from COM being a binary standard whose components can be built from a variety of technologies. Some of these technologies have a high level of thread affinity (particularly VB6 and MFC). You must therefore make sure that these components, known as apartment-threaded components, are always executed on a specific thread. This drives the concept of apartments, and in threading terms there are two kinds: Single Threaded Apartments (STAs) and the Multithreaded Apartment (MTA). A thread elects either to enter the MTA or to have its own STA. If a component with high thread affinity is created from an STA thread, then it lives in that STA. However, if it is created from an MTA thread, it lives in a special STA, known as the host STA.

By default .NET threads are MTA threads, so if you create an apartment-threaded component from one it will always run in the host STA. In other words, all the COM components will execute on a single thread, removing any potential concurrency you may be trying to achieve. To achieve concurrency with apartment-threaded COM components, you need to run a pool of STA threads on which to execute them.
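You can inspect the apartment a thread belongs to with Thread.GetApartmentState. The following sketch compares a thread pool thread with a dedicated thread that requests an STA before it starts. ApartmentDemo is a hypothetical name, and the sketch uses TrySetApartmentState rather than SetApartmentState so that it reports failure instead of throwing on platforms where COM apartments are not available.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ApartmentDemo
{
    public static void Main()
    {
        // A thread pool thread, as used by the default task scheduler.
        ApartmentState poolState =
            Task.Run(() => Thread.CurrentThread.GetApartmentState()).Result;

        ApartmentState dedicatedState = ApartmentState.Unknown;
        var thread = new Thread(() =>
        {
            dedicatedState = Thread.CurrentThread.GetApartmentState();
        });

        // The apartment must be requested before the thread starts.
        // TrySetApartmentState returns false rather than throwing when the
        // requested state cannot be applied (for example, off Windows).
        bool staGranted = thread.TrySetApartmentState(ApartmentState.STA);

        thread.Start();
        thread.Join();

        Console.WriteLine("Pool thread apartment:      " + poolState);
        Console.WriteLine("STA requested and granted:  " + staGranted);
        Console.WriteLine("Dedicated thread apartment: " + dedicatedState);
    }
}
```

On Windows you would expect the pool thread to report MTA and the dedicated thread to report STA; on other platforms the values are likely to differ because apartments are a COM concept.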

Creating a Basic Implementation

To begin with, you need to create a class that derives from TaskScheduler and implements the abstract members of the base class. This can be seen in Listing 12-4.

Listing 12-4.  Deriving from TaskScheduler

public class STATaskScheduler : TaskScheduler
{
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        throw new NotImplementedException();
    }
  
    protected override void QueueTask(Task task)
    {
        throw new NotImplementedException();
    }
  
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        throw new NotImplementedException();
    }
}

Next you will need somewhere to store the tasks that are enqueued. As this is an inherently concurrent component, the storage needs to be a thread-safe queue such as a ConcurrentQueue, and wrapping it in a BlockingCollection will give you a fairly simple programming model. Remember that a BlockingCollection defaults to wrapping a ConcurrentQueue unless you give it a different data structure. Having somewhere to store tasks will allow you to implement QueueTask and GetScheduledTasks. You also need to be able to call CompleteAdding on the BlockingCollection; to give yourself a place to do that, implement IDisposable. You can see this code in Listing 12-5.

Listing 12-5.  Adding Storage for Enqueued Tasks

public class STATaskScheduler : TaskScheduler, IDisposable
{
    private readonly BlockingCollection<Task> tasks = new BlockingCollection<Task>();
  
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return tasks.ToArray();
    }
  
    protected override void QueueTask(Task task)
    {
        tasks.Add(task);
    }
  
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        throw new NotImplementedException();
    }
  
    public void Dispose()
    {
        tasks.CompleteAdding();
    }
}

Note  Calling Dispose on the BlockingCollection calls Dispose on its internal semaphores. If threads are already running this might end up causing an ObjectDisposedException. Therefore, the safest way to clean up in this scenario is simply to ensure that any waiting threads know that no more items are to be added so they can exit cleanly.

You can now store the tasks, but you have yet to implement a way to run them—this is next on our agenda. You are going to need a set of STA threads whose job it is to dequeue the tasks and execute them. As your scheduler will be in full control of when this group of threads is manipulated, you can use a simple List<Thread> to store them. The STA threads will need to call TryExecuteTask to run the tasks. How many threads should you use? You will pass this value in to the scheduler’s constructor. Listing 12-6 shows the code for creating the threads and executing the tasks.

Listing 12-6.  Creating the STA Threads

private readonly List<Thread> threads;
  
public STATaskScheduler(int numberOfThreads)
{
    threads = new List<Thread>(numberOfThreads);
  
    for (int i = 0; i < numberOfThreads; i++)
    {
        var thread = new Thread(() =>
             {
                 foreach (Task task in tasks.GetConsumingEnumerable())
                 {
                     TryExecuteTask(task);
                 }
             });
 
        thread.SetApartmentState(ApartmentState.STA);
  
        threads.Add(thread);
        thread.Start();
    }
}

Last, you need to implement TryExecuteTaskInline. As this is an STA thread scheduler, you should only execute the task if the scheduler is being called on an STA thread. This implementation is shown in Listing 12-7.

Listing 12-7.  Implementing TryExecuteTaskInline

protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
    if (Thread.CurrentThread.GetApartmentState() == ApartmentState.STA)
    {
        return TryExecuteTask(task);
    }
  
    return false;
}

You now have a fairly basic STA task scheduler. In fact, the PFx team built something similar in the Parallel Extensions Extras samples. However, there are a number of issues with this implementation regarding the management of the threads. If you ever need a large number of threads, there will be a big overhead in allocating them all up front when you may not end up using them. Also, even if you have used a large number of threads, if the amount of work the scheduler needs to do shrinks, then you really should remove the idle threads.
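To see the basic scheduler in use, the sketch below combines Listings 12-5 to 12-7 into one class and runs a single task on it. StaSchedulerDemo is a hypothetical name, and the sketch assumes Windows, because SetApartmentState with STA is not supported on other platforms.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Listings 12-5 to 12-7 combined into one class.
public class STATaskScheduler : TaskScheduler, IDisposable
{
    private readonly BlockingCollection<Task> tasks = new BlockingCollection<Task>();
    private readonly List<Thread> threads;

    public STATaskScheduler(int numberOfThreads)
    {
        threads = new List<Thread>(numberOfThreads);
        for (int i = 0; i < numberOfThreads; i++)
        {
            var thread = new Thread(() =>
            {
                // Blocks until a task is available; ends after CompleteAdding.
                foreach (Task task in tasks.GetConsumingEnumerable())
                {
                    TryExecuteTask(task);
                }
            });
            thread.SetApartmentState(ApartmentState.STA); // assumes Windows
            threads.Add(thread);
            thread.Start();
        }
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return tasks.ToArray();
    }

    protected override void QueueTask(Task task)
    {
        tasks.Add(task);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        // Only inline when the caller is itself an STA thread.
        return Thread.CurrentThread.GetApartmentState() == ApartmentState.STA
            && TryExecuteTask(task);
    }

    public void Dispose()
    {
        tasks.CompleteAdding();
    }
}

public static class StaSchedulerDemo
{
    public static void Main()
    {
        using (var scheduler = new STATaskScheduler(2))
        {
            Task<ApartmentState> probe = Task.Factory.StartNew(
                () => Thread.CurrentThread.GetApartmentState(),
                CancellationToken.None,
                TaskCreationOptions.None,
                scheduler);

            Console.WriteLine("Task ran in apartment: " + probe.Result);
        }
    }
}
```

Leaving the using block calls Dispose, which in turn calls CompleteAdding, so both pool threads fall out of their foreach loops and the process can exit.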

Adding Threads on Demand

If you are going to add threads as they are needed, then you cannot spin them up in the constructor; something about QueueTask must trigger thread creation. However, the maximum number of threads must still be capped so you don’t consume too many resources. You therefore need to base the decision whether to create a new thread on whether you already have an available thread to process the task, and whether the maximum number of threads has already been hit. You can see the changes in Listing 12-8.

Listing 12-8.  Creating Threads on Demand

private readonly int maxThreads;
private int threadsInUse;
  
public STATaskScheduler(int numberOfThreads)
{
    threads = new List<Thread>(numberOfThreads);
    maxThreads = numberOfThreads;
}
  
private void StartNewPoolThread()
{
    var thread = new Thread(() =>
        {
            foreach (Task task in tasks.GetConsumingEnumerable())
            {
                Interlocked.Increment(ref threadsInUse);
                TryExecuteTask(task);
                Interlocked.Decrement(ref threadsInUse);
            }
        });
  
    thread.SetApartmentState(ApartmentState.STA);
  
    threads.Add(thread);
    thread.Start();
}
  
protected override void QueueTask(Task task)
{
    tasks.Add(task);
  
    int threadCount = threads.Count;
    if (threadCount == threadsInUse && threadCount < maxThreads)
    {
        StartNewPoolThread();
    }
}

However, there is now a nasty race condition in the code. When you spawn the new thread, it does not yet appear to be in use. If another task is queued before the first thread starts processing its task, then QueueTask will think there is a free thread and so will not spawn a new thread. If there is some dependency between the two tasks, then this could prevent the second task from ever executing. To fix this you need to increment the threadsInUse count before you spawn the thread and then, in the thread-processing loop, increment the threadsInUse count only if the thread has executed its first task. You can see the amended code in Listing 12-9.

Listing 12-9.  Fixing the Race Condition

private void StartNewPoolThread()
{
    var thread = new Thread(() =>
        {
            bool firstTaskExecuted = false;
  
            foreach (Task task in tasks.GetConsumingEnumerable())
            {
                if (firstTaskExecuted)
                {
                    Interlocked.Increment(ref threadsInUse);
                }
                else
                {
                    firstTaskExecuted = true;
                }
                TryExecuteTask(task);
                Interlocked.Decrement(ref threadsInUse);
            }
        });
  
    thread.SetApartmentState(ApartmentState.STA);
  
    threads.Add(thread);
    thread.Start();
}
  
protected override void QueueTask(Task task)
{
    tasks.Add(task);
  
    int threadCount = threads.Count;
    if (threadCount == threadsInUse && threadCount < maxThreads)
    {
        Interlocked.Increment(ref threadsInUse);
        StartNewPoolThread();
    }
}

You may have spotted that there is still a potential issue where the thread has been marked as in use, but another thread becomes free and runs the newly queued task. In that case it won’t actually end up doing anything and will instead block on the empty queue. However, this issue will resolve itself when the next task is enqueued as the blocked thread can pick up the new task.

Removing Idle Threads

If you are to remove idle threads, there are a number of factors you need to consider:

  1. What is meant by idle?
  2. What should trigger you to look for idle threads?
  3. How do you know if a thread has been idle?
  4. How do you stop an idle thread?
  5. How do you implement this in a way that you can test the functionality?

An idle thread is one that hasn’t performed any work for a period of time. You should periodically look for these. It makes sense, therefore, that the first two items should be time based. However, this presents an issue with Item 5 as unit tests would have to rely on timeouts. To solve this, you will model the idle processing via an abstraction, as you can see in Listing 12-10. The CheckIdle event signals the scheduler to look for idle threads, and the IdleTimeout specifies how long a thread should be idle before it should be removed from the pool.

Listing 12-10.  Idle Detection Abstraction

public interface IIdleTrigger : IDisposable
{
    event EventHandler CheckIdle;
    TimeSpan IdleTimeout { get; }
}

Remember, you are using this abstraction to be able to provide a test version of the idle trigger, but you will of course require a timer-based one for normal execution. You can see this in Listing 12-11.

Listing 12-11.  Timer-Based Idle Trigger

class TimerIdleTrigger : IIdleTrigger
{
    private readonly TimeSpan idleTimeout;
    private readonly Timer idleTimer;
  
    public TimerIdleTrigger(TimeSpan checkFrequency, TimeSpan idleTimeout)
    {
        this.idleTimeout = idleTimeout;
        idleTimer = new Timer(_ => CheckIdle(this, EventArgs.Empty),
                             null,
                             TimeSpan.Zero,
                             checkFrequency);
    }
  
    public event EventHandler CheckIdle = delegate { };
  
    public TimeSpan IdleTimeout
    {
        get { return idleTimeout; }
    }
  
    public void Dispose()
    {
        idleTimer.Dispose();
    }
}
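The tests later in the chapter construct a StubIdleTrigger, whose code is not shown here. One possible shape for it is sketched below. This is an assumption rather than the actual test class: the RaiseCheckIdle method and the constructor taking a timeout are hypothetical. The point is that the test, not a timer, decides when an idle check takes place.

```csharp
using System;

// Repeated from Listing 12-10 so that the sketch is self-contained.
public interface IIdleTrigger : IDisposable
{
    event EventHandler CheckIdle;
    TimeSpan IdleTimeout { get; }
}

// Hypothetical test double: the test decides when an idle check happens.
public class StubIdleTrigger : IIdleTrigger
{
    public StubIdleTrigger() : this(TimeSpan.Zero)
    {
    }

    public StubIdleTrigger(TimeSpan idleTimeout)
    {
        IdleTimeout = idleTimeout;
    }

    public event EventHandler CheckIdle = delegate { };

    public TimeSpan IdleTimeout { get; private set; }

    // Called by the test to simulate the timer firing.
    public void RaiseCheckIdle()
    {
        CheckIdle(this, EventArgs.Empty);
    }

    public void Dispose()
    {
        // Nothing to release: there is no timer behind this trigger.
    }
}

public static class StubIdleTriggerDemo
{
    public static void Main()
    {
        var trigger = new StubIdleTrigger(TimeSpan.FromSeconds(30));
        int checks = 0;
        trigger.CheckIdle += (sender, e) => checks++;

        trigger.RaiseCheckIdle();
        trigger.RaiseCheckIdle();

        Console.WriteLine("Idle checks raised: " + checks); // prints 2
    }
}
```

Because the event is raised synchronously on the calling thread, a test can raise it and then assert on the scheduler's state without relying on timeouts.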

The next task is to inject an implementation of the idle trigger into the constructor of the scheduler to be able to unit test effectively. However, normally you will want the timer-based idle trigger, so you’ll have two constructors: one that takes an idle trigger and one that defaults to the timer one. During construction you will subscribe to the CheckIdle event of the idle trigger—although we will look at the implementation of the event handler after we have discussed how to recognize a thread is idle. Listing 12-12 shows the code necessary to inject the idle trigger.

Listing 12-12.  Injecting the Idle Trigger

private const int IDLE_CHECK_FREQUENCY = 5;
private const int IDLE_TIMEOUT = 30;
  
private readonly int maxThreads;
private readonly IIdleTrigger idleTrigger;
  
public STATaskScheduler(int maxThreads)
    : this(maxThreads,
           new TimerIdleTrigger(TimeSpan.FromSeconds(IDLE_CHECK_FREQUENCY),
                                TimeSpan.FromSeconds(IDLE_TIMEOUT)))
{
}
  
public STATaskScheduler(int maxThreads, IIdleTrigger idleTrigger)
{
    this.maxThreads = maxThreads;
    this.idleTrigger = idleTrigger;
  
    this.idleTrigger.CheckIdle += CheckForIdleThread;
}

You have now dealt with three of the five issues you needed to consider when removing idle threads. However, there are still two outstanding:

  • How do you know if a thread is idle?
  • How do you stop a thread you want to remove from the pool?

To track the last time a thread is used, you need to record that somewhere. You also need to track whether the thread is currently in use in case it is performing a long-running action whose duration exceeded the idle timeout. Last, you need to encapsulate the mechanism to stop the thread. When a thread is idle it will be waiting on the blocking collection, so you can use a CancellationToken to break out of this wait if you need to stop the thread. You can create a ThreadControl class that encapsulates these pieces of information; it is shown in Listing 12-13.

Listing 12-13.  The ThreadControl Class

class ThreadControl
{
    private readonly CancellationTokenSource cts;
    public ThreadControl()
    {
        LastUsed = DateTime.UtcNow;
        cts = new CancellationTokenSource();
    }
    
    public DateTime LastUsed { get; private set; }
    public bool InUse { get; private set; }
    public CancellationToken CancellationToken { get { return cts.Token; } }
  
    public bool CancelIfIdle(TimeSpan idleTimeout)
    {
        if (!InUse && DateTime.UtcNow - LastUsed > idleTimeout)
        {
            cts.Cancel();
        }
  
        return cts.IsCancellationRequested;
    }
  
    public void SetNotInUse()
    {
        LastUsed = DateTime.UtcNow;
        InUse = false;
    }
  
    public void SetInUse()
    {
        InUse = true;
    }
}
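The stopping mechanism relies on the overload of GetConsumingEnumerable that accepts a CancellationToken. The following standalone sketch, independent of the scheduler, shows a consumer thread that is blocked on an empty BlockingCollection being released by cancellation. CancelBlockedConsumerDemo is a hypothetical name.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class CancelBlockedConsumerDemo
{
    public static void Main()
    {
        var queue = new BlockingCollection<int>();
        var cts = new CancellationTokenSource();
        bool exitedViaCancellation = false;

        var consumer = new Thread(() =>
        {
            try
            {
                // Blocks while the queue is empty; the token lets another
                // thread interrupt that wait.
                foreach (int item in queue.GetConsumingEnumerable(cts.Token))
                {
                    Console.WriteLine("Processed " + item);
                }
            }
            catch (OperationCanceledException)
            {
                exitedViaCancellation = true;
            }
        });

        consumer.Start();
        queue.Add(1);

        Thread.Sleep(100); // give the consumer time to drain the queue and block
        cts.Cancel();
        consumer.Join();   // also makes the consumer's write visible here

        Console.WriteLine("Consumer stopped by cancellation: " + exitedViaCancellation);
    }
}
```

Since CompleteAdding is never called, cancellation is the only way out of the loop, and it surfaces as an OperationCanceledException on the consuming thread. That is why the thread function needs a catch block for that exception.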

You are going to need to associate the ThreadControl object with the thread so you will have to change the existing data structure that holds the threads from a List<Thread> to a dictionary. However, this is highly concurrent code now, so use a ConcurrentDictionary<ThreadControl, Thread> to simplify the scheduler. The running thread will also now be responsible for updating the ThreadControl's usage status, so the implementation of StartNewPoolThread needs to change to reflect this, as shown in Listing 12-14. The listing also shows the use of the ThreadControl CancellationToken to stop the idle thread when it is blocked waiting on the BlockingCollection.

Listing 12-14.  StartNewPoolThread Tracking Thread Usage

private void StartNewPoolThread()
{
    bool firstTaskExecuted = false;
    var thread = new Thread(o =>
    {
        try
        {
            var currentThreadControl = (ThreadControl) o;
            foreach (Task task in
                       tasks.GetConsumingEnumerable(currentThreadControl.CancellationToken))
            {
                currentThreadControl.SetInUse();
                // if this is the first task executed then the thread was already
                // marked in-use before it was created
                if (firstTaskExecuted)
                {
                    Interlocked.Increment(ref threadsInUse);
                }
                else
                {
                    firstTaskExecuted = true;
                }
                TryExecuteTask(task);
  
                currentThreadControl.SetNotInUse();
                Interlocked.Decrement(ref threadsInUse);
            }
        }
        catch (OperationCanceledException)
        {
            // if we haven't yet run a task when cancelled
            // then need to decrement the threadsInUse count as
            // it won't yet have been reset
            if (!firstTaskExecuted)
            {
                Interlocked.Decrement(ref threadsInUse);
            }
        }
    });
  
    thread.SetApartmentState(ApartmentState.STA);
  
    var threadControl = new ThreadControl();
    threads.TryAdd(threadControl, thread);
  
    thread.Start( threadControl);
}

Last, you need to implement CheckForIdleThread, the event handler wired up to the idle trigger. This method’s job is to look at each thread and verify whether it has now been idle longer than the timeout in the idle trigger and, if so, to stop the thread using the CancellationToken (this is handled internally by the ThreadControl CancelIfIdle method). You can see the implementation of CheckForIdleThread in Listing 12-15.

Listing 12-15.  Checking for Idle Threads

private void CheckForIdleThread(object sender, EventArgs e)
{
    TimeSpan timeoutToCheck = idleTrigger.IdleTimeout;
  
    foreach (ThreadControl threadControl in threads.Keys)
    {
        bool cancelled = threadControl.CancelIfIdle(timeoutToCheck);
        if (cancelled)
        {
            Thread thread;
            // Only wait for the thread if this call actually removed it
            if (threads.TryRemove(threadControl, out thread))
            {
                thread.Join(TimeSpan.FromSeconds(5));
            }
        }
    }
}

This completes the functionality to remove threads from the pool of STA threads if they have been idle for an extended period of time. The ThreadControl class manages the concept of idle, the thread function tells the ThreadControl when the thread is in use, and the idle event handler triggers the examination of each thread to see whether it should be stopped.

Unit Testing Custom Schedulers

There are a number of issues in writing unit tests for custom schedulers:

  • You cannot call the significant scheduler methods directly as they are protected rather than public.
  • If you directly invoke the methods, say by reflection, the execution environment may be substantially different from the one the task infrastructure puts in place—so the tests may be meaningless.
  • Invoking the scheduler indirectly, via the task infrastructure, makes it difficult to verify the expected outcome.
  • The test must not complete before the scheduler has completed its work. Therefore, you require some way of synchronizing the behavior of the test and the scheduler.

However, there are big advantages to being able to create quality tests for schedulers as they have an inherent complexity. As you evolve their functionality it is comforting to be able to verify that you have not inadvertently changed unrelated behavior. So how do you write tests against an asynchronous component that you can’t directly invoke? There are three approaches you can take:

  1. Use constrained numbers of threads and synchronization primitives in your tasks to ensure execution order in the scheduler.
  2. Add members to the scheduler (e.g., events and properties) to provide insight into the scheduler’s behavior.
  3. Derive a class from the scheduler that allows you to verify behavior that only the task infrastructure would normally see.

These approaches should not be seen as mutually exclusive but rather as different tools you can use to elicit and verify different behavior. Let’s look at an example of each of them.

Controlling Execution Order with Synchronization Primitives

If you restrict the number of threads available to the scheduler, you can use synchronization primitives to control whether or not these threads are available to perform multiple tasks. For example, to verify that the STATaskScheduler does execute tasks on STA threads, you need to make sure that the task has executed before attempting that verification. Listing 12-16 shows the use of a ManualResetEventSlim to ensure that the ApartmentState of the task’s thread is captured before you perform the assertion. In this specific example you could also wait on the task itself. However, it is also a good idea to make sure that any wait is given a timeout. If you don’t provide a timeout, and the scheduler doesn’t actually execute the task, then the test would never end and the failure would not be obvious.

Listing 12-16.  Controlling Execution Order with ManualResetEventSlim

[TestMethod]
public void QueueTask_WhenQueuesFirstTask_ShouldExecuteTaskOnSTAThread()
{
    using (var scheduler = new STATaskScheduler( 1, new StubIdleTrigger()))
    {
        ApartmentState apartment = ApartmentState.MTA;
        var evt = new ManualResetEventSlim();
  
        Task t = new Task(() =>
                {
                    apartment = Thread.CurrentThread.GetApartmentState();
                    evt.Set();
                });
  
        t.Start(scheduler);
  
        if ( evt.Wait(1000))
        {
            Assert.AreEqual(ApartmentState.STA, apartment);
        }
        else
        {
            Assert.Fail();
        }
    }
}
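Waiting on the task itself with a timeout follows the same pattern, because Task.Wait has overloads that take a timeout and return a Boolean. The sketch below is independent of the STA scheduler, and the names WaitWithTimeoutDemo and CompletesWithin are hypothetical. A TaskCompletionSource that is never completed stands in for a task that a faulty scheduler never runs.

```csharp
using System;
using System.Threading.Tasks;

public static class WaitWithTimeoutDemo
{
    // Returns true if the task finished within the timeout, false otherwise.
    // If the task faulted, Wait rethrows the failure as an AggregateException.
    public static bool CompletesWithin(Task task, int millisecondsTimeout)
    {
        return task.Wait(millisecondsTimeout);
    }

    public static void Main()
    {
        Task quick = Task.Run(() => { });

        // Stands in for a task the scheduler never gets round to running.
        var never = new TaskCompletionSource<object>();

        Console.WriteLine("Quick task finished: " + CompletesWithin(quick, 1000));
        Console.WriteLine("Stuck task finished: " + CompletesWithin(never.Task, 200));
    }
}
```

Remember from the discussion of TryExecuteTaskInline that a Wait with a timeout does not attempt to inline the task, so in a scheduler test this form of waiting exercises the scheduler's own threads rather than the test thread.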

A more complex situation is verifying that if the scheduler does not have an available thread when the task is enqueued, then it will wait for the thread to become available before executing the task. This test can be seen in Listing 12-17. The test uses multiple synchronization primitives to ensure control of the precise execution order: evt makes sure that you don’t decrement the currentConcurrency until the maxConcurrency has been captured, incrementDoneEvt ensures that the increment has taken place, and countdownDone ensures that both tasks have finished their work (you could also use Task.WaitAll for this purpose).

Listing 12-17.  Using Multiple Primitives for Controlling Execution

[TestMethod]
public void QueueTask_WhenQueuesTaskAndAllThreadsBusy_ShouldWaitUntilThreadFree()
{
    using (var scheduler = new STATaskScheduler(1, new StubIdleTrigger()))
    {
        int maxConcurrency = 0;
        int currentConcurrency = 0;
        var evt = new ManualResetEventSlim();
        var countdownDone = new CountdownEvent(2);
        var incrementDoneEvt = new ManualResetEventSlim();
  
        Task t = new Task(() =>
            {
                maxConcurrency = Interlocked.Increment(ref currentConcurrency);
                incrementDoneEvt.Set();
                evt.Wait();
                Interlocked.Decrement(ref currentConcurrency);
                countdownDone.Signal();
            });
        Task t2 = new Task(() =>
            {
                maxConcurrency = Interlocked.Increment(ref currentConcurrency);
                incrementDoneEvt.Set();
                evt.Wait();
                Interlocked.Decrement(ref currentConcurrency);
                countdownDone.Signal();
            });
  
        t.Start(scheduler);
        t2.Start(scheduler);
  
        incrementDoneEvt.Wait();
        evt.Set();
  
        if (countdownDone.Wait(1000))
        {
            Assert.AreEqual(1, maxConcurrency);
        }
        else
        {
            Assert.Fail();
        }
    }
}
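The same check can be sketched with Task.WaitAll in place of the CountdownEvent, as the text above suggests. This is an illustration under stated assumptions rather than the chapter's approach: ConcurrencyProbe and MeasureMaxConcurrency are hypothetical names, the high-water mark is tracked with a compare-and-swap loop so that a later task cannot overwrite a larger value, and each task simply sleeps briefly instead of being gated by events.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only.
public static class ConcurrencyProbe
{
    // Runs taskCount short tasks on the scheduler and returns the largest
    // number of them that were ever observed running at the same time.
    public static int MeasureMaxConcurrency(
        TaskScheduler scheduler, int taskCount, TimeSpan timeout)
    {
        int current = 0;
        int max = 0;
        var tasks = new Task[taskCount];

        for (int i = 0; i < taskCount; i++)
        {
            tasks[i] = new Task(() =>
            {
                int now = Interlocked.Increment(ref current);

                // Raise the high-water mark only if this task saw a larger value.
                int seen;
                do
                {
                    seen = Volatile.Read(ref max);
                }
                while (now > seen &&
                       Interlocked.CompareExchange(ref max, now, seen) != seen);

                Thread.Sleep(20); // hold the thread long enough for overlap to show
                Interlocked.Decrement(ref current);
            });
            tasks[i].Start(scheduler);
        }

        // Task.WaitAll with a timeout returns false rather than blocking forever.
        if (!Task.WaitAll(tasks, timeout))
        {
            throw new TimeoutException("Not all tasks completed within the timeout.");
        }

        return Volatile.Read(ref max);
    }
}
```

Passing a scheduler that allows only one task at a time, such as the ExclusiveScheduler of a ConcurrentExclusiveSchedulerPair or an STATaskScheduler created with a single thread, should yield a result of 1.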

Adding Members to the Scheduler to Provide Insight

It may seem inappropriate to change the public interface of a class to be able to test it, but if testability is a priority then it is a useful technique. In this case the issue is also mitigated by the fact that only the task infrastructure will ever normally invoke methods on the scheduler. As an example, how do you verify that a task is not inlined when Wait is called on it from a non-STA thread? If you add an event to the STATaskScheduler that gets fired if the task is not inlined, then the test becomes fairly straightforward, as Listing 12-18 demonstrates.

Listing 12-18.  Adding a Member to Aid Testability

[TestMethod]
public void TryExecuteTaskInline_WhenTriggeredFromNonSTAThread_ShouldNotExecuteTask()
{
    var evt = new ManualResetEventSlim();
    var notInlinedEvent = new ManualResetEventSlim();
    int callingThread = -1;
    int executionThread = -1;
    using (var scheduler = new STATaskScheduler(1))
    {
        scheduler.TaskNotInlined += (s, e) =>
            {
                notInlinedEvent.Set();
            };
  
        var t1 = new Task(() =>
        {
            evt.Wait();
        });
  
        var t2 = new Task(() =>
        {
            executionThread = Thread.CurrentThread.ManagedThreadId;
        });
  
        t1.Start(scheduler);
        t2.Start(scheduler);
  
        // This thread is deliberately MTA: it is the non-STA caller of Wait
        var nonStaThread = new Thread(() =>
            {
                callingThread = Thread.CurrentThread.ManagedThreadId;
                t2.Wait();
            });
        nonStaThread.SetApartmentState(ApartmentState.MTA);
        nonStaThread.IsBackground = true;
        nonStaThread.Start();
  
        notInlinedEvent.Wait();
                
        evt.Set();
  
        t2.Wait();
    }
  
    Assert.AreNotEqual(callingThread, executionThread);
}
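To make the idea of a test hook concrete, here is a minimal sketch of how such an event could be raised from TryExecuteTaskInline. It is not the STATaskScheduler from this chapter: SingleThreadScheduler is a hypothetical stand-in that uses one ordinary dedicated thread, and its rule "inline only on the worker thread" stands in for "inline only on an STA thread". Only the TaskNotInlined name is taken from Listing 12-18.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in scheduler, for illustration only.
public sealed class SingleThreadScheduler : TaskScheduler, IDisposable
{
    private readonly BlockingCollection<Task> queue = new BlockingCollection<Task>();
    private readonly Thread worker;

    // Test hook: raised whenever a request to inline a task is declined.
    public event EventHandler TaskNotInlined;

    public SingleThreadScheduler()
    {
        worker = new Thread(() =>
        {
            // Runs queued tasks one at a time until CompleteAdding is called.
            foreach (Task task in queue.GetConsumingEnumerable())
            {
                TryExecuteTask(task);
            }
        });
        worker.IsBackground = true;
        worker.Start();
    }

    protected override void QueueTask(Task task)
    {
        queue.Add(task);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        if (Thread.CurrentThread == worker)
        {
            // Already on the scheduler's own thread, so inlining is safe.
            // If the task is still in the queue, the later TryExecuteTask
            // call for it simply returns false.
            return TryExecuteTask(task);
        }

        // Wrong thread: decline, and tell any listening test about it.
        EventHandler handler = TaskNotInlined;
        if (handler != null)
        {
            handler(this, EventArgs.Empty);
        }
        return false;
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return queue.ToArray();
    }

    public void Dispose()
    {
        queue.CompleteAdding();
    }
}
```

The event costs nothing when no handler is attached, which is one reason this kind of hook is usually an acceptable addition to the public surface of a scheduler.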

Deriving a Testable Class from the Scheduler

One of the methods you need to test is GetScheduledTasks. The problem is that it is protected and so you cannot access it directly from the test code (unless you resort to reflection). You can, however, derive from your scheduler and expose the information you need from the derived class. Listing 12-19 shows a SpySTAScheduler class that derives from STATaskScheduler and allows you to see what the STATaskScheduler returns from GetScheduledTasks via its public GetTasks method.

Listing 12-19.  Deriving from the Scheduler to Expose Information

class SpySTAScheduler : STATaskScheduler
{
    public SpySTAScheduler(int maxThreads, IIdleTrigger idleTrigger)
        : base(maxThreads, idleTrigger)
    {
    }
  
    public IEnumerable<Task> GetTasks()
    {
        return GetScheduledTasks();
    }
}

You can now use the SpySTAScheduler in a test to verify the behavior of the STATaskScheduler, as shown in Listing 12-20.

Listing 12-20.  Using the SpySTAScheduler in a Test

[TestMethod]
public void GetScheduledTasks_WhenCalled_ShouldReturnAllTasksNotYetExecuting()
{
    IEnumerable<Task> tasks;
    Task t2;
    Task t3;
  
    using (var scheduler = new SpySTAScheduler(1, new StubIdleTrigger()))
    {
        var evt = new ManualResetEventSlim();
  
        Task t1 = new Task(() =>
        {
            // Block the scheduler's only thread so that t2 and t3 stay queued
            evt.Wait();
        });
  
        t1.Start(scheduler);
  
        t2 = new Task(() =>
        {
        });
  
        t2.Start(scheduler);
        t3 = new Task(() =>
        {
        });
  
        t3.Start(scheduler);
  
        tasks = scheduler.GetTasks();
  
        evt.Set();
    }
  
    Assert.IsNotNull(tasks.SingleOrDefault(t => t == t2));
    Assert.IsNotNull(tasks.SingleOrDefault(t => t == t3));
}
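For comparison, the reflection route mentioned at the start of this section can be sketched as follows. SchedulerInspector and GetQueuedTasks are hypothetical names; the only framework member relied on is the protected TaskScheduler.GetScheduledTasks method. The helper copies the result into an array straight away, so the snapshot does not depend on the scheduler still being alive when the assertions run.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only.
public static class SchedulerInspector
{
    // Calls the protected GetScheduledTasks method on any scheduler and
    // returns a snapshot of the tasks it reports as queued.
    public static Task[] GetQueuedTasks(TaskScheduler scheduler)
    {
        MethodInfo method = typeof(TaskScheduler).GetMethod(
            "GetScheduledTasks",
            BindingFlags.Instance | BindingFlags.NonPublic);

        if (method == null)
        {
            throw new MissingMethodException("TaskScheduler", "GetScheduledTasks");
        }

        // The call is dispatched virtually to the scheduler's own override.
        var tasks = (IEnumerable<Task>)method.Invoke(scheduler, null);

        return tasks == null ? new Task[0] : tasks.ToArray();
    }
}
```

Deriving a spy class remains the more robust option, because it is checked by the compiler, whereas the reflection call fails only at run time if the member name or signature is wrong.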

Summary

As you have seen, the pluggable model of task scheduling means that as new releases of the .NET Framework occur, richer models of task scheduling can be introduced. More importantly, you can create custom task schedulers that meet your own requirements. You saw that creating the essential behavior of a custom task scheduler is fairly straightforward. Adding features to make the scheduler more efficient in its use of threads does add some complexity, but is also very achievable. One of the biggest challenges with asynchronous code in general, and custom schedulers in particular, is writing effective unit tests. However, as described in this chapter, there are techniques you can use to alleviate some of the issues.
