The Publish-Subscribe Framework

ServiceModelEx contains a simple-to-use, industrial-strength publish-subscribe framework. I wanted to provide not just a publish-subscribe service, but also a general-purpose framework that automates implementing such services and adding the support for any application in just one line of code (if that). The first step in building the framework was to factor the publish-subscribe management interfaces and provide separate contracts for transient and persistent subscriptions and for publishing.[9]

Managing Transient Subscriptions

For managing transient subscriptions, I defined the ISubscriptionService interface shown in Example D-1.

Example D-1. The ISubscriptionService interface manages transient subscribers

[ServiceContract]
public interface ISubscriptionService
{
   [OperationContract]
   void Subscribe(string eventOperation);

   [OperationContract]
   void Unsubscribe(string eventOperation);
}

Note that ISubscriptionService does not identify the callback contract its implementing endpoint expects. Being a general-purpose interface, it is unaware of particular callback contracts. It is up to the using application to define those callback contracts. The callback interface is provided in the using application by deriving from ISubscriptionService and specifying the desired callback contract:

public interface IMyEvents
{
   [OperationContract(IsOneWay = true)]
   void OnEvent1();

   [OperationContract(IsOneWay = true)]
   void OnEvent2(int number);

   [OperationContract(IsOneWay = true)]
   void OnEvent3(int number,string text);
}

[ServiceContract(CallbackContract = typeof(IMyEvents))]
public interface IMySubscriptionService : ISubscriptionService
{}

Typically, every operation on the callback contract corresponds to a specific event. The subinterface of ISubscriptionService (IMySubscriptionService, in this example) does not need to add operations. ISubscriptionService provides the transient subscription management functionality. In each call to Subscribe() or Unsubscribe(), the subscriber needs to provide the name of the operation (and therefore the event) it wants to subscribe to or unsubscribe from. If the caller wants to subscribe to all events, it can pass an empty or null string.

My framework offers an implementation for the methods of ISubscriptionService in the form of the generic abstract class SubscriptionManager<T>:

public abstract class SubscriptionManager<T> where T : class
{
   public void Subscribe(string eventOperation);
   public void Unsubscribe(string eventOperation);
   //More members
}

The generic type parameter for SubscriptionManager<T> is the events contract. Note that SubscriptionManager<T> does not implement ISubscriptionService.

The using application needs to expose its own transient subscription service in the form of an endpoint that supports its specific subinterface of ISubscriptionService. To do so, the application needs to provide a service class that derives from SubscriptionManager<T>, specify the callback contract as a type parameter, and derive from that specific subinterface of ISubscriptionService. For example, to implement a transient subscription service using the IMyEvents callback interface:

[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
class MySubscriptionService : SubscriptionManager<IMyEvents>,
                              IMySubscriptionService
{}

MySubscriptionService doesn’t need any code because IMySubscriptionService does not add any new operations and SubscriptionManager<T> already implements the methods of ISubscriptionService.

Note that just deriving from SubscriptionManager<IMyEvents> is insufficient because it does not derive from a contract interface—you must add the derivation from IMySubscriptionService to support transient subscriptions.

Finally, the using application needs to define an endpoint for IMySubscriptionService:

<services>
   <service name = "MySubscriptionService">
      <endpoint
         address  = "..."
         binding  = "..."
         contract = "IMySubscriptionService"
      />
   </service>
</services>

Example D-2 shows how SubscriptionManager<T> manages transient subscriptions.

Example D-2. The transient subscribers management in SubscriptionManager<T>

public abstract class SubscriptionManager<T> where T : class
{
   static Dictionary<string,List<T>> m_TransientStore;

   static SubscriptionManager()
   {
      m_TransientStore = new Dictionary<string,List<T>>();
      string[] methods = GetOperations();
      Action<string> insert = (methodName)=>
                              {
                                 m_TransientStore.Add(methodName,new List<T>());
                              };
      methods.ForEach(insert);
   }
   static string[] GetOperations()
   {
      MethodInfo[] methods = typeof(T).GetMethods(BindingFlags.Public|
                                                  BindingFlags.FlattenHierarchy|
                                                  BindingFlags.Instance);
      List<string> operations = new List<string>(methods.Length);

      Action<MethodInfo> add = (method)=>
                               {
                                  Debug.Assert(!operations.Contains(method.Name));
                                  operations.Add(method.Name);
                               };
      methods.ForEach(add);
      return operations.ToArray();
   }
   static void AddTransient(T subscriber,string eventOperation)
   {
      lock(typeof(SubscriptionManager<T>))
      {
         List<T> list = m_TransientStore[eventOperation];
         if(list.Contains(subscriber))
         {
            return;
         }
         list.Add(subscriber);
      }
   }
   static void RemoveTransient(T subscriber,string eventOperation)
   {
      lock(typeof(SubscriptionManager<T>))
      {
         List<T> list = m_TransientStore[eventOperation];
         list.Remove(subscriber);
      }
   }

   public void Subscribe(string eventOperation)
   {
      lock(typeof(SubscriptionManager<T>))
      {
         T subscriber = OperationContext.Current.GetCallbackChannel<T>();
         if(String.IsNullOrEmpty(eventOperation) == false)
         {
            AddTransient(subscriber,eventOperation);
         }
         else
         {
            string[] methods = GetOperations();
            Action<string> addTransient = (methodName)=>
                                          {
                                             AddTransient(subscriber,methodName);
                                          };
            methods.ForEach(addTransient);
         }
      }
   }

   public void Unsubscribe(string eventOperation)
   {
      lock(typeof(SubscriptionManager<T>))
      {
         T subscriber = OperationContext.Current.GetCallbackChannel<T>();
         if(String.IsNullOrEmpty(eventOperation) == false)
         {
            RemoveTransient(subscriber,eventOperation);
         }
         else
         {
            string[] methods = GetOperations();
            Action<string> removeTransient = (methodName)=>
                                             {
                                            RemoveTransient(subscriber,methodName);
                                             };
            methods.ForEach(removeTransient);
         }
      }
   }
   //More members
}

SubscriptionManager<T> stores the transient subscribers in a generic static dictionary called m_TransientStore:

static Dictionary<string,List<T>> m_TransientStore;

Each entry in the dictionary contains the name of the event operation and all its subscribers in the form of a linked list. The static constructor of SubscriptionManager<T> uses reflection to get all the operations of the callback interfaces (the type parameter for SubscriptionManager<T>) and initializes the dictionary to have all the operations with empty lists. The Subscribe() method extracts the callback reference from the operation call context. If the caller specifies an operation name, Subscribe() calls the helper method AddTransient(). AddTransient() retrieves the list of subscribers for the event from the store, and if the list does not contain the subscriber, it adds it in.

If the caller specifies an empty string or null for the operation name, Subscribe() calls AddTransient() for each operation in the callback contract.

Unsubscribe() operates in a similar manner. Note that the caller can subscribe to all events and then unsubscribe from a particular one.

Managing Persistent Subscribers

For managing persistent subscribers, I defined the IPersistentSubscriptionService interface shown in Example D-3.

Example D-3. The IPersistentSubscriptionService interface manages persistent subscribers

[ServiceContract]
public interface IPersistentSubscriptionService
{
   [OperationContract]
   [TransactionFlow(TransactionFlowOption.Allowed)]
   void Subscribe(string address,string eventsContract,string eventOperation);

   [OperationContract]
   [TransactionFlow(TransactionFlowOption.Allowed)]
   void Unsubscribe(string address,string eventsContract,string eventOperation);
   //More members
}

To add a persistent subscriber, the caller needs to call Subscribe(), providing the address of the subscriber, the event’s contract name, and the specific event operation itself. To unsubscribe, the caller calls Unsubscribe() with the same information. Note that IPersistentSubscriptionService does not imply where the subscribers persist on the service side—that is an implementation detail.

The class SubscriptionManager<T>, presented previously, also implements the methods of IPersistentSubscriptionService:

[BindingRequirement(TransactionFlowEnabled = true)]
public abstract class SubscriptionManager<T> where T : class
{
   public void Unsubscribe(string address,string eventsContract,
                           string eventOperation);
   public void Subscribe(string address,string eventsContract,
                         string eventOperation);
   //More members
}

SubscriptionManager<T> stores the persistent subscribers in SQL Server. It is configured to use the Client/Service transaction mode (presented in Chapter 7), and it enforces that mode using my BindingRequirement attribute.

The generic type parameter for SubscriptionManager<T> is the events contract. Note that SubscriptionManager<T> does not derive from IPersistentSubscriptionService. The using application needs to expose its own persistent subscription service, but there is no need to derive a new contract from IPersistentSubscriptionService, because no callback references are required. The application simply derives from SubscriptionManager<T>, specifying the events contract as a type parameter and adding a derivation from IPersistentSubscriptionService, for example:

[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
class MySubscriptionService : SubscriptionManager<IMyEvents>,
                                                    IPersistentSubscriptionService
{}

There’s no need for any code in MySubscriptionService, because SubscriptionManager<T> already implements the methods of IPersistentSubscriptionService.

Note that just deriving from SubscriptionManager<IMyEvents> is insufficient, because SubscriptionManager<IMyEvents> does not derive from a contract interface—you must add the derivation from IPersistentSubscriptionService to support persistent subscriptions.

Finally, the application needs to define an endpoint for IPersistentSubscriptionService:

<services>
   <service name = "MySubscriptionService">
      <endpoint
         address  = "..."
         binding  = "..."
         contract = "ServiceModelEx.IPersistentSubscriptionService"
      />
   </service>
</services>

The implementation of the methods of IPersistentSubscriptionService by SubscriptionManager<T> is shown in Example D-4. Example D-4 is very similar to Example D-2, except the subscribers are stored in SQL Server, not in memory in a dictionary.

Example D-4. Persistent subscriber management in SubscriptionManager<T>

public abstract class SubscriptionManager<T> where T : class
{
   static void AddPersistent(string address,string eventsContract,
                             string eventOperation)
   {
      //Store the subscription in SQL Server
   }

   static void RemovePersistent(string address,string eventsContract,
                                string eventOperation)
   {
      //Remove the subscription from SQL Server
   }

   [OperationBehavior(TransactionScopeRequired = true)]
   public void Subscribe(string address,string eventsContract,
                         string eventOperation)
   {
      if(String.IsNullOrEmpty(eventOperation) == false)
      {
         AddPersistent(address,eventsContract,eventOperation);
      }
      else
      {
         string[] methods = GetOperations();
         Action<string> addPersistent = (methodName)=>
                                        {
                                  AddPersistent(address,eventsContract,methodName);
                                        };
         methods.ForEach(addPersistent);
      }
   }

   [OperationBehavior(TransactionScopeRequired = true)]
   public void Unsubscribe(string address,string eventsContract,
                           string eventOperation)
   {
      if(String.IsNullOrEmpty(eventOperation) == false)
      {
         RemovePersistent(address,eventsContract,eventOperation);
      }
      else
      {
         string[] methods = GetOperations();
         Action<string> removePersistent = (methodName)=>
                                           {
                               RemovePersistent(address,eventsContract,methodName);
                                           };
         methods.ForEach(removePersistent);
      }
   }
   //More members
}

If you want the application to support both transient and persistent subscribers for the same events contract, simply derive the subscription service class from both the specialized subinterface of ISubscriptionService and IPersistentSubscriptionService:

[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
class MySubscriptionService : SubscriptionManager<IMyEvents>,
                              IMySubscriptionService,
                              IPersistentSubscriptionService
{}

Next, expose the two matching endpoints:

<services>
   <service name = "MySubscriptionService">
      <endpoint
         address  = "..."
         binding  = "..."
         contract = "IMySubscriptionService"
      />
      <endpoint
         address  = "..."
         binding  = "..."
         contract = "ServiceModelEx.IPersistentSubscriptionService"
      />
   </service>
</services>

Event Publishing

The parts of the publish-subscribe framework shown so far deal only with the aspects of subscription management. The framework also enables easy implementation of the publishing service. The publishing service should support the same events contract as the subscribers, and it should be the only point of contact known to the publishers in the application. Because the publishing service exposes the events contract in an endpoint, you need to mark the events contract as a service contract, even if you only use it for duplex callbacks with transient subscribers:

[ServiceContract]
public interface IMyEvents
{
   [OperationContract(IsOneWay = true)]
   void OnEvent1();

   [OperationContract(IsOneWay = true)]
   void OnEvent2(int number);

   [OperationContract(IsOneWay = true)]
   void OnEvent3(int number,string text);
}

The publish-subscribe framework contains the helper class PublishService<T>, defined as:

public abstract class PublishService<T> where T : class
{
   protected static void FireEvent(params object[] args);
}

PublishService<T> requires as a type parameter the type of the events contract. To provide your own publishing service, derive from PublishService<T> and use the FireEvent() method to deliver the event to all subscribers, be they transient or persistent, as shown in Example D-5.

Example D-5. Implementing an event-publishing service

[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
class MyPublishService : PublishService<IMyEvents>,IMyEvents
{
   public void OnEvent1()
   {
      FireEvent();
   }
   public void OnEvent2(int number)
   {
      FireEvent(number);
   }
   public void OnEvent3(int number,string text)
   {
      FireEvent(number,text);
   }
}

Note that you can use FireEvent() to fire any type of event, regardless of the parameters, because of the use of the params object array.

Finally, the application needs to expose an endpoint for the publishing service with the events contract:

<services>
   <service name = "MyPublishService">
      <endpoint
         address  = "..."
         binding  = "..."
         contract = "IMyEvents"
      />
   </service>
</services>

Example D-6 shows the implementation of PublishService<T>.

Example D-6. Implementing PublishService<T>

public abstract class PublishService<T> where T : class
{
   protected static void FireEvent(params object[] args)
   {
      string action = OperationContext.Current.IncomingMessageHeaders.Action;
      string[] slashes = action.Split('/'),
      string methodName = slashes[slashes.Length-1];

      FireEvent(methodName,args);
   }
   static void FireEvent(string methodName,object[] args)
   {
      PublishPersistent(methodName,args);
      PublishTransient(methodName,args);
   }
   static void PublishPersistent(string methodName,object[] args)
   {
      T[] subscribers = SubscriptionManager<T>.GetPersistentList(methodName);
      Publish(subscribers,true,methodName,args);
   }
   static void PublishTransient(string methodName,object[] args)
   {
      T[] subscribers = SubscriptionManager<T>.GetTransientList(methodName);
      Publish(subscribers,false,methodName,args);
   }
   static void Publish(T[] subscribers,bool closeSubscribers,string methodName,
                                                                 object[] args)
   {
      WaitCallback fire = (subscriber)=>
                          {
                             Invoke(subscriber as T,methodName,args);
                             if(closeSubscribers)
                             {
                                using(subscriber as IDisposable)
                                {}
                             }
                          };
      Action<T> queueUp = (subscriber)=>
                          {
                             ThreadPool.QueueUserWorkItem(fire,subscriber);
                          };
      subscribers.ForEach(queueUp);
   }
   static void Invoke(T subscriber,string methodName,object[] args)
   {
      Debug.Assert(subscriber != null);
      Type type = typeof(T);
      MethodInfo methodInfo = type.GetMethod(methodName);
      try
      {
         methodInfo.Invoke(subscriber,args);
      }
      catch
      {}
   }
}

To simplify firing the event, the FireEvent() method accepts the parameters to pass to the subscribers, yet its caller does not provide it with the name of the operation to invoke on the subscribers. Instead, FireEvent() extracts the method name from the incoming message headers. It then uses an overloaded FireEvent() that accepts the method name. That method, in turn, uses the helper method PublishPersistent() to publish to all persistent subscribers and uses the PublishTransient() helper method to publish to all transient subscribers. The publishing methods operate in an almost identical way: they access SubscriptionManager<T> to retrieve their respective subscribers list, then use the Publish() method to fire the event. The subscribers are returned in the form of an array of proxies to the subscribers, which is passed to the Publish() method.

Publish() could have simply invoked the subscribers at this point. However, I wanted to support concurrent publishing of events so that if any subscriber is undisciplined and takes a long time to process the event, this will not preclude the other subscribers from receiving the event in a timely manner. Having the event operations marked as one way is no guarantee of asynchronous invocation, and besides, I wanted to support concurrent publishing even when the event operation is not marked as a one-way operation. Publish() defines two anonymous methods. The first calls the Invoke() helper method, which fires the event to the individual subscriber provided and then closes the proxy if so specified. Because Invoke() was never compiled against the specific subscriber type, it uses reflection and late binding for the invocation. Invoke() also suppresses any exceptions raised by the invocation, because these are of no interest to the publishing party. The second anonymous method queues up the first anonymous method to be executed by a thread from the thread pool. Finally, Publish() invokes the second anonymous method on every subscriber in the provided array.

Notice how uniformly PublishService<T> treats the subscribers—it almost does not matter if they are transient or persistent. The only difference is that after publishing to a persistent subscriber, you need to close the proxy. You can achieve this uniformity using the helper methods GetTransientList() and GetPersistentList() of SubscriptionManager<T>. Of these two, GetTransientList() is the simpler one:

public abstract class SubscriptionManager<T> where T : class
{
   internal static T[] GetTransientList(string eventOperation)
   {
      lock(typeof(SubscriptionManager<T>))
      {
         if(m_TransientStore.ContainsKey(eventOperation))
         {
            List<T> list = m_TransientStore[eventOperation];
            return list.ToArray();
         }
         return new T[]{};
      }
   }
   //More members
}

GetTransientList() looks up all the subscribers to the specified operation in the transient store and returns them as an array. GetPersistentList() faces a bigger challenge: there is no ready-made list of proxies to persistent subscribers. The only thing known about them is their addresses. GetPersistentList() therefore needs to instantiate the persistent subscribers’ proxies, as shown in Example D-7.

Example D-7. Creating the persistent subscribers proxy list

public abstract class SubscriptionManager<T> where T : class
{
   internal static T[] GetPersistentList(string eventOperation)
   {
      string[] addresses =  GetSubscribersToContractEventOperation(
                                              typeof(T).ToString(),eventOperation);

      List<T> subscribers = new List<T>(addresses.Length);

      foreach(string address in addresses)
      {
         Binding binding = GetBindingFromAddress(address);
         T proxy = ChannelFactory<T>.CreateChannel(binding,
                                                   new EndpointAddress(address));
         subscribers.Add(proxy);
      }
      return subscribers.ToArray();
   }
   static string[] GetSubscribersToContractEventOperation(string eventsContract,
                                                          string eventOperation)
   {
      //Query SQL Server for the subscribers to the event
   }
   static Binding GetBindingFromAddress(string address)
   {
      if(address.StartsWith("http:") || address.StartsWith("https:"))
      {
         WSHttpBinding binding = new WSHttpBinding();
         binding.ReliableSession.Enabled = true;
         binding.TransactionFlow = true;
         return binding;
      }
      if(address.StartsWith("net.tcp:"))
      {
         NetTcpBinding binding = new NetTcpBinding();
         binding.ReliableSession.Enabled = true;
         binding.TransactionFlow = true;
         return binding;
      }
      /* Similar code for the one-way relay, IPC and MSMQ bindings */
      Debug.Assert(false,"Unsupported binding specified");
      return null;
   }
   //More members
}

To create a proxy for each subscriber, GetPersistentList() needs the subscriber’s address, binding, and contract. The contract is, of course, the type parameter for SubscriptionManager<T>. To obtain the addresses, GetPersistentList() calls GetSubscribersToContractEventOperation(), which queries the subscribers store and returns as an array the addresses of all of the persistent subscribers who have subscribed to the specified event. All GetPersistentList() needs now is the binding used by each subscriber. For that, GetPersistentList() calls the helper method GetBindingFromAddress(), which infers the binding to use from the address schema. GetBindingFromAddress() assumes all HTTP or HTTPS addresses indicate the use the WSHttpBinding. For service bus addresses with the sb scheme, GetBindingFromAddress() uses an instance of the NetOnewayRelayBinding.

In addition, when applicable, GetBindingFromAddress() turns on reliability and transaction propagation for each binding to enable inclusion of the event in the publisher’s transaction when one-way operations are not used, such as with this events contract:

[ServiceContract]
interface IMyEvents
{
   [OperationContract]
   [TransactionFlow(TransactionFlowOption.Allowed)]
   void OnEvent1();

   [OperationContract]
   [TransactionFlow(TransactionFlowOption.Allowed)]
   void OnEvent2(int number);

   [OperationContract]
   [TransactionFlow(TransactionFlowOption.Allowed)]
   void OnEvent3(int number,string text);
}

Administering Persistent Subscribers

While you can add and remove persistent subscriptions at runtime by using the methods of the IPersistentSubscriptionService interface shown in Example D-3, because of their persistent nature, you can best manage the subscriptions with some kind of administration tool. To that end, IPersistentSubscriptionService defines additional operations that answer various queries against the subscribers store, as shown in Example D-8.

Example D-8. The IPersistentSubscriptionService interface

[DataContract]
public struct PersistentSubscription
{
   [DataMember]
   public string Address
   {get;set;}

   [DataMember]
   public string EventsContract
   {get;set;}

   [DataMember]
   public string EventOperation
   {get;set;}
}

[ServiceContract]
public interface IPersistentSubscriptionService
{
   //Administration operations
   [OperationContract]
   [TransactionFlow(TransactionFlowOption.Allowed)]
   PersistentSubscription[] GetAllSubscribers();

   [OperationContract]
   [TransactionFlow(TransactionFlowOption.Allowed)]
   PersistentSubscription[] GetSubscribersToContract(string eventsContract);

   [OperationContract]
   [TransactionFlow(TransactionFlowOption.Allowed)]
   string[] GetSubscribersToContractEventType(string eventsContract,
                                              string eventOperation);
   [OperationContract]
   [TransactionFlow(TransactionFlowOption.Allowed)]
   PersistentSubscription[] GetAllSubscribersFromAddress(string address);
   //More members
}

All of these administration operations utilize a simple data contract called PersistentSubscription, which contains the address of the subscriber, the subscribed contract, and the event. GetAllSubscribers() simply returns the list of all subscribers. GetSubscribersToContract() returns all subscribers to a specific contract, and GetSubscribersToContractEventType() returns all subscribers to a particular event operation on a specified contract. Finally, for the sake of completeness, GetAllSubscribersFromAddress() returns all subscribers that provided a specified address.

My publish-subscribe framework includes a sample persistent subscription administration tool called Persistent Subscription Manager, shown in Figure D-2.

The Persistent Subscription Manager application

Figure D-2. The Persistent Subscription Manager application

The administration tool uses IPersistentSubscriptionService to add and remove subscriptions. To add a new subscription, you need to provide it with the metadata exchange address of the events contract definition. You can use the metadata exchange address of the persistent subscriber itself or the metadata exchange address of the publishing service (such as the one shown in Example D-5), because they are polymorphic. Enter the metadata exchange base address in the MEX Address text box and click the Lookup button. The tool will programmatically retrieve the metadata of the event service and populate the Contract and Event combo boxes. My MetadataHelper class, presented in Chapter 2, retrieves the metadata and parses its content.

Once you’ve provided the address of the persistent subscriber, click the Subscribe button. Persistent Subscription Manager then adds the subscription by calling to the subscription service (MySubscriptionService in the examples so far). The Persistent Subscription Manager config file maintains the address for the subscription service.

Singleton subscriber

While duplex operations are, in general, the only way to subscribe a live object, there is one exception to that rule: a singleton subscriber. You can treat a singleton service as just another persistent subscriber and add its address to the subscription store. This technique is particularly useful for user interface applications that need to monitor some events. You can use my FormHost<F> (presented in Chapter 8) to expose the form as a singleton, and then add the form as a persistent subscriber. Add the form using the Persistent Subscription Manager tool, or the form can subscribe itself upon startup.

Note

The publish-subscribe pattern also decouples the system security-wise. Publishers only need to authenticate themselves against a single publishing service, as opposed to multiple subscribers and potentially multiple security mechanisms. The subscribers, in turn, only need to allow the publishing service, rather than all publishers in the system, to deliver events; they trust the publishing service to properly authenticate and authorize the publishers. Applying role-based security on the publishing service allows you to easily enforce in one place various rules regarding who is allowed to publish an event across the system.

Queued Publishers and Subscribers

Instead of using the synchronous bindings to either publish or subscribe to the events, you can use the NetMsmqBinding. A queued publish-subscribe service combines the benefits of a loosely coupled system and the flexibility of disconnected execution. When using queued events, all events on the contract must, of course, be marked as one-way operations. As shown in Figure D-3, you can use queuing at either end independently.

Queued publish-subscribe deployment

Figure D-3. Queued publish-subscribe deployment

You can have a queued publisher and connected synchronous subscribers, you can have a connected publisher publishing to queued subscribers, or you can have both queued publishers and queued subscribers. Note, however, that you cannot have queued transient subscriptions—there is no support within the MSMQ binding for duplex callbacks, since that would render the disconnected aspect of the communication useless. As before, you can use the administration tool to manage the subscribers, and the administration operations are still connected and synchronous.

Queued publisher

To utilize a queued publisher, the publishing service needs to expose a queued endpoint using the MSMQ binding. When firing events at a queued publisher, the publishing service can be offline or the publishing client itself can be disconnected. Note that when publishing two events to a queued publishing service, there are no guarantees as to the order in which these events will be delivered to and processed by the end subscribers. Due to the asynchronous concurrent publishing, there is no order, even when the events contract is configured for a session.

Queued subscriber

To deploy a queued subscriber, the persistent subscribing service needs to expose a queued endpoint. This will enable the subscriber to be offline even when the publisher is online. When the subscriber connects again, it will receive all of its queued-up events. In addition, queued subscribers can handle the case when the publishing service itself is disconnected, because no events are lost. Of course, having both a queued publisher and subscriber allows both to work offline at the same time.

When multiple events are fired at a single queued subscriber, there are no guarantees as to the order of delivery of the events, even when you configure the events contract for a session.



[9] I first wrote about my publish-subscribe framework in my article “WCF Essentials: What You Need to Know About One-Way Calls, Callbacks, and Events” (MSDN Magazine, October 2006).

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.191.253.62