ServiceModelEx contains a simple-to-use, industrial-strength publish-subscribe framework. I wanted to provide not just a publish-subscribe service, but also a general-purpose framework that automates implementing such services and adding the support for any application in just one line of code (if that). The first step in building the framework was to factor the publish-subscribe management interfaces and provide separate contracts for transient and persistent subscriptions and for publishing.[9]
For managing transient subscriptions, I defined the ISubscriptionService
interface shown in
Example D-1.
Example D-1. The ISubscriptionService interface manages transient subscribers
[ServiceContract] public interface ISubscriptionService { [OperationContract] void Subscribe(string eventOperation); [OperationContract] void Unsubscribe(string eventOperation); }
Note that ISubscriptionService
does not identify the
callback contract its implementing endpoint expects. Being a
general-purpose interface, it is unaware of particular callback
contracts. It is up to the using application to define those callback
contracts. The callback interface is provided in the using application
by deriving from ISubscription
Service
and specifying the desired
callback contract:
public interface IMyEvents
{
[OperationContract(IsOneWay = true)]
void OnEvent1();
[OperationContract(IsOneWay = true)]
void OnEvent2(int number);
[OperationContract(IsOneWay = true)]
void OnEvent3(int number,string text);
}
[ServiceContract(CallbackContract = typeof(IMyEvents)
)]
public interface IMySubscriptionService : ISubscriptionService
{}
Typically, every operation on the callback contract corresponds
to a specific event. The subinterface of ISubscriptionService
(IMySubscriptionService
, in this example)
does not need to add operations. ISubscriptionService
provides the transient
subscription management functionality. In each call to Subscribe()
or Unsubscribe()
, the subscriber needs to
provide the name of the operation (and therefore the event) it wants
to subscribe to or unsubscribe from. If the caller wants to subscribe
to all events, it can pass an empty or null
string.
My framework offers an implementation for the methods of
ISubscriptionService
in the form of
the generic abstract class SubscriptionManager<T>
:
public abstract class SubscriptionManager<T> where T : class { public void Subscribe(string eventOperation); public void Unsubscribe(string eventOperation); //More members }
The generic type parameter for SubscriptionManager<T>
is the events
contract. Note that SubscriptionManager<T>
does not
implement ISubscriptionService
.
The using application needs to expose its own transient
subscription service in the form of an endpoint that supports its
specific subinterface of ISubscriptionService
. To do so, the application needs to provide a service
class that derives from Subscription
Manager<T>
, specify the callback
contract as a type parameter, and derive from that specific
subinterface of ISubscriptionService
. For example, to
implement a transient subscription service using the IMyEvents
callback interface:
[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)] class MySubscriptionService : SubscriptionManager<IMyEvents
>,IMySubscriptionService
{}
MySubscriptionService
doesn’t
need any code because IMySubscriptionService
does not add any new
operations and SubscriptionManager<T>
already
implements the methods of ISubscriptionService
.
Note that just deriving from SubscriptionManager<IMyEvents>
is
insufficient because it does not derive from a contract interface—you
must add the derivation from IMy
Subscription
Service
to support transient
subscriptions.
Finally, the using application needs to define an endpoint for
IMySubscriptionService
:
<services> <service name = "MySubscriptionService"> <endpoint address = "..." binding = "..." contract = "IMySubscriptionService" /> </service> </services>
Example D-2 shows
how SubscriptionManager<T>
manages transient subscriptions.
Example D-2. The transient subscribers management in SubscriptionManager<T>
public abstract class SubscriptionManager<T> where T : class { static Dictionary<string,List<T>> m_TransientStore; static SubscriptionManager() { m_TransientStore = new Dictionary<string,List<T>>(); string[] methods = GetOperations(); Action<string> insert = (methodName)=> { m_TransientStore.Add(methodName,new List<T>()); }; methods.ForEach(insert); } static string[] GetOperations() { MethodInfo[] methods = typeof(T).GetMethods(BindingFlags.Public| BindingFlags.FlattenHierarchy| BindingFlags.Instance); List<string> operations = new List<string>(methods.Length); Action<MethodInfo> add = (method)=> { Debug.Assert(!operations.Contains(method.Name)); operations.Add(method.Name); }; methods.ForEach(add); return operations.ToArray(); } static void AddTransient(T subscriber,string eventOperation) { lock(typeof(SubscriptionManager<T>)) { List<T> list = m_TransientStore[eventOperation]; if(list.Contains(subscriber)) { return; } list.Add(subscriber); } } static void RemoveTransient(T subscriber,string eventOperation) { lock(typeof(SubscriptionManager<T>)) { List<T> list = m_TransientStore[eventOperation]; list.Remove(subscriber); } } public void Subscribe(string eventOperation) { lock(typeof(SubscriptionManager<T>)) { T subscriber = OperationContext.Current.GetCallbackChannel<T>(); if(String.IsNullOrEmpty(eventOperation) == false) { AddTransient(subscriber,eventOperation); } else { string[] methods = GetOperations(); Action<string> addTransient = (methodName)=> { AddTransient(subscriber,methodName); }; methods.ForEach(addTransient); } } } public void Unsubscribe(string eventOperation) { lock(typeof(SubscriptionManager<T>)) { T subscriber = OperationContext.Current.GetCallbackChannel<T>(); if(String.IsNullOrEmpty(eventOperation) == false) { RemoveTransient(subscriber,eventOperation); } else { string[] methods = GetOperations(); Action<string> removeTransient = (methodName)=> { RemoveTransient(subscriber,methodName); }; methods.ForEach(removeTransient); } } } //More members }
SubscriptionManager<T>
stores the transient subscribers in a generic static dictionary called
m_TransientStore
:
static Dictionary<string,List<T>> m_TransientStore;
Each entry in the dictionary contains the name of the event
operation and all its subscribers in the form of a linked list. The
static constructor of SubscriptionManager<T>
uses reflection
to get all the operations of the callback interfaces (the type
parameter for SubscriptionManager<T>
) and
initializes the dictionary to have all the operations with empty
lists. The Subscribe()
method
extracts the callback reference from the operation call context. If
the caller specifies an operation name, Subscribe()
calls the helper method AddTransient()
. AddTransient()
retrieves the list of
subscribers for the event from the store, and if the list does not
contain the subscriber, it adds it in.
If the caller specifies an empty string or null
for the operation name, Subscribe()
calls AddTransient()
for each operation in the
callback contract.
Unsubscribe()
operates in a
similar manner. Note that the caller can subscribe to all events and
then unsubscribe from a particular one.
For managing persistent subscribers, I defined the
IPersistentSubscriptionService
interface shown in Example D-3.
Example D-3. The IPersistentSubscriptionService interface manages persistent subscribers
[ServiceContract] public interface IPersistentSubscriptionService { [OperationContract] [TransactionFlow(TransactionFlowOption.Allowed)] voidSubscribe
(string address,string eventsContract,string eventOperation); [OperationContract] [TransactionFlow(TransactionFlowOption.Allowed)] voidUnsubscribe
(string address,string eventsContract,string eventOperation); //More members }
To add a persistent subscriber, the caller needs to call
Subscribe()
, providing the address
of the subscriber, the event’s contract name, and the specific event
operation itself. To unsubscribe, the caller calls Unsubscribe()
with the same information.
Note that IPersistentSubscriptionService
does not
imply where the subscribers persist on the service side—that is an
implementation detail.
The class SubscriptionManager<T>
, presented
previously, also implements the methods of IPersistentSubscriptionService
:
[BindingRequirement(TransactionFlowEnabled = true)] public abstract class SubscriptionManager<T> where T : class { public void Unsubscribe(string address,string eventsContract, string eventOperation); public void Subscribe(string address,string eventsContract, string eventOperation); //More members }
SubscriptionManager<T>
stores the persistent subscribers in SQL Server. It is configured to
use the Client/Service transaction mode (presented in Chapter 7), and it enforces that mode using my
BindingRequirement
attribute.
The generic type parameter for SubscriptionManager<T>
is the events
contract. Note that SubscriptionManager<T>
does not derive
from IPersistentSubscriptionService
. The using
application needs to expose its own persistent subscription service,
but there is no need to derive a new contract from IPersistentSubscriptionService
, because
no callback references are required. The
application simply derives from Subscription
Manager<T>
, specifying the events
contract as a type parameter and adding a derivation from IPersistentSubscriptionService
, for
example:
[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
class MySubscriptionService : SubscriptionManager<IMyEvents>,
IPersistentSubscriptionService
{}
There’s no need for any code in
MySubscriptionService
, because
Sub
scription
Manager<T>
already implements the
methods of IPersistentSubscriptionService
.
Note that just deriving from SubscriptionManager<IMyEvents>
is
insufficient, because SubscriptionManager<IMyEvents>
does
not derive from a contract interface—you must add the derivation from
IPersistentSubscriptionService
to
support persistent subscriptions.
Finally, the application needs to
define an endpoint for IPersistent
Subscription
Service
:
<services> <service name = "MySubscriptionService"> <endpoint address = "..." binding = "..." contract = "ServiceModelEx.IPersistentSubscriptionService" /> </service> </services>
The implementation of the methods of IPersistentSubscriptionService
by Subscription
Manager<T>
is
shown in Example D-4.
Example D-4 is very
similar to Example D-2,
except the subscribers are stored in SQL Server, not in memory in a
dictionary.
Example D-4. Persistent subscriber management in SubscriptionManager<T>
public abstract class SubscriptionManager<T> where T : class { static void AddPersistent(string address,string eventsContract, string eventOperation) { //Store the subscription in SQL Server } static void RemovePersistent(string address,string eventsContract, string eventOperation) { //Remove the subscription from SQL Server } [OperationBehavior(TransactionScopeRequired = true)] public void Subscribe(string address,string eventsContract, string eventOperation) { if(String.IsNullOrEmpty(eventOperation) == false) { AddPersistent(address,eventsContract,eventOperation); } else { string[] methods = GetOperations(); Action<string> addPersistent = (methodName)=> { AddPersistent(address,eventsContract,methodName); }; methods.ForEach(addPersistent); } } [OperationBehavior(TransactionScopeRequired = true)] public void Unsubscribe(string address,string eventsContract, string eventOperation) { if(String.IsNullOrEmpty(eventOperation) == false) { RemovePersistent(address,eventsContract,eventOperation); } else { string[] methods = GetOperations(); Action<string> removePersistent = (methodName)=> { RemovePersistent(address,eventsContract,methodName); }; methods.ForEach(removePersistent); } } //More members }
If you want the application to support both transient and
persistent subscribers for the same events contract, simply derive the
subscription service class from both the specialized subinterface of
ISubscriptionService
and IPersistentSubscriptionService
:
[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)] class MySubscriptionService : SubscriptionManager<IMyEvents>, IMySubscriptionService, IPersistentSubscriptionService {}
Next, expose the two matching endpoints:
<services> <service name = "MySubscriptionService"> <endpoint address = "..." binding = "..." contract ="IMySubscriptionService"
/> <endpoint address = "..." binding = "..." contract ="ServiceModelEx.IPersistentSubscriptionService"
/> </service> </services>
The parts of the publish-subscribe framework shown so far deal only with the aspects of subscription management. The framework also enables easy implementation of the publishing service. The publishing service should support the same events contract as the subscribers, and it should be the only point of contact known to the publishers in the application. Because the publishing service exposes the events contract in an endpoint, you need to mark the events contract as a service contract, even if you only use it for duplex callbacks with transient subscribers:
[ServiceContract]
public interface IMyEvents
{
[OperationContract(IsOneWay = true)]
void OnEvent1();
[OperationContract(IsOneWay = true)]
void OnEvent2(int number);
[OperationContract(IsOneWay = true)]
void OnEvent3(int number,string text);
}
The publish-subscribe framework contains the helper class
PublishService<T>
, defined
as:
public abstract class PublishService<T> where T : class { protected static void FireEvent(params object[] args); }
PublishService<T>
requires as a type parameter the type of the events contract. To
provide your own publishing service, derive from PublishService<T>
and use the FireEvent()
method to deliver the event to
all subscribers, be they transient or persistent, as shown in Example D-5.
Example D-5. Implementing an event-publishing service
[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)] class MyPublishService : PublishService<IMyEvents>,IMyEvents { public void OnEvent1() { FireEvent(); } public void OnEvent2(int number) { FireEvent(number); } public void OnEvent3(int number,string text) { FireEvent(number,text); } }
Note that you can use FireEvent()
to fire any type of event,
regardless of the parameters, because of the use of the params object
array.
Finally, the application needs to expose an endpoint for the publishing service with the events contract:
<services> <service name = "MyPublishService"> <endpoint address = "..." binding = "..." contract = "IMyEvents" /> </service> </services>
Example D-6 shows the
implementation of PublishService<T>
.
Example D-6. Implementing PublishService<T>
public abstract class PublishService<T> where T : class { protected static void FireEvent(params object[] args) { string action = OperationContext.Current.IncomingMessageHeaders.Action; string[] slashes = action.Split('/'), string methodName = slashes[slashes.Length-1]; FireEvent(methodName,args); } static void FireEvent(string methodName,object[] args) { PublishPersistent(methodName,args); PublishTransient(methodName,args); } static void PublishPersistent(string methodName,object[] args) { T[] subscribers = SubscriptionManager<T>.GetPersistentList(methodName); Publish(subscribers,true
,methodName,args); } static void PublishTransient(string methodName,object[] args) { T[] subscribers = SubscriptionManager<T>.GetTransientList(methodName); Publish(subscribers,false
,methodName,args); } static void Publish(T[] subscribers,bool closeSubscribers,string methodName, object[] args) { WaitCallback fire = (subscriber)=> { Invoke(subscriber as T,methodName,args); if(closeSubscribers) { using(subscriber as IDisposable) {} } }; Action<T> queueUp = (subscriber)=> { ThreadPool.QueueUserWorkItem(fire,subscriber); }; subscribers.ForEach(queueUp); } static void Invoke(T subscriber,string methodName,object[] args) { Debug.Assert(subscriber != null); Type type = typeof(T); MethodInfo methodInfo = type.GetMethod(methodName); try { methodInfo.Invoke(subscriber,args); } catch {} } }
To simplify firing the event, the FireEvent()
method accepts the parameters to
pass to the subscribers, yet its caller does not provide it with the
name of the operation to invoke on the subscribers. Instead, FireEvent()
extracts the method name from
the incoming message headers. It then uses an overloaded FireEvent()
that accepts the method name.
That method, in turn, uses the helper method PublishPersistent()
to publish to all
persistent subscribers and uses the PublishTransient()
helper method to publish to all transient subscribers.
The publishing methods operate in an almost identical way: they access
SubscriptionManager<T>
to
retrieve their respective subscribers list, then use the Publish()
method to fire the event. The
subscribers are returned in the form of an array of proxies to the
subscribers, which is passed to the Publish()
method.
Publish()
could have simply
invoked the subscribers at this point. However, I wanted to support
concurrent publishing of events so that if any subscriber is
undisciplined and takes a long time to process the event, this will
not preclude the other subscribers from receiving the event in a
timely manner. Having the event operations marked as one way is no
guarantee of asynchronous invocation, and besides, I wanted to support
concurrent publishing even when the event operation is not marked as a
one-way operation. Publish()
defines two anonymous methods. The first calls the Invoke()
helper method, which fires the
event to the individual subscriber provided and then closes the proxy
if so specified. Because Invoke()
was never compiled against the specific subscriber type, it uses
reflection and late binding for the invocation. Invoke()
also suppresses any exceptions
raised by the invocation, because these are of no interest to the
publishing party. The second anonymous method queues up the first
anonymous method to be executed by a thread from the thread pool.
Finally, Publish()
invokes the
second anonymous method on every subscriber in the provided
array.
Notice how uniformly PublishService<T>
treats the
subscribers—it almost does not matter if they are transient or
persistent. The only difference is that after publishing to a
persistent subscriber, you need to close the proxy. You can achieve
this uniformity using the helper methods GetTransientList()
and GetPersistentList()
of Subscription
Manager<T>
. Of
these two, GetTransientList()
is
the simpler one:
public abstract class SubscriptionManager<T> where T : class { internal static T[] GetTransientList(string eventOperation) { lock(typeof(SubscriptionManager<T>)) { if(m_TransientStore.ContainsKey(eventOperation)) { List<T> list = m_TransientStore[eventOperation]; return list.ToArray(); } return new T[]{}; } } //More members }
GetTransientList()
looks up
all the subscribers to the specified operation in the transient store
and returns them as an array. GetPersistentList()
faces a bigger
challenge: there is no ready-made list of proxies to persistent
subscribers. The only thing known about them is their addresses.
GetPersistentList()
therefore needs
to instantiate the persistent subscribers’ proxies, as shown in Example D-7.
Example D-7. Creating the persistent subscribers proxy list
public abstract class SubscriptionManager<T> where T : class { internal static T[] GetPersistentList(string eventOperation) { string[] addresses = GetSubscribersToContractEventOperation( typeof(T).ToString(),eventOperation); List<T> subscribers = new List<T>(addresses.Length); foreach(string address in addresses) { Binding binding = GetBindingFromAddress(address); T proxy = ChannelFactory<T>.CreateChannel(binding, new EndpointAddress(address)); subscribers.Add(proxy); } return subscribers.ToArray(); } static string[] GetSubscribersToContractEventOperation(string eventsContract, string eventOperation) { //Query SQL Server for the subscribers to the event } static Binding GetBindingFromAddress(string address) { if(address.StartsWith("http:") || address.StartsWith("https:")) { WSHttpBinding binding = new WSHttpBinding(); binding.ReliableSession.Enabled = true; binding.TransactionFlow = true; return binding; } if(address.StartsWith("net.tcp:")) { NetTcpBinding binding = new NetTcpBinding(); binding.ReliableSession.Enabled = true; binding.TransactionFlow = true; return binding; } /* Similar code for the one-way relay, IPC and MSMQ bindings */ Debug.Assert(false,"Unsupported binding specified"); return null; } //More members }
To create a proxy for each subscriber, GetPersistentList()
needs the subscriber’s
address, binding, and contract.
The contract is, of course, the type parameter for Subscription
Manager<T>
. To obtain the
addresses, GetPersistentList()
calls GetSubscribers
To
Contract
Event
Operation()
, which queries the
subscribers store and returns as an array the addresses of all of the
persistent subscribers who have subscribed to the specified event. All
GetPersistentList()
needs now is
the binding used by each subscriber. For
that, GetPersistentList()
calls the
helper method Get
Binding
From
Address()
, which infers the binding to
use from the address schema. Get
Binding
From
Address()
assumes all HTTP or HTTPS
addresses indicate the use the WSHttpBinding
. For service bus addresses
with the sb
scheme,
GetBinding
From
Address()
uses an instance of the
NetOnewayRelayBinding
.
In addition, when applicable, GetBindingFromAddress()
turns on reliability
and transaction propagation for each binding to enable inclusion of
the event in the publisher’s transaction when one-way operations are
not used, such as with this events contract:
[ServiceContract] interface IMyEvents { [OperationContract] [TransactionFlow(TransactionFlowOption.Allowed)] void OnEvent1(); [OperationContract] [TransactionFlow(TransactionFlowOption.Allowed)] void OnEvent2(int number); [OperationContract] [TransactionFlow(TransactionFlowOption.Allowed)] void OnEvent3(int number,string text); }
While you can add and remove persistent subscriptions at
runtime by using the methods of the IPersistentSubscriptionService
interface
shown in Example D-3,
because of their persistent nature, you can best manage the
subscriptions with some kind of administration tool. To that end,
IPersistentSubscriptionService
defines additional operations that answer various queries against the
subscribers store, as shown in Example D-8.
Example D-8. The IPersistentSubscriptionService interface
[DataContract] public struct PersistentSubscription { [DataMember] public string Address {get;set;} [DataMember] public string EventsContract {get;set;} [DataMember] public string EventOperation {get;set;} } [ServiceContract] public interface IPersistentSubscriptionService { //Administration operations [OperationContract] [TransactionFlow(TransactionFlowOption.Allowed)] PersistentSubscription[] GetAllSubscribers(); [OperationContract] [TransactionFlow(TransactionFlowOption.Allowed)] PersistentSubscription[] GetSubscribersToContract(string eventsContract); [OperationContract] [TransactionFlow(TransactionFlowOption.Allowed)] string[] GetSubscribersToContractEventType(string eventsContract, string eventOperation); [OperationContract] [TransactionFlow(TransactionFlowOption.Allowed)] PersistentSubscription[] GetAllSubscribersFromAddress(string address); //More members }
All of these administration operations utilize a simple data
contract called PersistentSubscription
, which contains the
address of the subscriber, the subscribed contract, and the event.
GetAllSubscribers()
simply returns
the list of all subscribers. Get
Subscribers
To
Contract()
returns all subscribers to a
specific contract, and Get
Subscribers
To
Contract
Event
Type()
returns all subscribers to a
particular event operation on a
specified contract. Finally, for the sake of completeness, GetAll
Subscribers
From
Address()
returns all
subscribers that provided a specified address.
My publish-subscribe framework includes a sample persistent subscription administration tool called Persistent Subscription Manager, shown in Figure D-2.
The administration tool uses IPersistentSubscriptionService
to add and
remove subscriptions. To add a new subscription, you need to provide
it with the metadata exchange
address of the events contract definition. You can use the metadata
exchange address of the persistent subscriber itself or the metadata
exchange address of the publishing service (such as the one shown in
Example D-5), because
they are polymorphic. Enter the metadata exchange base address in the
MEX Address text box and click the Lookup button. The tool will
programmatically retrieve the metadata of the event service and
populate the Contract and Event combo boxes. My MetadataHelper
class, presented in Chapter 2, retrieves the metadata and parses its
content.
Once you’ve provided the address of the persistent subscriber,
click the Subscribe button. Persistent Subscription Manager then adds
the subscription by calling to the subscription service (MySubscriptionService
in the examples so
far). The Persistent Subscription Manager config file
maintains the address for the subscription service.
While duplex operations are, in general, the only way
to subscribe a live object, there is one exception to that rule: a
singleton subscriber. You can treat a singleton service as just
another persistent subscriber and add its address to the
subscription store. This technique is particularly useful for user
interface applications that need to monitor some events. You can use
my FormHost<F>
(presented
in Chapter 8) to expose the form as
a singleton, and then add the form as a persistent subscriber. Add
the form using the Persistent Subscription Manager tool, or the form
can subscribe itself upon startup.
The publish-subscribe pattern also decouples the system security-wise. Publishers only need to authenticate themselves against a single publishing service, as opposed to multiple subscribers and potentially multiple security mechanisms. The subscribers, in turn, only need to allow the publishing service, rather than all publishers in the system, to deliver events; they trust the publishing service to properly authenticate and authorize the publishers. Applying role-based security on the publishing service allows you to easily enforce in one place various rules regarding who is allowed to publish an event across the system.
Instead of using the synchronous bindings to either
publish or subscribe to the events, you can use the NetMsmqBinding
. A queued publish-subscribe
service combines the benefits of a loosely coupled system and the
flexibility of disconnected execution. When using queued events, all
events on the contract must, of course, be marked as one-way
operations. As shown in Figure D-3, you can use queuing
at either end independently.
You can have a queued publisher and connected synchronous subscribers, you can have a connected publisher publishing to queued subscribers, or you can have both queued publishers and queued subscribers. Note, however, that you cannot have queued transient subscriptions—there is no support within the MSMQ binding for duplex callbacks, since that would render the disconnected aspect of the communication useless. As before, you can use the administration tool to manage the subscribers, and the administration operations are still connected and synchronous.
To utilize a queued publisher, the publishing service needs to expose a queued endpoint using the MSMQ binding. When firing events at a queued publisher, the publishing service can be offline or the publishing client itself can be disconnected. Note that when publishing two events to a queued publishing service, there are no guarantees as to the order in which these events will be delivered to and processed by the end subscribers. Due to the asynchronous concurrent publishing, there is no order, even when the events contract is configured for a session.
To deploy a queued subscriber, the persistent subscribing service needs to expose a queued endpoint. This will enable the subscriber to be offline even when the publisher is online. When the subscriber connects again, it will receive all of its queued-up events. In addition, queued subscribers can handle the case when the publishing service itself is disconnected, because no events are lost. Of course, having both a queued publisher and subscriber allows both to work offline at the same time.
When multiple events are fired at a single queued subscriber, there are no guarantees as to the order of delivery of the events, even when you configure the events contract for a session.
[9] I first wrote about my publish-subscribe framework in my article “WCF Essentials: What You Need to Know About One-Way Calls, Callbacks, and Events” (MSDN Magazine, October 2006).
18.191.253.62