Publish-Subscribe with the Service Bus

Chapter 11 presents the capabilities and aspects of the Windows Azure AppFabric Service Bus. The service bus offers a distinct way to manage events for applications that rely on the cloud, using the NetEventRelayBinding. This section presents these options along with the helper classes of ServiceModelEx, which streamline the overall experience.

The Event Relay Binding

The design shown in Figure 11-9 has the appearance of a general-purpose publish-subscribe pattern. In reality, it falls short of that and it is intended to provide only a lightweight, ready-made, cloud-assisted event distribution solution. Missing is administrative support to add and remove subscribers. More importantly, there is no support for discrete, operation-level events. Events equate to service endpoints or, more specifically, to the contract. The service cannot subscribe to particular operations on the contract but not to others. This means the subscribing service itself still receives events it may not care about simply because it has a matching endpoint.

For example, consider the IMyEvents contract:

[ServiceContract]
interface IMyEvents
{
   [OperationContract(IsOneWay = true)]
   void OnEvent1();

   [OperationContract(IsOneWay = true)]
   void OnEvent2(int number);

   [OperationContract(IsOneWay = true)]
   void OnEvent3(int number,string text);
}

If the subscriber defines the endpoint as so:

<endpoint
   address  = "sb://MyNamespace.servicebus.windows.net/IMyEvents"
   binding  = "netEventRelayBinding"
   contract = "IMyEvents"
/>

The subscriber will receive all calls to the endpoint. If the subscriber should receive calls only to the OnEvent2() operation, it must still expose an endpoint over the event relay binding, receiving the calls to OnEvent2(), but also receive all the unwanted traffic for OnEvent1() and OnEvent3() and explicitly filter them inside the service. All of this further complicates managing the subscriptions using an external administration tool.

This is a direct result of subscribing at the contract (or the endpoint) level and not at the discrete operation level. Much the same way, the publisher has no way of publishing just OnEvent2(). Publishing any of the events on the IMyEvents contract notifies all subscribers, regardless of their interest.

The only way to manage events at the operation level out of the box is to logically map endpoints to operations rather than contracts. Example D-9 shows the publisher view of such endpoints.

Example D-9. Defining events at the operation level

<endpoint name = "OnEvent1"
   address  = "sb://MyNamespace.servicebus.windows.net/IMyEvents/OnEvent1"
   binding  = "netOnewayBinding"
   contract = "IMyEvents"
/>
<endpoint name = "OnEvent2"
   address  = "sb://MyNamespace.servicebus.windows.net/IMyEvents/OnEvent2"
   binding  = "netOnewayBinding"
   contract = "IMyEvents"
/>
<endpoint name = "OnEvent3"
   address  = "sb://MyNamespace.servicebus.windows.net/IMyEvents/OnEvent3"
   binding  = "netOnewayBinding"
   contract = "IMyEvents"
/>

For the publisher, the downside is that it must maintain a proxy per operation on the contract. This not only complicates the publisher’s code, it is also expensive. Since Microsoft charges for the service bus based on connections, the more proxies you manage, the more it costs.

Matters are even more complex on the service side. First, you should allow subscribing and unsubscribing to individual events without disturbing other event processing in progress. Second, the subscriber cannot open all the endpoints using the same host, since that would gain nothing—the subscriber would still get all the unwanted events. The only way (out of the box) to manage operation-level events is to have as many hosts as subscribed operations on the contract, all targeting the same service type.

Each host will open a single endpoint corresponding to a specific event (operation). To subscribe or unsubscribe to a particular event at run-time, you must open or close respectively the corresponding host. Because you cannot rely on listing the endpoints in the host config file (this will just make all hosts open all endpoints), you must programmatically add each desired endpoint to the specific host, as shown in Example D-10 in pseudo code. The code in Example D-10 sets up the endpoints exposed to the publisher in Example D-9.

Example D-10. Equating events with operations

class MySubscriber : IMyEvents
{...}

ServiceHost hostEvent1;
ServiceHost hostEvent2;
ServiceHost hostEvent3;

Binding binding = new NetEventRelayBinding();
Uri baseAddress = new Uri("sb://MyNamespace.servicebus.windows.net/IMyEvents/");

//Subscribing to all events:
hostEvent1 = new ServiceHost(typeof(MySubscriber),baseAddress);
hostEvent1.AddServiceEndpoint(typeof(IMyEvents),binding,"OnEvent1");
hostEvent1.Open();

hostEvent2 = new ServiceHost(typeof(MySubscriber),baseAddress);
hostEvent2.AddServiceEndpoint(typeof(IMyEvents),binding,"OnEvent2");
hostEvent2.Open();

hostEvent3 = new ServiceHost(typeof(MySubscriber),baseAddress);
hostEvent3.AddServiceEndpoint(typeof(IMyEvents),binding,"OnEvent3");
hostEvent3.Open();

//Unsubscribe Event2():
hostEvent2.Close();

The code in Example D-10 is tedious, repetitive, and error-prone. It is also tightly coupled to the event contract. Not only that, but there is an expense penalty, since the service must pay for each host connection instead of a single host connection.

The ServiceBusEventsHost

To streamline, automate, and reduce the cost of subscribing to discrete events, I wrote the helper host ServiceBusEventsHost:

public class ServiceBusEventsHost : ServiceBusHost
{
   public ServiceBusEventsHost(Type serviceType,Uri baseAddress);
   public ServiceBusEventsHost(Type serviceType,Uri[] baseAddresses);

   /* Additional constructors */

   //Can optionally specify binding
   public virtual NetOnewayRelayBinding RelayBinding
   {get;set;}
   public void SetBinding(string bindingConfigName);

   //Subscription management
   public void Subscribe();
   public void Subscribe(Type contractType);
   public void Subscribe(Type contractType,string operation);

   public void Unsubscribe();
   public void Unsubscribe(Type contractType);
   public void Unsubscribe(Type contractType,string operation);
}

The constructors of ServiceBusEventsHost all require at least one base addresses (with a regular host, base addresses are optional). For each base address provided, ServiceBusEventsHost will open an endpoint per contract supported by the service type. The address of that endpoint will be the base address suffixed by the events contract type:

[base address]/[events contract name]

The key in implementing ServiceBusEventsHost is that with the service bus, when a host listens on a service bus address, it actually monitors all of its sub-addresses as well. As a result, ServiceBusEventsHost will also receive the publisher’s messages sent to:

[base address]/[events contract name]/[event operation]

ServiceBusEventsHost uses interception to rout the messages to the correct operation, all outside the scope of the service. The service will process only the events it has subscribed to. Managing the events outside the scope of the service, per-host, enables integration with your application administration tools.

ServiceBusEventsHost derives from ServiceBusHost (presented in Chapter 11) for automating security configuration and discovery. ServiceBusEventsHost will default to a plain instance of NetEventRelayBinding with anonymous message security. You can also provide ServiceBusEventsHost with the binding to use via the RelayBinding property or the SetBinding() method. Like any other host, you need to open and close ServiceBusEventsHost. However, to receive calls (the events), you must use one of the Subscribe() methods. You can subscribe (or unsubscribe) to all events on all service contracts supported by the service type, to all events on a particular service contract, or to a specific event operation on a particular service contract.

There is no need for a config file when using ServiceBusEventsHost, and the code in Example D-10 is reduced to:

ServiceBusEventsHost host = new ServiceBusEventsHost(typeof(MySubscriber),
                                       "sb://MyNamespace.servicebus.windows.net/");
host.Open();

//Subscribing to all events:
host.Subscribe();

//Unsubscribe Event2():
host.Unsubscribe(typeof(IMyEvents),"OnEvent2");

Example D-11 shows the partial implementation of ServiceBusEventsHost, without the error handling and synchronization code.

Example D-11. Implementing ServiceBusEventsHost (partial)

//Partial listing and without error handling and synchronization
public class ServiceBusEventsHost : ServiceBusHost
{
   //Managing the subscriptions
   Dictionary<string,List<string>> Subscriptions
   {get;set;}

   public ServiceBusEventsHost(Type serviceType,Uri[] baseAddresses) :
                                                   base(serviceType,baseAddresses)
   {
      Subscriptions = new Dictionary<string,List<string>>();

      foreach(Uri baseAddress in BaseAddresses)
      {
         Type[] contracts = GetServiceContracts(Description.ServiceType);
         foreach(Type contract in contracts)
         {
            AddServiceEndpoint(contract,RelayBinding,
                               baseAddress.AbsoluteUri + contract);
            Subscriptions[contract.Name] = new List<string>();
         }
      }
      IEndpointBehavior selector = new EventSelector(Subscriptions);
      foreach(ServiceEndpoint endpoint in Description.Endpoints)
      {
         endpoint.Behaviors.Add(selector);
      }
   }

   public void Subscribe(Type contractType,string operation)
   {
      if(Subscriptions[contractType.Name].Contains(operation) == false)
      {
         Subscriptions[contractType.Name].Add(operation);
      }
   }

   public void Unsubscribe(Type contractType,string operation)
   {
      if(Subscriptions[contractType.Name].Contains(operation))
      {
         Subscriptions[contractType.Name].Remove(operation);
      }
   }

   //Uses reflection to get all service contracts
   static Type[] GetServiceContracts(Type serviceType)
   {...}

   class EventSelector : IDispatchOperationSelector,IEndpointBehavior
   {
      readonly Dictionary<string,List<string>> m_Subscriptions;

      public EventSelector(Dictionary<string,List<string>> subscriptions)
      {
         m_Subscriptions = subscriptions;
      }
      public string SelectOperation(ref Message message)
      {
         string[] slashes = message.Headers.Action.Split('/'),
         string contract  = slashes[slashes.Length-2];
         string operation = slashes[slashes.Length-1];

         if(m_Subscriptions[contract].Contains(operation))
         {
            return operation;
         }
         else
         {
            return null;
         }
      }
      void IEndpointBehavior.ApplyDispatchBehavior(ServiceEndpoint endpoint,
                                            EndpointDispatcher endpointDispatcher)
      {
         endpointDispatcher.DispatchRuntime.OperationSelector = this;
      }
      ...
   }
}

ServiceBusEventsHost stores the subscriptions in the Subscriptions dictionary, which has a matching key for every service contract supported by the service type. For each such contract, ServiceBusEventsHost manages a linked list of operations on the contract, representing the subscribed events. The Subscribe() and Unsubscribe() methods merely add or remove the events from the corresponding operations linked list.

The heart of ServiceBusEventsHost is its ability to intercept the messages sent to the endpoint and rout them to the desired event-handling operation. For such cases, WCF provides an extensibility hook in the form of the IDispatchOperationSelector interface, defined as:

public interface IDispatchOperationSelector
{
   string SelectOperation(ref Message message);
}

The implementation of IDispatchOperationSelector is attached to the endpoint dispatcher. Every time a method is received, you can have your implementation of SelectOperation() arbitrate which operation on the contract will actually process the message, by returning the name of the selected operation.

Every endpoint dispatcher is represented by the EndpointDispatcher, with the property DispatchRuntime:

public class EndpointDispatcher
{
   public DispatchRuntime DispatchRuntime
   {get;}
   //More members
}

DispatchRuntime has a property called OperationSelector of the type IDispatchOperationSelector:

public sealed class DispatchRuntime
{
   public IDispatchOperationSelector OperationSelector
   {get;}
   //More members
}

To install an operation selector, use an endpoint behavior, because the ApplyDispatchBehavior() method of IEndpointBehavior is given a reference to the endpoint’s dispatcher:

public interface IEndpointBehavior
{
   void ApplyDispatchBehavior(ServiceEndpoint endpoint,
                              EndpointDispatcher endpointDispatcher);
   //More members
}

ServiceBusEventsHost has such an implementation of an endpoint behavior with the private nested class EventSelector:

class EventSelector : IDispatchOperationSelector,IEndpointBehavior
{...}

EventSelector supports both IEndpointBehavior (so it can interact with the dispatcher) and IDispatchOperationSelector (so it can attach itself to the dispatcher).

The constructors of ServiceBusEventsHost iterate over the collection of endpoints in the service description, adding an instance of EventSelector for each endpoint.

ServiceBusEventsHost passes a reference to the dictionary containing the subscriptions to the constructor of EventSelector, and EventSelector stores it as a class member.

In EventSelector, the implementation of ApplyDispatchBehavior() attaches the instance to the dispatcher runtime as an operation selector.

The implementation of SelectOperation() parses the contract and the operation name out of the incoming message. It then uses them to see if there is a matching subscription in the dictionary and, if so, returns the name of the target operation. If no subscription is found, SelectOperation() discards the message.

The ServiceBusEventsClientBase

The publisher can use any proxy over the one-way relay binding to fire the event, including my OneWayClientBase<T> presented in Chapter 11. The publisher must provide the endpoint address with the base address that ServiceBusEventsHost expects, suffixed by the contract name. To tighten this potential loose screw, I wrote the helper class ServiceBusEventsClientBase defined in Example D-12.

Example D-12. The ServiceBusEventsClientBase class

public abstract class ServiceBusEventsClientBase<T> : OneWayClientBase<T>
                                                      where T : class
{
   public ServiceBusEventsClientBase(string baseAddress)
                          : this(baseAddress,new NetOnewayRelayBinding())
   {}
   public ServiceBusEventsClientBase(string baseAddress,
                                     NetOnewayRelayBinding binding)
                              : base(binding,ToEventAddress(namespaceBaseAddress))
   {}

   /* More constructor */

   static EndpointAddress ToEventAddress(string baseAddress)
   {
      return new EndpointAddress(baseAddress + typeof(T).Name);
   }
}

All ServiceBusEventsClientBase needs is the events base address, and there is no need for a config file. ServiceBusEventsClientBase will append the contract name to the events base address.

You can also provide ServiceBusEventsClientBase with the binding to use as well as the security parameters offered by the constructors of OneWayClientBase<T>.

To use ServiceBusEventsClientBase<T>, just derive from it like the regular ClientBase<T>:

class MyEventsProxy : ServiceBusEventsClientBase<IMyEvents>,IMyEvents
{
   public MyEventsProxy(string baseAddress) : base(baseAddress)
   {}

   public void OnEvent1()
   {
      Channel.OnEvent1();
   }
   public void OnEvent2(int number)
   {
      Channel.OnEvent2(number);
   }
   public void OnEvent3(int number,string text)
   {
      Channel.OnEvent3(number,text);
   }
}
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.133.114.221