Custom Service Synchronization Contexts

While a synchronization context is a general-purpose pattern, out of the box, .NET only implements a single useful one: the Windows Forms synchronization context (there is also the default implementation that uses the .NET thread pool). As it turns out, the ability to automatically marshal calls to a custom synchronization context is one of the most powerful extensibility mechanisms in WCF.

The Thread Pool Synchronizer

There are two aspects to developing a custom service synchronization context: the first is implementing a custom synchronization context, and the second is installing it or even applying it declaratively on the service. ServiceModelEx contains my ThreadPoolSynchronizer class, defined as:

public class ThreadPoolSynchronizer : SynchronizationContext,IDisposable
{
   public ThreadPoolSynchronizer(uint poolSize);
   public ThreadPoolSynchronizer(uint poolSize,string poolName);

   public void Dispose(  );
   public void Close(  );
   public void Abort(  );

   protected Semaphore CallQueued
   {get;}
}

Implementing a custom synchronization context has nothing to do with WCF and is therefore not discussed in this book, although the implementation code is available with ServiceModelEx.

ThreadPoolSynchronizer marshals all calls to a custom thread pool, where the calls are first queued up, then multiplexed on the available threads. The size of the pool is provided as a construction parameter. If the pool is maxed out, any calls that come in will remain pending in the queue until a thread is available.

You can also provide a pool name (which will be the prefix of the name of each of the threads in the pool). Disposing of or closing the ThreadPoolSynchronizer kills all threads in the pool gracefully; that is, the ThreadPoolSynchronizer waits for the engaged threads to complete their tasks. The Abort( ) method is an ungraceful shutdown, as it terminates all threads abruptly.

The classic use for a custom thread pool is with a server application (such as a web server or an email server) that needs to maximize its throughput by controlling the underlying worker threads and their assignment. However, such usage is rare, since most application developers do not write servers anymore. The real use of ThreadPoolSynchronizer is as a stepping-stone to implement other synchronization contexts, which are useful in their own right.

To associate your service with the custom thread pool, you can manually attach ThreadPoolSynchronizer to the thread opening the host using the static SetSynchronizationContext( ) method of SynchronizationContext, as shown in Example 8-14.

Example 8-14. Using ThreadPoolSynchronizer

SynchronizationContext syncContext = new ThreadPoolSynchronizer(3);

SynchronizationContext.SetSynchronizationContext(syncContext);

using(syncContext as IDisposable)
{
   ServiceHost host = new ServiceHost(typeof(MyService));
   host.Open(  );
   /* Some blocking operations */

   host.Close(  );
}

In Example 8-14, the thread pool will have three threads. The service MyService will have an affinity to those three threads, and all calls to the service will be channeled to them, regardless of the service concurrency mode or instancing mode, and across all endpoints and contracts supported by the service. After closing the host, the example disposes of ThreadPoolSynchronizer to shut down the threads in the pool.

Note that a service executing in a custom thread pool is not thread-safe (unless the pool size is 1), so the preceding discussion of concurrency management still applies. The only difference is that now you control the threads.

Declaratively attaching a custom synchronization context

The problem with Example 8-14 is that the service is at the mercy of the hosting code. If by design the service is required to execute in the pool, it would be better to apply the thread pool declaratively, as part of the service definition.

To that end, I wrote the ThreadPoolBehaviorAttribute:

[AttributeUsage(AttributeTargets.Class)]
public class ThreadPoolBehaviorAttribute : Attribute,
                                                 IContractBehavior,IServiceBehavior
{
   public ThreadPoolBehaviorAttribute(uint poolSize,Type serviceType);
   public ThreadPoolBehaviorAttribute(uint poolSize,Type serviceType,
                                      string poolName);
}

You apply this attribute directly on the service, while providing the service type as a constructor parameter:

[ThreadPoolBehavior(3,typeof(MyService))]
class MyService : IMyContract
{...}

The attribute provides an instance of ThreadPoolSynchronizer to the dispatchers of the service's endpoints. The key in implementing the ThreadPoolBehavior attribute is knowing how and when to hook up the dispatchers with the synchronization context. The ThreadPoolBehavior attribute supports the special WCF extensibility interface IContractBehavior, introduced in Chapter 5:

public interface IContractBehavior
{
   void ApplyDispatchBehavior(ContractDescription description,
                              ServiceEndpoint endpoint,
                              DispatchRuntime dispatchRuntime);
   //More members
}

When a service is decorated with an attribute that supports IContractBehavior, after opening the host (but before forwarding calls to the service), for each service endpoint WCF calls the ApplyDispatchBehavior( ) method and provides it with the DispatchRuntime parameter, allowing you to affect an individual endpoint dispatcher's runtime and set its synchronization context. Each endpoint has its own dispatcher, and each dispatcher has its own synchronization context, so the attribute is instantiated and ApplyDispatchBehavior( ) is called for each endpoint.

Example 8-15 lists most of the implementation of ThreadPoolBehaviorAttribute.

Example 8-15. Implementing ThreadPoolBehaviorAttribute

[AttributeUsage(AttributeTargets.Class)]
public class ThreadPoolBehaviorAttribute : Attribute,IContractBehavior,
                                                                   IServiceBehavior
{
   protected string PoolName
   {get;set;}
   protected uint PoolSize
   {get;set;}
   protected Type ServiceType
   {get;set;}

   public ThreadPoolBehaviorAttribute(uint poolSize,Type serviceType) :
                                                    this(poolSize,serviceType,null)
   {}
   public ThreadPoolBehaviorAttribute(uint poolSize,Type serviceType,
                                      string poolName)
   {
      PoolName    = poolName;
      ServiceType = serviceType;
      PoolSize    = poolSize;
   }
   protected virtual ThreadPoolSynchronizer ProvideSynchronizer(  )
   {
      if(ThreadPoolHelper.HasSynchronizer(ServiceType) == false)
      {
         return new ThreadPoolSynchronizer(PoolSize,PoolName);
      }
      else
      {
         return ThreadPoolHelper.GetSynchronizer(ServiceType);
      }
   }

   void IContractBehavior.ApplyDispatchBehavior(ContractDescription description,
                                                ServiceEndpoint endpoint,
                                                DispatchRuntime dispatchRuntime)
   {
      PoolName = PoolName ?? "Pool executing endpoints of " + ServiceType;

      lock(typeof(ThreadPoolHelper))
      {
         ThreadPoolHelper.ApplyDispatchBehavior(ProvideSynchronizer(  ),
                                    PoolSize,ServiceType,PoolName,dispatchRuntime);
      }
   }
   void IServiceBehavior.Validate(ServiceDescription description,
                                  ServiceHostBase serviceHostBase)
   {
      serviceHostBase.Closed += delegate
                                {
                                   ThreadPoolHelper.CloseThreads(ServiceType);
                                };
   }
   //Rest of the implementation
}
public static class ThreadPoolHelper
{
   static Dictionary<Type,ThreadPoolSynchronizer> m_Synchronizers =
                                     new Dictionary<Type,ThreadPoolSynchronizer>(  );

   [MethodImpl(MethodImplOptions.Synchronized)]
   internal static bool HasSynchronizer(Type type)
   {
      return m_Synchronizers.ContainsKey(type);
   }

   [MethodImpl(MethodImplOptions.Synchronized)]
   internal static ThreadPoolSynchronizer GetSynchronizer(Type type)
   {
      return m_Synchronizers[type];
   }
   [MethodImpl(MethodImplOptions.Synchronized)]
   internal static void ApplyDispatchBehavior(ThreadPoolSynchronizer synchronizer,
                                              uint poolSize,Type type,
                                              string poolName,
                                              DispatchRuntime dispatchRuntime)
   {
      if(HasSynchronizer(type) == false)
      {
         m_Synchronizers[type] = synchronizer;
      }
      dispatchRuntime.SynchronizationContext = m_Synchronizers[type];
   }
   [MethodImpl(MethodImplOptions.Synchronized)]
   public static void CloseThreads(Type type)
   {
      if(HasSynchronizer(type))
      {
         m_Synchronizers[type].Dispose(  );
         m_Synchronizers.Remove(type);
      }
   }
}

The constructors of the ThreadPoolBehavior attribute save the provided service type and pool name. The name is simply passed to the constructor of ThreadPoolSynchronizer.

Tip

The ApplyDispatchBehavior( ) method in Example 8-15 uses the ?? null-coalescing operator (introduced in C# 2.0) to assign a pool name if required. This expression:

PoolName = PoolName ??
        "Pool executing endpoints of " + ServiceType;

is shorthand for:

if(PoolName == null)
{
   PoolName = " Pool executing endpoints of " +
                                         ServiceType;
}

It is a best practice to separate the implementation of a WCF custom behavior attribute from the actual behavior: let the attribute merely decide on the sequence of events, and have a helper class provide the actual behavior. Doing so enables the behavior to be used separately (for example, by a custom host). This is why the ThreadPoolBehavior attribute does not do much. It delegates most of its work to a static helper class called ThreadPoolHelper. ThreadPoolHelper provides the HasSynchronizer( ) method, which indicates whether the specified service type already has a synchronization context, and the GetSynchronizer( ) method, which returns the synchronization context associated with the type. The ThreadPoolBehavior attribute uses these two methods in the virtual ProvideSynchronizer( ) method to ensure that it creates the pool exactly once per service type. This check is required because ApplyDispatchBehavior( ) may be called multiple times (once per endpoint). The ThreadPoolBehavior attribute is also a custom service behavior, because it implements IServiceBehavior. The Validate( ) method of IServiceBehavior provides the service host instance the ThreadPoolBehavior attribute uses to subscribe to the host's Closed event, where it asks ThreadPoolHelper to terminate all the threads in the pool by calling ThreadPoolHelper.CloseThreads( ).

ThreadPoolHelper associates all dispatchers of all endpoints of that service type with the same instance of ThreadPoolSynchronizer. This ensures that all calls are routed to the same pool. ThreadPoolHelper has to be able to map a service type to a particular ThreadPoolSynchronizer, so it declares a static dictionary called m_Synchronizers that uses service types as keys and ThreadPoolSynchronizer instances as values.

In ApplyDispatchBehavior( ), ThreadPoolHelper checks to see whether m_Synchronizers already contains the provided service type. If the type is not found, ThreadPoolHelper adds the provided ThreadPoolSynchronizer to m_Synchronizers, associating it with the service type.

The DispatchRuntime class provides the SynchronizationContext property ThreadPoolHelper uses to assign a synchronization context for the dispatcher:

public sealed class DispatchRuntime
{
   public SynchronizationContext SynchronizationContext
   {get;set;}
   //More members
}

Before making the assignment, ThreadPoolHelper verifies that the dispatcher has no other synchronization context, since that would indicate some unresolved conflict. After that, it simply assigns the ThreadPoolSynchronizer instance to the dispatcher:

dispatchRuntime.SynchronizationContext = m_Synchronizers[type];

This single line is all that is required to have WCF use the custom synchronization context from now on. In the CloseThreads( ) method, ThreadPoolHelper looks up the ThreadPoolSynchronizer instance in the dictionary and disposes of it (thus gracefully terminating all the worker threads in the pool). ThreadPoolHelper also verifies that the provided pool size value does not exceed the maximum concurrent calls value of the dispatcher's throttle (this is not shown in Example 8-15).

Thread Affinity

A pool size of 1 will in effect create an affinity between a particular thread and all service calls, regardless of the service's concurrency and instancing modes. This is particularly useful if the service is required not merely to update some UI but to also create a UI (for example, creating a pop-up window and then periodically showing, hiding, and updating it). Having created the window, the service must ensure that the creating thread is used to access and update it. Thread affinity is also required for a service that accesses or creates resources that use the TLS. To formalize such requirements I created the specialized AffinitySynchronizer class, implemented as:

public class AffinitySynchronizer : ThreadPoolSynchronizer
{
   public AffinitySynchronizer(  ) : this("AffinitySynchronizer Worker Thread")
   {}
   public AffinitySynchronizer(string threadName): base(1,threadName)
   {}
}

While you can install AffinitySynchronizer, as shown in Example 8-14, if by design the service is required to always execute on the same thread it is better not to be at the mercy of the host and the thread that happens to open it. Instead, use my ThreadAffinityBehaviorAttribute:

[ThreadAffinityBehavior(typeof(MyService))]
class MyService : IMyContract
{...}

ThreadAffinityBehaviorAttribute is a specialization of ThreadPoolBehaviorAttribute that hardcodes the pool size as 1, as shown in Example 8-16.

Example 8-16. Implementing ThreadAffinityBehaviorAttribute

[AttributeUsage(AttributeTargets.Class)]
public class ThreadAffinityBehaviorAttribute : ThreadPoolBehaviorAttribute
{
   public ThreadAffinityBehaviorAttribute(Type serviceType) :
                                         this(serviceType,"Affinity Worker Thread")
   {}

   public ThreadAffinityBehaviorAttribute(Type serviceType,string threadName) :
                                                     base(1,serviceType,threadName)
   {}
}

When relying on thread affinity all service instances are always thread-safe, since only a single thread (and the same thread, at that) can access them.

When the service is configured with ConcurrencyMode.Single, it gains no additional thread safety because the service instance is single-threaded anyway. You do get double queuing of concurrent calls, though: all concurrent calls to the service are first queued in the lock's queue and then dispatched to the single thread in the pool one at a time. With ConcurrencyMode.Multiple, calls are dispatched to the single thread as fast as they arrive and are then queued up to be invoked later, in order and never concurrently. Finally, with ConcurrencyMode.Reentrant, the service is, of course, not reentrant, because the incoming reentering call will be queued up and a deadlock will occur while the single thread is blocked on the callout. It is therefore best to use the default of ConcurrencyMode.Single when relying on thread affinity.

The host-installed synchronization context

If the affinity to a particular synchronization context is a host decision, you can streamline the code in Example 8-14 by encapsulating the installation of the synchronization context with extension methods. For example, the use of thread affinity is such a socialized case, you could define the following extension methods:

public static class HostThreadAffinity
{
   public static void SetThreadAffinity(this ServiceHost host,string threadName);
   public static void SetThreadAffinity(this ServiceHost host);
}

SetThreadAffinity( ) works equally well on ServiceHost and my ServiceHost<T>:

ServiceHost<MyService> host = new ServiceHost<MyService>(  );
host.SetThreadAffinity(  );

host.Open(  );

Example 8-17 lists the implementation of the SetThreadAffinity( ) methods.

Example 8-17. Adding thread affinity to the host

public static class HostThreadAffinity
{
   public static void SetThreadAffinity(this ServiceHost host,string threadName)
   {
      if(host.State == CommunicationState.Opened)
      {
         throw new InvalidOperationException("Host is already opened");
      }

      AffinitySynchronizer affinitySynchronizer =
                                             new AffinitySynchronizer(threadName);

      SynchronizationContext.SetSynchronizationContext(affinitySynchronizer);

      host.Closing += delegate
                      {
                         using(affinitySynchronizer);
                      };
   }
   public static void SetThreadAffinity(this ServiceHost host)
   {
      SetThreadAffinity(host,"Executing all endpoints of " +
                                                    host.Description.ServiceType);
   }
}

HostThreadAffinity offers two versions of SetThreadAffinity( ): the parameterized version takes the thread name to provide for AffinitySynchronizer's worker thread, while the parameterless version calls the other SetThreadAffinity( ) method, specifying a thread name inferred from the hosted service type (such as "Executing all endpoints of MyService"). SetThreadAffinity( ) first checks that the host has not yet been opened, because you can only attach a synchronization context before the host is opened. If the host has not been opened, SetThreadAffinity( ) constructs a new AffinitySynchronizer, providing it with the thread name to use, and attaches it to the current thread. Finally, SetThreadAffinity( ) subscribes to the host's Closing event in order to call Dispose( ) on the AffinitySynchronizer, to shut down its worker thread. Since the AffinitySynchronizer member can be null if no one calls SetThreadAffinity( ), OnClosing( ) uses the using statement, which internally checks for null assignment before calling Dispose( ).

Priority Processing[6]

By default, all calls to your WCF service will be processed in the order in which they arrive. This is true both if you use the I/O completion thread pool or a custom thread pool. Normally, this is exactly what you want. But what if some calls have higher priority and you want to process them as soon as they arrive, rather than in order? Even worse, when such calls arrive, what if the load on your service is such that the underlying service resources are exhausted? What if the throttle is maxed out? In these cases, your higher-priority calls will be queued just like all the other calls, waiting for the service or its resources to become available. Synchronization contexts offer an elegant solution to this problem: you can assign a priority to each call and have the synchronization context sort the calls as they arrive before dispatching them to the thread pool for execution. This is exactly what my PrioritySynchronizer class does:

public enum CallPriority
{
   Low,
   Normal,
   High
}
public class PrioritySynchronizer : ThreadPoolSynchronizer
{
   public PrioritySynchronizer(uint poolSize);
   public PrioritySynchronizer(uint poolSize,string poolName);

   public static CallPriority Priority
   {get;set;}
}

PrioritySynchronizer derives from ThreadPoolSynchronizer and adds the sorting just mentioned. Since the Send( ) and Post( ) methods of SynchronizationContext do not take a priority parameter, the client of PrioritySynchronizer has two ways of passing the priority of the call: via the Priority property, which stores the priority (a value of the enum type CallPriority) in the TLS of the calling thread, or via the message headers. If unspecified, Priority defaults to CallPriority.Normal.

In addition to the PrioritySynchronizer class, I also provide the matching PriorityCallsBehaviorAttribute, shown in Example 8-18.

Example 8-18. Implementing PriorityCallsBehaviorAttribute

[AttributeUsage(AttributeTargets.Class)]
public class PriorityCallsBehaviorAttribute : ThreadPoolBehaviorAttribute
{
   public PriorityCallsBehaviorAttribute(uint poolSize,Type serviceType) :
                                                    this(poolSize,serviceType,null)
   {}
   public PriorityCallsBehaviorAttribute(uint poolSize,Type serviceType,
                             string poolName) : base(poolSize,serviceType,poolName)
   {}
   protected override ThreadPoolSynchronizer ProvideSynchronizer(  )
   {
      if(ThreadPoolHelper.HasSynchronizer(ServiceType) == false)
      {
         return new PrioritySynchronizer(PoolSize,PoolName);
      }
      else
      {
         return ThreadPoolHelper.GetSynchronizer(ServiceType);
      }
   }
}

Using the PriorityCallsBehavior attribute is straightforward:

[PriorityCallsBehavior(3,typeof(MyService))]
class MyService : IMyContract
{...}

PriorityCallsBehaviorAttribute overrides ProvideSynchronizer( ) and provides an instance of PrioritySynchronizer instead of ThreadPoolSynchronizer. Because PrioritySynchronizer derives from ThreadPoolSynchronizer, this is transparent as far as ThreadPoolHelper is concerned.

The real challenge in implementing and supporting priority processing is providing the call priority from the client to the service, and ultimately to PrioritySynchronizer. Using the Priority property of PrioritySynchronizer is useful only for non-WCF clients that interact directly with the synchronization context; it is of no use for a WCF client, whose thread is never used to access the service. While you could provide the priority as an explicit parameter in every method, I wanted a generic mechanism that can be applied on any contract and service. To achieve that goal you have to pass the priority of the call out-of-band, via the message headers, using the techniques described in Appendix B. Appendix B explains in detail the use of the incoming and outgoing headers, including augmenting WCF with general-purpose management of extraneous information sent from the client to the service. In effect, I provide a generic yet type-safe and application-specific custom context via my GenericContext<T> class, available in ServiceModelEx:

[DataContract]
public class GenericContext<T>
{
   [DataMember]
   public readonly T Value;

   public GenericContext(  );
   public GenericContext(T value);
   public static GenericContext<T> Current
   {get;set;}
}

Literally any data contract (or serializable) type can be used for the type parameter in the custom context, including of course the CallPriority enum.

On the service side, any party can read the value out of the custom headers:

CallPriority priority = GenericContext<CallPriority>.Current.Value;

This is exactly what PrioritySynchronizer does when looking for the call priority. It expects the client to provide the priority either in the TLS (via the Priority property) or in the form of a custom context that stores the priority in the message headers.

The client can use my HeaderClientBase<T,H> proxy class (also discussed in Appendix B) to pass the priority to the service in the message headers, or, even better, define a general-purpose priority-enabled proxy class, PriorityClientBase<T>, shown in Example 8-19.

Example 8-19. Defining PriorityClientBase<T>

public abstract partial class PriorityClientBase<T> :
                                   HeaderClientBase<T,CallPriority> where T : class
{
   public PriorityClientBase(  ) : this(PrioritySynchronizer.Priority)
   {}

   public PriorityClientBase(string endpointName) :
                                   this(PrioritySynchronizer.Priority,endpointName)
   {}

   public PriorityClientBase(Binding binding,EndpointAddress remoteAddress) :
                          this(PrioritySynchronizer.Priority,binding,remoteAddress)
   {}

   public PriorityClientBase(CallPriority priority) : base(priority)
   {}

   public PriorityClientBase(CallPriority priority,string endpointName) :
                                           base(priority,endpointConfigurationName)
   {}

   public PriorityClientBase(CallPriority priority,Binding binding,
              EndpointAddress remoteAddress) : base(priority,binding,remoteAddress)
   {}
   /* More constructors */
}

PriorityClientBase<T> hardcodes the use of CallPriority for the type parameter H. PriorityClientBase<T> defaults to reading the priority from the TLS (yielding CallPriority.Normal when no priority is found), so it can be used like any other proxy class. With very minor changes to your existing proxy classes, you can now add priority-processing support:

class MyContractClient : PriorityClientBase<IMyContract>,IMyContract
{
   //Reads priority from TLS
   public MyContractClient(  )
   {}

   public MyContractClient(CallPriority priority) : base(priority)
   {}
   public void MyMethod(  )
   {
      Channel.MyMethod(  );
   }
}

MyContractClient proxy = new MyContractClient(CallPriority.High);
proxy.MyMethod(  );


[6] I first presented my technique for priority processing of WCF calls in my article "Synchronization Contexts in WCF" (MSDN Magazine, November 2007).

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.225.55.193