Chapter 11 presents the capabilities and
aspects of the Windows Azure AppFabric Service Bus. The service bus
offers a distinct way to manage events for applications that rely on the
cloud, using the NetEventRelayBinding
. This section presents
these options along with the helper classes of ServiceModelEx, which
streamline the overall experience.
The design shown in Figure 11-9 has the appearance of a general-purpose publish-subscribe pattern. In reality, it falls short of that and it is intended to provide only a lightweight, ready-made, cloud-assisted event distribution solution. Missing is administrative support to add and remove subscribers. More importantly, there is no support for discrete, operation-level events. Events equate to service endpoints or, more specifically, to the contract. The service cannot subscribe to particular operations on the contract but not to others. This means the subscribing service itself still receives events it may not care about simply because it has a matching endpoint.
For example, consider the IMyEvents
contract:
[ServiceContract] interface IMyEvents { [OperationContract(IsOneWay = true)] void OnEvent1(); [OperationContract(IsOneWay = true)] void OnEvent2(int number); [OperationContract(IsOneWay = true)] void OnEvent3(int number,string text); }
If the subscriber defines the endpoint as so:
<endpoint address = "sb://MyNamespace.servicebus.windows.net/IMyEvents" binding = "netEventRelayBinding" contract = "IMyEvents" />
The subscriber will receive all calls to the endpoint. If the
subscriber should receive calls only to the OnEvent2()
operation, it must still expose
an endpoint over the event relay binding, receiving the calls to
OnEvent2()
, but also receive all
the unwanted traffic for OnEvent1()
and OnEvent3()
and explicitly
filter them inside the service. All of this further complicates
managing the subscriptions using an external administration
tool.
This is a direct result of subscribing at the contract (or the
endpoint) level and not at the discrete operation level. Much the same
way, the publisher has no way of publishing just OnEvent2()
. Publishing any of the events on
the IMyEvents
contract notifies all
subscribers, regardless of their interest.
The only way to manage events at the operation level out of the box is to logically map endpoints to operations rather than contracts. Example D-9 shows the publisher view of such endpoints.
Example D-9. Defining events at the operation level
<endpoint name = "OnEvent1" address = "sb://MyNamespace.servicebus.windows.net/IMyEvents/OnEvent1" binding = "netOnewayBinding" contract = "IMyEvents" /> <endpoint name = "OnEvent2" address = "sb://MyNamespace.servicebus.windows.net/IMyEvents/OnEvent2" binding = "netOnewayBinding" contract = "IMyEvents" /> <endpoint name = "OnEvent3" address = "sb://MyNamespace.servicebus.windows.net/IMyEvents/OnEvent3" binding = "netOnewayBinding" contract = "IMyEvents" />
For the publisher, the downside is that it must maintain a proxy per operation on the contract. This not only complicates the publisher’s code, it is also expensive. Since Microsoft charges for the service bus based on connections, the more proxies you manage, the more it costs.
Matters are even more complex on the service side. First, you should allow subscribing and unsubscribing to individual events without disturbing other event processing in progress. Second, the subscriber cannot open all the endpoints using the same host, since that would gain nothing—the subscriber would still get all the unwanted events. The only way (out of the box) to manage operation-level events is to have as many hosts as subscribed operations on the contract, all targeting the same service type.
Each host will open a single endpoint corresponding to a specific event (operation). To subscribe or unsubscribe to a particular event at run-time, you must open or close respectively the corresponding host. Because you cannot rely on listing the endpoints in the host config file (this will just make all hosts open all endpoints), you must programmatically add each desired endpoint to the specific host, as shown in Example D-10 in pseudo code. The code in Example D-10 sets up the endpoints exposed to the publisher in Example D-9.
Example D-10. Equating events with operations
class MySubscriber : IMyEvents {...} ServiceHost hostEvent1; ServiceHost hostEvent2; ServiceHost hostEvent3; Binding binding = new NetEventRelayBinding(); Uri baseAddress = new Uri("sb://MyNamespace.servicebus.windows.net/IMyEvents/"); //Subscribing to all events: hostEvent1 = new ServiceHost(typeof(MySubscriber),baseAddress); hostEvent1.AddServiceEndpoint(typeof(IMyEvents),binding,"OnEvent1"); hostEvent1.Open(); hostEvent2 = new ServiceHost(typeof(MySubscriber),baseAddress); hostEvent2.AddServiceEndpoint(typeof(IMyEvents),binding,"OnEvent2"); hostEvent2.Open(); hostEvent3 = new ServiceHost(typeof(MySubscriber),baseAddress); hostEvent3.AddServiceEndpoint(typeof(IMyEvents),binding,"OnEvent3"); hostEvent3.Open(); //Unsubscribe Event2(): hostEvent2.Close();
The code in Example D-10 is tedious, repetitive, and error-prone. It is also tightly coupled to the event contract. Not only that, but there is an expense penalty, since the service must pay for each host connection instead of a single host connection.
To streamline, automate, and reduce the cost of subscribing to
discrete events, I wrote the helper host ServiceBusEventsHost
:
public class ServiceBusEventsHost : ServiceBusHost { public ServiceBusEventsHost(Type serviceType,Uri baseAddress); public ServiceBusEventsHost(Type serviceType,Uri[] baseAddresses); /* Additional constructors */ //Can optionally specify binding public virtual NetOnewayRelayBinding RelayBinding {get;set;} public void SetBinding(string bindingConfigName); //Subscription management public void Subscribe(); public void Subscribe(Type contractType); public void Subscribe(Type contractType,string operation); public void Unsubscribe(); public void Unsubscribe(Type contractType); public void Unsubscribe(Type contractType,string operation); }
The constructors of ServiceBusEventsHost
all require at least
one base addresses (with a regular host, base addresses are
optional). For each base address provided, ServiceBusEventsHost
will open an endpoint
per contract supported by the service type. The address of that
endpoint will be the base address suffixed by the events contract
type:
[base address]/[events contract name]
The key in implementing ServiceBusEventsHost
is that with the
service bus, when a host listens on a service bus address, it
actually monitors all of its sub-addresses as well. As a result,
ServiceBusEventsHost
will also
receive the publisher’s messages sent to:
[base address]/[events contract name]/[event operation]
ServiceBusEventsHost
uses
interception to rout the messages to the correct operation, all
outside the scope of the service. The service will process only the
events it has subscribed to. Managing the events outside the scope
of the service, per-host, enables integration with your application
administration tools.
ServiceBusEventsHost
derives from ServiceBusHost
(presented in Chapter 11) for automating
security configuration and discovery. ServiceBusEventsHost
will default to a
plain instance of NetEventRelayBinding
with anonymous
message security. You can also provide ServiceBusEventsHost
with the binding to
use via the RelayBinding
property
or the SetBinding()
method. Like
any other host, you need to open and close Service
Bus
Events
Host
. However, to receive calls (the
events), you must use one of the Subscribe()
methods. You can subscribe (or
unsubscribe) to all events on all service contracts supported by the
service type, to all events on a particular service contract, or to
a specific event operation on a particular service contract.
There is no need for a config file when using ServiceBusEventsHost
, and the code in
Example D-10 is reduced
to:
ServiceBusEventsHost host = new ServiceBusEventsHost(typeof(MySubscriber), "sb://MyNamespace.servicebus.windows.net/"); host.Open(); //Subscribing to all events: host.Subscribe(); //Unsubscribe Event2(): host.Unsubscribe(typeof(IMyEvents),"OnEvent2");
Example D-11
shows the partial implementation of ServiceBusEventsHost
, without the error
handling and synchronization code.
Example D-11. Implementing ServiceBusEventsHost (partial)
//Partial listing and without error handling and synchronization public class ServiceBusEventsHost : ServiceBusHost { //Managing the subscriptions Dictionary<string,List<string>> Subscriptions {get;set;} public ServiceBusEventsHost(Type serviceType,Uri[] baseAddresses) : base(serviceType,baseAddresses) { Subscriptions = new Dictionary<string,List<string>>(); foreach(Uri baseAddress in BaseAddresses) { Type[] contracts = GetServiceContracts(Description.ServiceType); foreach(Type contract in contracts) { AddServiceEndpoint(contract,RelayBinding, baseAddress.AbsoluteUri + contract); Subscriptions[contract.Name] = new List<string>(); } } IEndpointBehavior selector = new EventSelector(Subscriptions); foreach(ServiceEndpoint endpoint in Description.Endpoints) { endpoint.Behaviors.Add(selector); } } public void Subscribe(Type contractType,string operation) { if(Subscriptions[contractType.Name].Contains(operation) == false) { Subscriptions[contractType.Name].Add(operation); } } public void Unsubscribe(Type contractType,string operation) { if(Subscriptions[contractType.Name].Contains(operation)) { Subscriptions[contractType.Name].Remove(operation); } } //Uses reflection to get all service contracts static Type[] GetServiceContracts(Type serviceType) {...} class EventSelector : IDispatchOperationSelector,IEndpointBehavior { readonly Dictionary<string,List<string>> m_Subscriptions; public EventSelector(Dictionary<string,List<string>> subscriptions) { m_Subscriptions = subscriptions; } public string SelectOperation(ref Message message) { string[] slashes = message.Headers.Action.Split('/'), string contract = slashes[slashes.Length-2]; string operation = slashes[slashes.Length-1]; if(m_Subscriptions[contract].Contains(operation)) { return operation; } else { return null; } } void IEndpointBehavior.ApplyDispatchBehavior(ServiceEndpoint endpoint, EndpointDispatcher endpointDispatcher) { endpointDispatcher.DispatchRuntime.OperationSelector = this; } ... } }
ServiceBusEventsHost
stores
the subscriptions in the Subscriptions
dictionary, which has a
matching key for every service contract supported by the service
type. For each such contract, ServiceBusEventsHost
manages a linked list
of operations on the contract, representing the subscribed events.
The Subscribe()
and Unsubscribe()
methods merely add or remove
the events from the corresponding operations linked list.
The heart of ServiceBusEventsHost
is its ability to
intercept the messages sent to the endpoint and rout them to the
desired event-handling operation. For such cases, WCF provides an
extensibility hook in the form of the IDispatchOperationSelector
interface,
defined as:
public interface IDispatchOperationSelector { string SelectOperation(ref Message message); }
The implementation of IDispatchOperationSelector
is attached to
the endpoint dispatcher. Every time a method is received, you can
have your implementation of Select
Operation()
arbitrate which operation
on the contract will actually process the message, by returning the
name of the selected operation.
Every endpoint dispatcher is represented by the EndpointDispatcher
, with the property
DispatchRuntime
:
public class EndpointDispatcher { public DispatchRuntime DispatchRuntime {get;} //More members }
DispatchRuntime
has a property called
OperationSelector
of the type
IDispatch
Operation
Selector
:
public sealed class DispatchRuntime { public IDispatchOperationSelector OperationSelector {get;} //More members }
To install an operation selector, use an endpoint behavior,
because the ApplyDispatchBehavior()
method of IEndpointBehavior
is given a reference to
the endpoint’s dispatcher:
public interface IEndpointBehavior
{
void ApplyDispatchBehavior(ServiceEndpoint endpoint,
EndpointDispatcher endpointDispatcher);
//More members
}
ServiceBusEventsHost
has
such an implementation of an endpoint behavior with the private
nested class EventSelector
:
class EventSelector : IDispatchOperationSelector,IEndpointBehavior {...}
EventSelector
supports both
IEndpointBehavior
(so it can
interact with the dispatcher) and IDispatchOperationSelector
(so it can
attach itself to the dispatcher).
The constructors of ServiceBusEventsHost
iterate over the
collection of endpoints in the service description, adding an
instance of EventSelector
for
each endpoint.
ServiceBusEventsHost
passes
a reference to the dictionary containing the subscriptions to the
constructor of EventSelector
, and
EventSelector
stores it as a
class member.
In EventSelector
, the
implementation of ApplyDispatchBehavior()
attaches the
instance to the dispatcher runtime as an operation selector.
The implementation of SelectOperation()
parses the contract and
the operation name out of the incoming message. It then uses them to
see if there is a matching subscription in the dictionary and, if
so, returns the name of the target operation. If no subscription is
found, SelectOperation()
discards
the message.
The publisher can use any proxy over the one-way relay binding
to fire the event, including my OneWayClientBase<T>
presented in
Chapter 11. The publisher must provide the
endpoint address with the base address that ServiceBusEventsHost
expects, suffixed by
the contract name. To tighten this potential loose screw, I wrote
the helper class ServiceBusEventsClientBase
defined in
Example D-12.
Example D-12. The ServiceBusEventsClientBase class
public abstract class ServiceBusEventsClientBase<T> : OneWayClientBase<T> where T : class { public ServiceBusEventsClientBase(string baseAddress) : this(baseAddress,new NetOnewayRelayBinding()) {} public ServiceBusEventsClientBase(string baseAddress, NetOnewayRelayBinding binding) : base(binding,ToEventAddress(namespaceBaseAddress)) {} /* More constructor */ static EndpointAddress ToEventAddress(string baseAddress) { return new EndpointAddress(baseAddress + typeof(T).Name); } }
All ServiceBusEventsClientBase
needs is the
events base address, and there is no need for a config file.
ServiceBusEventsClientBase
will
append the contract name to the events base address.
You can also provide ServiceBusEventsClientBase
with the
binding to use as well as the security parameters offered by the
constructors of OneWayClientBase<T>
.
To use ServiceBusEventsClientBase<T>
, just
derive from it like the regular ClientBase<T>
:
class MyEventsProxy : ServiceBusEventsClientBase<IMyEvents>,IMyEvents { public MyEventsProxy(string baseAddress) : base(baseAddress) {} public void OnEvent1() { Channel.OnEvent1(); } public void OnEvent2(int number) { Channel.OnEvent2(number); } public void OnEvent3(int number,string text) { Channel.OnEvent3(number,text); } }
3.133.114.221