While a synchronization context is a general-purpose pattern, out of the box, .NET only implements a single useful one: the Windows Forms synchronization context (there is also the default implementation that uses the .NET thread pool). As it turns out, the ability to automatically marshal calls to a custom synchronization context is one of the most powerful extensibility mechanisms in WCF.
There are two aspects to developing a custom service synchronization context: the
first is implementing a custom synchronization context, and the second is installing it or
even applying it declaratively on the service. ServiceModelEx
contains my ThreadPoolSynchronizer
class, defined
as:
public class ThreadPoolSynchronizer : SynchronizationContext,IDisposable
{
   public ThreadPoolSynchronizer(uint poolSize);
   public ThreadPoolSynchronizer(uint poolSize,string poolName);

   public void Dispose( );
   public void Close( );
   public void Abort( );

   protected Semaphore CallQueued
   {get;}
}
Implementing a custom synchronization context has nothing to do with WCF and is therefore not discussed in this book, although the implementation code is available with ServiceModelEx.
ThreadPoolSynchronizer
marshals all calls to a
custom thread pool, where the calls are first queued up, then multiplexed on the available
threads. The size of the pool is provided as a construction parameter. If the pool is
maxed out, any calls that come in will remain pending in the queue until a thread is
available.
You can also provide a pool name (which will be the prefix of the name of each of the
threads in the pool). Disposing of or closing the ThreadPoolSynchronizer
kills all threads in the pool gracefully; that is, the
ThreadPoolSynchronizer
waits for the engaged threads
to complete their tasks. The Abort( )
method is an
ungraceful shutdown, as it terminates all threads abruptly.
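To illustrate the general technique (this is only a sketch, not the actual ServiceModelEx implementation, and the class name is hypothetical), a queue-based synchronization context posts each callback to a shared queue, and a fixed set of worker threads dequeues and executes the calls:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

//A minimal sketch of a pooled synchronization context:
//Post( ) enqueues the callback, and each worker thread pumps the queue
public class SimpleThreadPoolContext : SynchronizationContext,IDisposable
{
   readonly Queue<KeyValuePair<SendOrPostCallback,object>> m_Queue =
                          new Queue<KeyValuePair<SendOrPostCallback,object>>( );
   readonly List<Thread> m_Threads = new List<Thread>( );
   bool m_Closed;

   public SimpleThreadPoolContext(uint poolSize)
   {
      for(int i = 0; i < poolSize; i++)
      {
         Thread thread = new Thread(Pump);
         thread.IsBackground = true;
         thread.Start( );
         m_Threads.Add(thread);
      }
   }
   public override void Post(SendOrPostCallback callback,object state)
   {
      lock(m_Queue)
      {
         m_Queue.Enqueue(
               new KeyValuePair<SendOrPostCallback,object>(callback,state));
         Monitor.Pulse(m_Queue);
      }
   }
   void Pump( )
   {
      while(true)
      {
         KeyValuePair<SendOrPostCallback,object> item;
         lock(m_Queue)
         {
            while(m_Queue.Count == 0 && m_Closed == false)
            {
               Monitor.Wait(m_Queue);
            }
            if(m_Queue.Count == 0)
            {
               return; //Closed and drained - let the thread exit gracefully
            }
            item = m_Queue.Dequeue( );
         }
         item.Key(item.Value);
      }
   }
   public void Dispose( )
   {
      lock(m_Queue)
      {
         m_Closed = true;
         Monitor.PulseAll(m_Queue);
      }
      //Graceful shutdown: wait for the workers to finish queued calls
      m_Threads.ForEach(thread => thread.Join( ));
   }
}
```

A production-quality version would also override Send( ) (for synchronous marshaling) and guard against posting after disposal, but the sketch conveys the queue-then-multiplex behavior described above.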
The classic use for a custom thread pool is with a server application (such as a web
server or an email server) that needs to maximize its throughput by controlling the
underlying worker threads and their assignment. However, such usage is rare, since most
application developers do not write servers anymore. The real use of ThreadPoolSynchronizer
is as a stepping-stone to implement
other synchronization contexts, which are useful in their own right.
To associate your service with the custom thread pool, you can manually attach
ThreadPoolSynchronizer
to the thread opening the host
using the static SetSynchronizationContext( )
method of
SynchronizationContext
, as shown in Example 8-14.
Example 8-14. Using ThreadPoolSynchronizer
SynchronizationContext syncContext = new ThreadPoolSynchronizer(3);
SynchronizationContext.SetSynchronizationContext(syncContext);
using(syncContext as IDisposable)
{
ServiceHost host = new ServiceHost(typeof(MyService));
host.Open( );
/* Some blocking operations */
host.Close( );
}
In Example 8-14, the thread pool will have three
threads. The service MyService
will have an affinity to
those three threads, and all calls to the service will be channeled to them, regardless of
the service concurrency mode or instancing mode, and across all endpoints and contracts
supported by the service. After closing the host, the example disposes of ThreadPoolSynchronizer
to shut down the threads in the
pool.
Note that a service executing in a custom thread pool is not thread-safe (unless the pool size is 1), so the preceding discussion of concurrency management still applies. The only difference is that now you control the threads.
The problem with Example 8-14 is that the service is at the mercy of the hosting code. If by design the service is required to execute in the pool, it would be better to apply the thread pool declaratively, as part of the service definition.
To that end, I wrote the ThreadPoolBehaviorAttribute
:
[AttributeUsage(AttributeTargets.Class)]
public class ThreadPoolBehaviorAttribute : Attribute,IContractBehavior,
                                           IServiceBehavior
{
   public ThreadPoolBehaviorAttribute(uint poolSize,Type serviceType);
   public ThreadPoolBehaviorAttribute(uint poolSize,Type serviceType,
                                      string poolName);
}
You apply this attribute directly on the service, while providing the service type as a constructor parameter:
[ThreadPoolBehavior(3,typeof(MyService))]
class MyService : IMyContract
{...}
The attribute provides an instance of ThreadPoolSynchronizer
to the dispatchers of the service's endpoints. The
key in implementing the ThreadPoolBehavior
attribute
is knowing how and when to hook up the dispatchers with the synchronization context. The
ThreadPoolBehavior
attribute supports the special
WCF extensibility interface IContractBehavior
,
introduced in Chapter 5:
public interface IContractBehavior
{
   void ApplyDispatchBehavior(ContractDescription description,
                              ServiceEndpoint endpoint,
                              DispatchRuntime dispatchRuntime);
   //More members
}
When a service is decorated with an attribute that supports IContractBehavior
, after opening the host (but before forwarding calls to
the service), for each service endpoint WCF calls the ApplyDispatchBehavior( )
method and provides it with the DispatchRuntime
parameter, allowing you to affect an
individual endpoint dispatcher's runtime and set its synchronization context. Each
endpoint has its own dispatcher, and each dispatcher has its own synchronization
context, so the attribute is instantiated and ApplyDispatchBehavior( )
is called for each endpoint.
Example 8-15 lists most of the
implementation of ThreadPoolBehaviorAttribute
.
Example 8-15. Implementing ThreadPoolBehaviorAttribute
[AttributeUsage(AttributeTargets.Class)]
public class ThreadPoolBehaviorAttribute : Attribute,IContractBehavior,
                                           IServiceBehavior
{
   protected string PoolName
   {get;set;}
   protected uint PoolSize
   {get;set;}
   protected Type ServiceType
   {get;set;}

   public ThreadPoolBehaviorAttribute(uint poolSize,Type serviceType) :
                                              this(poolSize,serviceType,null)
   {}
   public ThreadPoolBehaviorAttribute(uint poolSize,Type serviceType,
                                      string poolName)
   {
      PoolName = poolName;
      ServiceType = serviceType;
      PoolSize = poolSize;
   }
   protected virtual ThreadPoolSynchronizer ProvideSynchronizer( )
   {
      if(ThreadPoolHelper.HasSynchronizer(ServiceType) == false)
      {
         return new ThreadPoolSynchronizer(PoolSize,PoolName);
      }
      else
      {
         return ThreadPoolHelper.GetSynchronizer(ServiceType);
      }
   }
   void IContractBehavior.ApplyDispatchBehavior(ContractDescription description,
                                                ServiceEndpoint endpoint,
                                                DispatchRuntime dispatchRuntime)
   {
      PoolName = PoolName ?? "Pool executing endpoints of " + ServiceType;
      lock(typeof(ThreadPoolHelper))
      {
         ThreadPoolHelper.ApplyDispatchBehavior(ProvideSynchronizer( ),
                                 PoolSize,ServiceType,PoolName,dispatchRuntime);
      }
   }
   void IServiceBehavior.Validate(ServiceDescription description,
                                  ServiceHostBase serviceHostBase)
   {
      serviceHostBase.Closed += delegate
                                {
                                   ThreadPoolHelper.CloseThreads(ServiceType);
                                };
   }
   //Rest of the implementation
}
public static class ThreadPoolHelper
{
   static Dictionary<Type,ThreadPoolSynchronizer> m_Synchronizers =
                                 new Dictionary<Type,ThreadPoolSynchronizer>( );

   [MethodImpl(MethodImplOptions.Synchronized)]
   internal static bool HasSynchronizer(Type type)
   {
      return m_Synchronizers.ContainsKey(type);
   }
   [MethodImpl(MethodImplOptions.Synchronized)]
   internal static ThreadPoolSynchronizer GetSynchronizer(Type type)
   {
      return m_Synchronizers[type];
   }
   [MethodImpl(MethodImplOptions.Synchronized)]
   internal static void ApplyDispatchBehavior(ThreadPoolSynchronizer synchronizer,
                                              uint poolSize,Type type,
                                              string poolName,
                                              DispatchRuntime dispatchRuntime)
   {
      if(HasSynchronizer(type) == false)
      {
         m_Synchronizers[type] = synchronizer;
      }
      dispatchRuntime.SynchronizationContext = m_Synchronizers[type];
   }
   [MethodImpl(MethodImplOptions.Synchronized)]
   public static void CloseThreads(Type type)
   {
      if(HasSynchronizer(type))
      {
         m_Synchronizers[type].Dispose( );
         m_Synchronizers.Remove(type);
      }
   }
}
The constructors of the ThreadPoolBehavior
attribute save the provided service type and pool name. The name is simply passed to the
constructor of ThreadPoolSynchronizer
.
The ApplyDispatchBehavior( )
method in Example 8-15 uses the ??
null-coalescing operator (introduced in C# 2.0) to
assign a pool name if required. This expression:
PoolName = PoolName ?? "Pool executing endpoints of " + ServiceType;
is shorthand for:
if(PoolName == null)
{
   PoolName = "Pool executing endpoints of " + ServiceType;
}
It is a best practice to separate the implementation of a WCF custom behavior
attribute from the actual behavior: let the attribute merely decide on the sequence of
events, and have a helper class provide the actual behavior. Doing so enables the
behavior to be used separately (for example, by a custom host). This is why the ThreadPoolBehavior
attribute does not do much. It delegates
most of its work to a static helper class called ThreadPoolHelper
. ThreadPoolHelper
provides the HasSynchronizer( )
method, which
indicates whether the specified service type already has a synchronization context, and
the GetSynchronizer( )
method, which returns the
synchronization context associated with the type. The ThreadPoolBehavior
attribute uses these two methods in the virtual ProvideSynchronizer( )
method to ensure that it creates the
pool exactly once per service type. This check is required because ApplyDispatchBehavior( )
may be called multiple times (once
per endpoint). The ThreadPoolBehavior
attribute is
also a custom service behavior, because it implements IServiceBehavior
. The Validate( )
method
of IServiceBehavior
provides the service host
instance the ThreadPoolBehavior
attribute uses to
subscribe to the host's Closed
event, where it asks
ThreadPoolHelper
to terminate all the threads in
the pool by calling ThreadPoolHelper.CloseThreads( ).
ThreadPoolHelper
associates all dispatchers of
all endpoints of that service type with the same instance of ThreadPoolSynchronizer
. This ensures that all calls are routed to the same
pool. ThreadPoolHelper
has to be able to map a
service type to a particular ThreadPoolSynchronizer
,
so it declares a static dictionary called m_Synchronizers
that uses service types as keys and ThreadPoolSynchronizer
instances as values.
In ApplyDispatchBehavior( )
, ThreadPoolHelper
checks to see whether m_Synchronizers
already contains the provided service type.
If the type is not found, ThreadPoolHelper
adds the
provided ThreadPoolSynchronizer
to m_Synchronizers
, associating it with the service
type.
The DispatchRuntime
class provides the SynchronizationContext
property ThreadPoolHelper
uses to assign a synchronization context for the
dispatcher:
public sealed class DispatchRuntime
{
   public SynchronizationContext SynchronizationContext
   {get;set;}
   //More members
}
Before making the assignment, ThreadPoolHelper
verifies that the dispatcher has no other synchronization context, since that would
indicate some unresolved conflict. After that, it simply assigns the ThreadPoolSynchronizer
instance to the dispatcher:
dispatchRuntime.SynchronizationContext = m_Synchronizers[type];
This single line is all that is required to have WCF use the custom synchronization
context from now on. In the CloseThreads( )
method,
ThreadPoolHelper
looks up the ThreadPoolSynchronizer
instance in the dictionary and
disposes of it (thus gracefully terminating all the worker threads in the pool).
ThreadPoolHelper
also verifies that the provided
pool size value does not exceed the maximum concurrent calls value of the dispatcher's
throttle (this is not shown in Example 8-15).
A pool size of 1 will in effect create an affinity between a particular thread and all
service calls, regardless of the service's concurrency and instancing modes. This is
particularly useful if the service is required not merely to update some UI but to also
create a UI (for example, creating a pop-up window and then periodically showing, hiding,
and updating it). Having created the window, the service must ensure that the creating
thread is used to access and update it. Thread affinity is also required for a service
that accesses or creates resources that use the TLS. To formalize such requirements I
created the specialized AffinitySynchronizer
class,
implemented as:
public class AffinitySynchronizer : ThreadPoolSynchronizer
{
public AffinitySynchronizer( ) : this("AffinitySynchronizer Worker Thread")
{}
public AffinitySynchronizer(string threadName) : base(1,threadName)
{}
}
While you can install AffinitySynchronizer
, as
shown in Example 8-14, if by design the service is
required to always execute on the same thread it is better not to be at the mercy of the
host and the thread that happens to open it. Instead, use my ThreadAffinityBehaviorAttribute
:
[ThreadAffinityBehavior(typeof(MyService))]
class MyService : IMyContract
{...}
ThreadAffinityBehaviorAttribute
is a specialization
of ThreadPoolBehaviorAttribute
that hardcodes the pool
size as 1, as shown in Example 8-16.
Example 8-16. Implementing ThreadAffinityBehaviorAttribute
[AttributeUsage(AttributeTargets.Class)]
public class ThreadAffinityBehaviorAttribute : ThreadPoolBehaviorAttribute
{
public ThreadAffinityBehaviorAttribute(Type serviceType) :
this(serviceType,"Affinity Worker Thread")
{}
public ThreadAffinityBehaviorAttribute(Type serviceType,string threadName) :
                                       base(1,serviceType,threadName)
{}
}
When relying on thread affinity all service instances are always thread-safe, since only a single thread (and the same thread, at that) can access them.
When the service is configured with ConcurrencyMode.Single
, it gains no additional thread safety because the
service instance is single-threaded anyway. You do get double queuing of concurrent calls,
though: all concurrent calls to the service are first queued in the lock's queue and then
dispatched to the single thread in the pool one at a time. With ConcurrencyMode.Multiple
, calls are dispatched to the single thread as fast
as they arrive and are then queued up to be invoked later, in order and never
concurrently. Finally, with ConcurrencyMode.Reentrant
,
the service is, of course, not reentrant, because the incoming reentering call will be
queued up and a deadlock will occur while the single thread is blocked on the callout. It
is therefore best to use the default of ConcurrencyMode.Single
when relying on thread affinity.
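For example (assuming the ThreadAffinityBehavior attribute from ServiceModelEx), a thread-affine service that sticks with the recommended default looks like this:

```csharp
//ConcurrencyMode.Single is the default, so the ServiceBehavior attribute is
//shown here only to document the intent
[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Single)]
[ThreadAffinityBehavior(typeof(MyService))]
class MyService : IMyContract
{
   public void MyMethod( )
   {
      //Always executes on the same worker thread, so UI elements or
      //TLS-based resources created here can safely be reused across calls
   }
}
```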
If the affinity to a particular synchronization context is a host decision, you can streamline the code in Example 8-14 by encapsulating the installation of the synchronization context with extension methods. For example, since thread affinity is such a specialized case, you could define the following extension methods:
public static class HostThreadAffinity
{
   public static void SetThreadAffinity(this ServiceHost host,
                                        string threadName);
   public static void SetThreadAffinity(this ServiceHost host);
}
SetThreadAffinity( )
works equally well on
ServiceHost
and my ServiceHost<T>
:
ServiceHost<MyService> host = new ServiceHost<MyService>( );
host.SetThreadAffinity( );
host.Open( );
Example 8-17 lists the implementation of the
SetThreadAffinity( )
methods.
Example 8-17. Adding thread affinity to the host
public static class HostThreadAffinity
{
   public static void SetThreadAffinity(this ServiceHost host,string threadName)
   {
      if(host.State == CommunicationState.Opened)
      {
         throw new InvalidOperationException("Host is already opened");
      }
      AffinitySynchronizer affinitySynchronizer =
                                           new AffinitySynchronizer(threadName);
      SynchronizationContext.SetSynchronizationContext(affinitySynchronizer);

      host.Closing += delegate
                      {
                         using(affinitySynchronizer);
                      };
   }
   public static void SetThreadAffinity(this ServiceHost host)
   {
      SetThreadAffinity(host,"Executing all endpoints of " +
                             host.Description.ServiceType);
   }
}
HostThreadAffinity
offers two versions of
SetThreadAffinity( )
: the parameterized version
takes the thread name to provide for AffinitySynchronizer
's worker thread, while the parameterless version calls
the other SetThreadAffinity( )
method, specifying a
thread name inferred from the hosted service type (such as "Executing all endpoints of
MyService"). SetThreadAffinity( )
first checks that
the host has not yet been opened, because you can only attach a synchronization context
before the host is opened. If the host has not been opened, SetThreadAffinity( )
constructs a new AffinitySynchronizer
, providing it with the thread name to use, and
attaches it to the current thread. Finally, SetThreadAffinity( ) subscribes to the host's Closing event in order to call Dispose( ) on the AffinitySynchronizer, to shut down its worker thread. Since the AffinitySynchronizer reference could be null, the Closing event handler uses the using statement, which internally checks for a null assignment before calling Dispose( ).
By default, all calls to your WCF service will be processed in the order in which they
arrive. This is true both if you use the I/O completion thread pool or a custom thread
pool. Normally, this is exactly what you want. But what if some calls have higher priority
and you want to process them as soon as they arrive, rather than in order? Even worse,
when such calls arrive, what if the load on your service is such that the underlying
service resources are exhausted? What if the throttle is maxed out? In these cases, your
higher-priority calls will be queued just like all the other calls, waiting for the
service or its resources to become available. Synchronization contexts offer an elegant
solution to this problem: you can assign a priority to each call and have the
synchronization context sort the calls as they arrive before dispatching them to the
thread pool for execution. This is exactly what my PrioritySynchronizer
class does:
public enum CallPriority
{
   Low,
   Normal,
   High
}
public class PrioritySynchronizer : ThreadPoolSynchronizer
{
   public PrioritySynchronizer(uint poolSize);
   public PrioritySynchronizer(uint poolSize,string poolName);

   public static CallPriority Priority
   {get;set;}
}
PrioritySynchronizer
derives from ThreadPoolSynchronizer
and adds the sorting just mentioned.
Since the Send( )
and Post( )
methods of SynchronizationContext
do not
take a priority parameter, the client of PrioritySynchronizer
has two ways of passing the priority of the call: via
the Priority
property, which stores the priority (a
value of the enum type CallPriority
) in the TLS of the
calling thread, or via the message headers. If unspecified, Priority
defaults to CallPriority.Normal
.
In addition to the PrioritySynchronizer
class, I
also provide the matching PriorityCallsBehaviorAttribute
, shown in Example 8-18.
Example 8-18. Implementing PriorityCallsBehaviorAttribute
[AttributeUsage(AttributeTargets.Class)]
public class PriorityCallsBehaviorAttribute : ThreadPoolBehaviorAttribute
{
   public PriorityCallsBehaviorAttribute(uint poolSize,Type serviceType) :
                                             this(poolSize,serviceType,null)
   {}
   public PriorityCallsBehaviorAttribute(uint poolSize,Type serviceType,
                                         string poolName) :
                                             base(poolSize,serviceType,poolName)
   {}
   protected override ThreadPoolSynchronizer ProvideSynchronizer( )
   {
      if(ThreadPoolHelper.HasSynchronizer(ServiceType) == false)
      {
         return new PrioritySynchronizer(PoolSize,PoolName);
      }
      else
      {
         return ThreadPoolHelper.GetSynchronizer(ServiceType);
      }
   }
}
Using the PriorityCallsBehavior
attribute is
straightforward:
[PriorityCallsBehavior(3,typeof(MyService))]
class MyService : IMyContract
{...}
PriorityCallsBehaviorAttribute
overrides ProvideSynchronizer( )
and provides an instance of PrioritySynchronizer
instead of ThreadPoolSynchronizer
. Because PrioritySynchronizer
derives from ThreadPoolSynchronizer
, this is transparent as far as ThreadPoolHelper
is concerned.
The real challenge in implementing and supporting priority processing is providing the
call priority from the client to the service, and ultimately to PrioritySynchronizer
. Using the Priority
property of PrioritySynchronizer
is useful only for
non-WCF clients that interact directly with the synchronization context; it is of no use
for a WCF client, whose thread is never used to access the service. While you could
provide the priority as an explicit parameter in every method, I wanted a generic
mechanism that can be applied on any contract and service. To achieve that goal you have
to pass the priority of the call out-of-band, via the message headers, using the
techniques described in Appendix B. Appendix B explains in detail the use of the incoming and outgoing
headers, including augmenting WCF with general-purpose management of extraneous
information sent from the client to the service. In effect, I provide a generic yet
type-safe and application-specific custom context via my GenericContext<T>
class, available in
ServiceModelEx:
[DataContract]
public class GenericContext<T>
{
   [DataMember]
   public readonly T Value;

   public GenericContext( );
   public GenericContext(T value);

   public static GenericContext<T> Current
   {get;set;}
}
Literally any data contract (or serializable) type can be used for the type parameter
in the custom context, including of course the CallPriority
enum.
On the service side, any party can read the value out of the custom headers:
CallPriority priority = GenericContext<CallPriority>.Current.Value;
This is exactly what PrioritySynchronizer
does when
looking for the call priority. It expects the client to provide the priority either in the
TLS (via the Priority
property) or in the form of a
custom context that stores the priority in the message headers.
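The lookup order could be sketched as follows (an assumption about the internals, not the actual ServiceModelEx code; the GetPriority( ) helper name is hypothetical):

```csharp
//Prefer the priority from the custom message headers; fall back to the
//TLS value stored by the static Priority property
CallPriority GetPriority( )
{
   GenericContext<CallPriority> context = GenericContext<CallPriority>.Current;
   if(context != null)
   {
      return context.Value;
   }
   return PrioritySynchronizer.Priority; //Defaults to CallPriority.Normal
}
```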
The client can use my HeaderClientBase<T,H>
proxy class (also discussed in Appendix B) to pass the
priority to the service in the message headers, or, even better, define a general-purpose
priority-enabled proxy class, PriorityClientBase<T>
, shown in Example 8-19.
Example 8-19. Defining PriorityClientBase<T>
public abstract partial class PriorityClientBase<T> :
                         HeaderClientBase<T,CallPriority> where T : class
{
   public PriorityClientBase( ) : this(PrioritySynchronizer.Priority)
   {}
   public PriorityClientBase(string endpointName) :
                                 this(PrioritySynchronizer.Priority,endpointName)
   {}
   public PriorityClientBase(Binding binding,EndpointAddress remoteAddress) :
                        this(PrioritySynchronizer.Priority,binding,remoteAddress)
   {}
   public PriorityClientBase(CallPriority priority) : base(priority)
   {}
   public PriorityClientBase(CallPriority priority,string endpointName) :
                                                    base(priority,endpointName)
   {}
   public PriorityClientBase(CallPriority priority,Binding binding,
                             EndpointAddress remoteAddress) :
                                          base(priority,binding,remoteAddress)
   {}
   /* More constructors */
}
PriorityClientBase<T>
hardcodes the use of
CallPriority
for the type parameter H
. PriorityClientBase<T>
defaults to reading the priority from the TLS
(yielding CallPriority.Normal
when no priority is
found), so it can be used like any other proxy class. With very minor changes to your
existing proxy classes, you can now add priority-processing support:
class MyContractClient : PriorityClientBase<IMyContract>,IMyContract
{
   //Reads priority from TLS
   public MyContractClient( )
   {}
   public MyContractClient(CallPriority priority) : base(priority)
   {}

   public void MyMethod( )
   {
      Channel.MyMethod( );
   }
}
MyContractClient proxy = new MyContractClient(CallPriority.High);
proxy.MyMethod( );
[6] I first presented my technique for priority processing of WCF calls in my article "Synchronization Contexts in WCF" (MSDN Magazine, November 2007).