ServiceModelEx contains a simple-to-use, industrial-strength publish-subscribe framework. I wanted to provide not just a publish-subscribe service, but also a general-purpose framework that automates implementing such services and adding the support for any application in just one line of code (if that). The first step in building the framework was to factor the publish-subscribe management interfaces and provide separate contracts for transient and persistent subscriptions and for publishing.[9]
For managing transient subscriptions, I defined the ISubscriptionService
interface shown in Example C-1.
Example C-1. The ISubscriptionService interface manages transient subscribers
[ServiceContract] public interface ISubscriptionService { [OperationContract] void Subscribe(string eventOperation); [OperationContract] void Unsubscribe(string eventOperation); }
Note that ISubscriptionService
does not identify
the callback contract its implementing endpoint expects. Being a general-purpose
interface, it is unaware of particular callback contracts. It is up to the using
application to define those callback contracts. The callback interface is provided in the
using application by deriving from ISubscriptionService
and specifying the desired callback contract:
public interface IMyEvents
{
[OperationContract(IsOneWay = true)]
void OnEvent1( );
[OperationContract(IsOneWay = true)]
void OnEvent2(int number);
[OperationContract(IsOneWay = true)]
void OnEvent3(int number,string text);
}
[ServiceContract(CallbackContract = typeof(IMyEvents)
)]
public interface IMySubscriptionService : ISubscriptionService
{}
Typically, every operation on the callback contract corresponds to a specific event.
The subinterface of ISubscriptionService
(IMySubscriptionService
, in this example) does not need to add
operations. The transient subscription management functionality is provided by ISubscriptionService
. In each call to Subscribe( )
or Unsubscribe( )
, the
subscriber needs to provide the name of the operation (and therefore the event) it wants
to subscribe to or unsubscribe from. If the caller wishes to subscribe to all events, it
can pass an empty or null
string.
My framework offers an implementation for the methods of ISubscriptionService
in the form of the generic abstract class SubscriptionManager<T>
:
public abstract class SubscriptionManager<T> where T : class { public void Subscribe(string eventOperation); public void Unsubscribe(string eventOperation); //More members }
The generic type parameter for SubscriptionManager<T>
is the events contract. Note that SubscriptionManager<T>
does not implement ISubscriptionService
.
The using application needs to expose its own transient subscription service in the
form of an endpoint that supports its specific subinterface of ISubscriptionService
. To do so, the application needs to provide a service
class that derives from SubscriptionManager<T>
,
specify the callback contract as a type parameter, and derive from that specific
subinterface of ISubscriptionService
. For example, to
implement a transient subscription service using the IMyEvents
callback interface:
[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)] class MySubscriptionService : SubscriptionManager<IMyEvents
>,IMySubscriptionService
{}
MySubscriptionService
doesn't need any code because
IMySubscriptionService
does not add any new
operations, and SubscriptionManager<T>
already
implements the methods of ISubscriptionService
.
Note that just deriving from SubscriptionManager<IMyEvents>
is insufficient because it does not
derive from a contract interface—you must add the derivation from IMySubscriptionService
to support transient subscriptions.
Finally, the using application needs to define an endpoint for IMySubscriptionService
:
<services> <service name = "MySubscriptionService"> <endpoint address = "..." binding = "..." contract = "IMySubscriptionService" /> </service> </services>
Example C-2 shows how SubscriptionManager<T>
manages transient
subscriptions.
Example C-2. The transient subscribers management in SubscriptionManager<T>
public abstract class SubscriptionManager<T> where T : class { static Dictionary<string,List<T>> m_TransientStore; static SubscriptionManager( ) { m_TransientStore = new Dictionary<string,List<T>>( ); string[] methods = GetOperations( ); Action<string> insert = (methodName)=> { m_TransientStore.Add(methodName,new List<T>( )); }; methods.ForEach(insert); } static string[] GetOperations( ) { MethodInfo[] methods = typeof(T).GetMethods(BindingFlags.Public| BindingFlags.FlattenHierarchy| BindingFlags.Instance); List<string> operations = new List<string>(methods.Length); Action<MethodInfo> add = (method)=> { Debug.Assert(!operations.Contains(method.Name)); operations.Add(method.Name); }; methods.ForEach(add); return operations.ToArray( ); } static void AddTransient(T subscriber,string eventOperation) { lock(typeof(SubscriptionManager<T>)) { List<T> list = m_TransientStore[eventOperation]; if(list.Contains(subscriber)) { return; } list.Add(subscriber); } } static void RemoveTransient(T subscriber,string eventOperation) { lock(typeof(SubscriptionManager<T>)) { List<T> list = m_TransientStore[eventOperation]; list.Remove(subscriber); } } public void Subscribe(string eventOperation) { lock(typeof(SubscriptionManager<T>)) { T subscriber = OperationContext.Current.GetCallbackChannel<T>( ); if(String.IsNullOrEmpty(eventOperation) == false) { AddTransient(subscriber,eventOperation); } else { string[] methods = GetOperations( ); Action<string> addTransient = (methodName)=> { AddTransient(subscriber,methodName); }; methods.ForEach(addTransient); } } } public void Unsubscribe(string eventOperation) { lock(typeof(SubscriptionManager<T>)) { T subscriber = OperationContext.Current.GetCallbackChannel<T>( ); if(String.IsNullOrEmpty(eventOperation) == false) { RemoveTransient(subscriber,eventOperation); } else { string[] methods = GetOperations( ); Action<string> removeTransient = (methodName)=> { RemoveTransient(subscriber,methodName); }; methods.ForEach(removeTransient); } } } //More members }
SubscriptionManager<T>
stores the transient
subscribers in a generic static dictionary called m_TransientStore
:
static Dictionary<string,List<T>> m_TransientStore;
Each entry in the dictionary contains the name of the event operation and all its
subscribers in the form of a linked list. The static constructor of SubscriptionManager<T>
uses reflection to get all the
operations of the callback interfaces (the type parameter for SubscriptionManager<T>
) and initializes the dictionary to have all the
operations with empty lists. The Subscribe( )
method
extracts the callback reference from the operation call context. If the caller specifies
an operation name, Subscribe( )
calls the helper method
AddTransient( )
. AddTransient( )
retrieves the list of subscribers for the event from the
store, and if the list does not contain the subscriber, it adds it in.
If the caller specifies an empty string or null
for
the operation name, Subscribe( )
calls AddTransient( )
for each operation in the callback
contract.
Unsubscribe( )
operates in a similar manner. Note
that the caller can subscribe to all events and then unsubscribe from a particular
one.
For managing persistent subscribers, I defined the IPersistentSubscriptionService
interface shown in Example C-3.
Example C-3. The IPersistentSubscriptionService interface manages persistent subscribers
[ServiceContract] public interface IPersistentSubscriptionService { [OperationContract] [TransactionFlow(TransactionFlowOption.Allowed)] voidSubscribe
(string address,string eventsContract,string eventOperation); [OperationContract] [TransactionFlow(TransactionFlowOption.Allowed)] voidUnsubscribe
(string address,string eventsContract,string eventOperation); //More members }
To add a persistent subscriber, the caller needs to call Subscribe( )
, providing the address of the subscriber, the event's contract
name, and the specific event operation itself. To unsubscribe, the caller calls Unsubscribe( )
with the same information. Note that IPersistentSubscriptionService
does not imply in any way where
the subscribers persist on the service side—that is an implementation detail.
The class SubscriptionManager<T>
, presented
previously, also implements the methods of IPersistentSubscriptionService
:
[BindingRequirement(TransactionFlowEnabled = true)] public abstract class SubscriptionManager<T> where T : class { public void Unsubscribe(string address,string eventsContract, string eventOperation); public void Subscribe(string address,string eventsContract, string eventOperation); //More members }
SubscriptionManager<T>
stores the persistent
subscribers in SQL Server. It is configured to use the Client/Service transaction mode
(presented in Chapter 7), and it enforces that mode using my BindingRequirement
attribute.
The generic type parameter for SubscriptionManager<T>
is the events contract. Note that SubscriptionManager<T>
does not derive from IPersistentSubscriptionService
. The using application needs to
expose its own persistent subscription service, but there is no need to derive a new
contract from IPersistentSubscriptionService
because no
callback references are required. The application simply derives from SubscriptionManager<T>
, specifying the events contract
as a type parameter and adding a derivation from IPersistentSubscriptionService
. For example:
[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
class MySubscriptionService : SubscriptionManager<IMyEvents>,
IPersistentSubscriptionService
{}
There's no need for any code in MySubscriptionService
because SubscriptionManager<T>
already implements the methods of IPersistentSubscriptionService
.
Note that just deriving from SubscriptionManager<IMyEvents>
is insufficient, because SubscriptionManager<IMyEvents>
does not derive from a
contract interface—you must add the derivation from IPersistentSubscriptionService
to support persistent subscriptions.
Finally, the application needs to define an endpoint for IPersistentSubscriptionService
:
<services> <service name = "MySubscriptionService"> <endpoint address = "..." binding = "..." contract = "IPersistentSubscriptionService" /> </service> </services>
The implementation of the methods of IPersistentSubscriptionService
by SubscriptionManager<T>
is shown in Example C-4. Example C-4 is very similar to Example C-2, except the subscribers are stored
in SQL Server, not in memory in a dictionary.
Example C-4. Persistent subscriber management in SubscriptionManager<T>
public abstract class SubscriptionManager<T> where T : class { static void AddPersistent(string address,string eventsContract, string eventOperation) { //Store the subscription in SQL Server } static void RemovePersistent(string address,string eventsContract, string eventOperation) { //Remove the subscription from SQL Server } [OperationBehavior(TransactionScopeRequired = true)] public void Subscribe(string address,string eventsContract, string eventOperation) { if(String.IsNullOrEmpty(eventOperation) == false) { AddPersistent(address,eventsContract,eventOperation); } else { string[] methods = GetOperations( ); Action<string> addPersistent = (methodName)=> { AddPersistent(address,eventsContract,methodName); }; methods.ForEach(addPersistent); } } [OperationBehavior(TransactionScopeRequired = true)] public void Unsubscribe(string address,string eventsContract, string eventOperation) { if(String.IsNullOrEmpty(eventOperation) == false) { RemovePersistent(address,eventsContract,eventOperation); } else { string[] methods = GetOperations( ); Action<string> removePersistent = (methodName)=> { RemovePersistent(address,eventsContract,methodName); }; methods.ForEach(removePersistent); } } //More members }
If the application wants to support both transient and persistent subscribers for the
same events contract, simply derive the subscription service class from both the
specialized subinterface of ISubscriptionService
and
IPersistentSubscriptionService
:
[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
class MySubscriptionService : SubscriptionManager<IMyEvents>,
IMySubscriptionService,IPersistentSubscriptionService
{}
and expose the two matching endpoints:
<services> <service name = "MySubscriptionService"> <endpoint address = "..." binding = "..." contract ="IMySubscriptionService"
/> <endpoint address = "..." binding = "..." contract ="IPersistentSubscriptionService"
/> </service> </services>
The parts of the publish-subscribe framework shown so far have dealt only with the aspects of subscription management. The framework also enables easy implementation of the publishing service. The publishing service should support the same events contract as the subscribers, and it should be the only point of contact known to the publishers in the application. Because the publishing service exposes the events contract in an endpoint, you need to mark the events contract as a service contract, even if you only use it for duplex callbacks with transient subscribers:
[ServiceContract]
public interface IMyEvents
{
[OperationContract(IsOneWay = true)]
void OnEvent1( );
[OperationContract(IsOneWay = true)]
void OnEvent2(int number);
[OperationContract(IsOneWay = true)]
void OnEvent3(int number,string text);
}
The publish-subscribe framework contains the helper class PublishService<T>
, defined as:
public abstract class PublishService<T> where T : class { protected static void FireEvent(params object[] args); }
PublishService<T>
requires as a type
parameter the type of the events contract. To provide your own publishing service, derive
from PublishService<T>
and use the FireEvent( )
method to deliver the event to all subscribers,
be they transient or persistent, as shown in Example C-5.
Example C-5. Implementing an event-publishing service
[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)] class MyPublishService : PublishService<IMyEvents>,IMyEvents { public void OnEvent1( ) { FireEvent( ); } public void OnEvent2(int number) { FireEvent(number); } public void OnEvent3(int number,string text) { FireEvent(number,text); } }
Note that you can use FireEvent( )
to fire any type
of event, regardless of the parameters, because of the use of the params object
array.
Finally, the application needs to expose an endpoint for the publishing service with the events contract:
<services> <service name = "MyPublishService"> <endpoint address = "..." binding = "..." contract = "IMyEvents" /> </service> </services>
Example C-6 shows the implementation of
PublishService<T>
.
Example C-6. Implementing PublishService<T>
public abstract class PublishService<T> where T : class { protected static void FireEvent(params object[] args) { string action = OperationContext.Current.IncomingMessageHeaders.Action; string[] slashes = action.Split('/'), string methodName = slashes[slashes.Length-1]; FireEvent(methodName,args); } static void FireEvent(string methodName,params object[] args) { PublishPersistent(methodName,args); PublishTransient(methodName,args); } static void PublishPersistent(string methodName,params object[] args) { T[] subscribers = SubscriptionManager<T>.GetPersistentList(methodName); Publish(subscribers,true
,methodName,args); } static void PublishTransient(string methodName,params object[] args) { T[] subscribers = SubscriptionManager<T>.GetTransientList(methodName); Publish(subscribers,false
,methodName,args); } static void Publish(T[] subscribers,bool closeSubscribers,string methodName, params object[] args) { WaitCallback fire = (subscriber)=> { Invoke(subscriber as T,methodName,args); if(closeSubscribers) { using(subscriber as IDisposable) {} } }; Action<T> queueUp = (subscriber)=> { ThreadPool.QueueUserWorkItem(fire,subscriber); }; subscribers.ForEach(queueUp); } static void Invoke(T subscriber,string methodName,object[] args) { Debug.Assert(subscriber != null); Type type = typeof(T); MethodInfo methodInfo = type.GetMethod(methodName); try { methodInfo.Invoke(subscriber,args); } catch {} } }
To simplify firing the event, the FireEvent( )
method accepts the parameters to pass to the subscribers—yet its caller does not provide
it with the name of the operation to invoke on the subscribers. Instead, FireEvent( )
extracts the method name from the incoming
message headers. It then uses an overloaded FireEvent(
)
that accepts the method name. That method in turn uses the helper method
PublishPersistent( )
to publish to all persistent
subscribers, and the PublishTransient( )
helper method
to publish to all transient subscribers. The publishing methods operate in an almost
identical way: they access SubscriptionManager<T>
to retrieve their respective subscribers list, and then use the Publish( )
method to fire the event. The subscribers are returned in the form
of an array of proxies to the subscribers, which is passed to the Publish( )
method.
Publish( )
could have simply invoked the
subscribers at this point. However, I wanted to support concurrent publishing of events,
so that if any subscriber is undisciplined and takes a long time to process the event,
this will not preclude the other subscribers from receiving the event in a timely manner.
Having the event operations marked as one-way is no guarantee of asynchronous invocation,
and besides, I wanted to support concurrent publishing even when the event operation is
not marked as a one-way operation. Publish( )
defines
two anonymous methods. The first calls the Invoke( )
helper method, which fires the event to the individual subscriber provided and then closes
the proxy if so specified. Because Invoke( )
was never
compiled against the specific subscriber type, it uses reflection and late binding for the
invocation. Invoke( )
also suppresses any exceptions
raised by the invocation, because these are of no interest to the publishing party. The
second anonymous method queues up the first anonymous method to be executed by a thread
from the thread pool. Finally, Publish( )
invokes the
second anonymous method on every subscriber in the provided array.
Note how uniformly PublishService<T>
treats
the subscribers—it almost does not matter if they are transient or persistent. The only
difference is that after publishing to a persistent subscriber, you need to close the
proxy. This uniformity is achieved by the helper methods GetTransientList( )
and GetPersistentList(
)
of SubscriptionManager<T>
. Of these
two, GetTransientList( )
is the simpler one:
public abstract class SubscriptionManager<T> where T : class { internal static T[] GetTransientList(string eventOperation) { lock(typeof(SubscriptionManager<T>)) { if(m_TransientStore.ContainsKey(eventOperation)) { List<T> list = m_TransientStore[eventOperation]; return list.ToArray( ); } return new T[]{}; } } //More members }
GetTransientList( )
looks up in the transient store
all the subscribers to the specified operation and returns them as an array. GetPersistentList( )
faces a bigger challenge: there is no
ready-made list of proxies to persistent subscribers; the only thing known about them is
their addresses. GetPersistentList( )
therefore needs
to instantiate the persistent subscribers' proxies, as shown in Example C-7.
Example C-7. Creating the persistent subscribers proxy list
public abstract class SubscriptionManager<T> where T : class { internal static T[] GetPersistentList(string eventOperation) { string[] addresses = GetSubscribersToContractEventOperation( typeof(T).ToString( ),eventOperation); List<T> subscribers = new List<T>(addresses.Length); foreach(string address in addresses) { Binding binding = GetBindingFromAddress(address); T proxy = ChannelFactory<T>.CreateChannel(binding, new EndpointAddress(address)); subscribers.Add(proxy); } return subscribers.ToArray( ); } static string[] GetSubscribersToContractEventOperation(string eventsContract, string eventOperation) { //Query SQL Server for the subscribers to the event } static Binding GetBindingFromAddress(string address) { if(address.StartsWith("http:") || address.StartsWith("https:")) { WSHttpBinding binding = new WSHttpBinding( ); binding.ReliableSession.Enabled = true; binding.TransactionFlow = true; return binding; } if(address.StartsWith("net.tcp:")) { NetTcpBinding binding = new NetTcpBinding( ); binding.ReliableSession.Enabled = true; binding.TransactionFlow = true; return binding; } /* Similar code for the IPC and MSMQ bindings */ Debug.Assert(false,"Unsupported binding specified"); return null; } //More members }
To create a proxy for each subscriber, GetPersistentList(
)
needs the subscriber's address, binding, and contract. The contract is, of
course, the type parameter for SubscriptionManager<T>
. To obtain the addresses GetPersistentList( )
calls GetSubscribersToContractEventOperation( )
, which queries the subscribers
store and returns as an array the addresses of all of the persistent subscribers who have
subscribed to the specified event. All GetPersistentList(
)
needs now is the binding used by each subscriber. For that GetPersistentList( )
calls the helper method GetBindingFromAddress( )
, which infers the binding to use from
the address schema. GetBindingFromAddress( )
treats all
HTTP addresses as indicating use of the WSHttpBinding
.
In addition, GetBindingFromAddress( )
turns on
reliability and transaction propagation for each binding, to enable including the event in
the publisher's transaction when one-way operations are not used, such as with this events
contract:
[ServiceContract] interface IMyEvents { [OperationContract] [TransactionFlow(TransactionFlowOption.Allowed)] void OnEvent1( ); [OperationContract] [TransactionFlow(TransactionFlowOption.Allowed)] void OnEvent2(int number); [OperationContract] [TransactionFlow(TransactionFlowOption.Allowed)] void OnEvent3(int number,string text); }
While you can add and remove persistent subscriptions at runtime by using the methods
of the IPersistentSubscriptionService
interface shown
in Example C-3, because of their persistent
nature, managing the subscriptions is best done via some kind of administration tool. To
that end, IPersistentSubscriptionService
defines
additional operations that answer various queries against the subscribers store, as shown
in Example C-8.
Example C-8. The IPersistentSubscriptionService interface
[DataContract] public struct PersistentSubscription { [DataMember] public string Address {get;set;} [DataMember] public string EventsContract {get;set;} [DataMember] public string EventOperation {get;set;} } [ServiceContract] public interface IPersistentSubscriptionService { //Administration operations [OperationContract] [TransactionFlow(TransactionFlowOption.Allowed)] PersistentSubscription[] GetAllSubscribers( ); [OperationContract] [TransactionFlow(TransactionFlowOption.Allowed)] PersistentSubscription[] GetSubscribersToContract(string eventsContract); [OperationContract] [TransactionFlow(TransactionFlowOption.Allowed)] string[] GetSubscribersToContractEventType(string eventsContract, string eventOperation); [OperationContract] [TransactionFlow(TransactionFlowOption.Allowed)] PersistentSubscription[] GetAllSubscribersFromAddress(string address); //More members }
All of these administration operations utilize a simple data contract called PersistentSubscription
, which contains the address of the
subscriber, the subscribed contract, and the event. GetAllSubscribers( )
simply returns the list of all subscribers. GetSubscribersToContract( )
returns all subscribers to a
specific contract, and GetSubscribersToContractEventType(
)
returns all subscribers to a particular event operation on a specified
contract. Finally, for the sake of completeness, GetAllSubscribersFromAddress( )
returns all subscribers that provided a
specified address.
My publish-subscribe framework includes a sample persistent subscription administration tool called Persistent Subscription Manager, shown in Figure C-2.
The administration tool uses IPersistentSubscriptionService
to add and remove subscriptions. To add a new
subscription, you need to provide it with the metadata exchange address of the events
contract definition. You can use the metadata exchange address of the persistent
subscriber itself or the metadata exchange address of the publishing service (such as the
one shown in Example C-5), because they are
polymorphic. Enter the metadata exchange base address in the MEX Address text box and
click the Lookup button. The tool will programmatically retrieve the metadata of the event
service and populate the Contract and Event combo boxes. Retrieving the metadata and
parsing its content is done using my MetadataHelper
class, presented in Chapter 2.
Once you've provided the address of the persistent subscriber, click the Subscribe
button. Persistent Subscription Manager then adds the subscription by calling to the
subscription service (MySubscriptionService
in the
examples so far). The address for the subscription service is maintained in the Persistent
Subscription Manager config file.
While duplex operations are, in general, the only way to subscribe a live object,
there is one exception to that rule: a singleton subscriber. You can treat a singleton
service as just another persistent subscriber and add its address to the subscription
store. This technique is particularly useful for user-interface applications that need
to monitor some events. You can use my FormHost<F>
(presented in Chapter 8)
to expose the form as a singleton, and then add the form as a persistent subscriber. You
can add the form using the Persistent Subscription Manager tool, or the form can
subscribe itself upon startup.
The publish-subscribe pattern also decouples the system security-wise. Publishers only need to authenticate themselves against a single publishing service, as opposed to multiple subscribers and potentially multiple security mechanisms. The subscribers in turn only need to allow the publishing service, rather than all publishers in the system, to deliver events; they trust the publishing service to properly authenticate and authorize the publishers. Applying role-based security on the publishing service enables you to easily enforce in one place various rules regarding who is allowed to publish an event across the system.
Instead of using the synchronous bindings to either publish or subscribe to the
events, you can use the NetMsmqBinding
. A queued
publish-subscribe service combines the benefits of a loosely coupled system and the
flexibility of disconnected execution. When using queued events, all events on the
contract must, of course, be marked as one-way operations. As shown in Figure C-3, you can use queuing at either end
independently.
You can have a queued publisher and connected synchronous subscribers, you can have a connected publisher publishing to queued subscribers, or you can have both queued publishers and queued subscribers. Note, however, that you cannot have queued transient subscriptions—there is no support within the MSMQ binding for duplex callbacks, since that would render the disconnected aspect of the communication useless. As before, you can use the administration tool to manage the subscribers, and the administration operations are still connected and synchronous.
To utilize a queued publisher, the publishing service needs to expose a queued endpoint using the MSMQ binding. When firing events at a queued publisher, the publishing service can be offline, or the publishing client itself can be disconnected. Note that when publishing two events to a queued publishing service, there are no guarantees as to the order in which these events will be delivered to and processed by the end subscribers. Due to the asynchronous concurrent publishing, there is no order even when the events contract is configured for a session.
To deploy a queued subscriber, the persistent subscribing service needs to expose a queued endpoint. This will enable the subscriber to be offline even when the publisher is online. When the subscriber connects again, it will receive all of its queued-up events. In addition, queued subscribers can handle the case when the publishing service itself is disconnected, because no events are lost. Of course, having both a queued publisher and subscriber allows both to work offline at the same time.
When multiple events are fired at a single queued subscriber, there are no guarantees as to the order of delivery of the events, even when the events contract is configured for a session.
[9] I first published my publish-subscribe framework in my article "WCF Essentials: What You Need to Know About One-Way Calls, Callbacks, and Events" (MSDN Magazine, October 2006).
3.133.158.36