In the service bus, every URI in the service namespace is actually an addressable messaging junction. The client can send a message to that junction, and the junction can relay it to the services. However, each junction can also function as a buffer (see Figure 11-12).
Unlike the buffers used with the one-way relay binding, the messages are stored in the buffer for a configurable period of time, even when no service is monitoring the buffer. Note that multiple services can monitor the buffer, but only one of them will be able to retrieve the message.
The client is decoupled from the services behind the buffer, and the client or the service need not be running at the same time. Since the client interacts with a buffer and not with an actual service endpoint, all the messages are sent one-way, and there is no way (out of the box) to obtain the results of the message invocation or any errors.
The service bus buffers should not be equated with queues (such as MSMQ queues, discussed in Chapter 9), as there are a number of crucial differences:
The service bus buffers are not durable and the messages are stored in memory. This implies the risk of losing messages in the (somewhat unlikely) event of a catastrophic failure of the service bus itself.
The service bus buffers are not transactional and both sending and retrieving messages cannot be part of a transaction.
The buffers cannot handle long-lasting messages. The service must retrieve the message from the buffer within 10 minutes (at the most) or the message is discarded. While the MSMQ-based messages discussed in Chapter 9 also feature a time-to-live, that timeout is much longer, defaulting to one day. This enables a far broader range of truly disjointed operations and disconnected applications.
The buffers are limited in size; they cannot hold more than 50 messages at the most.
The buffered messages are capped in size, presently at 64 KB each. While MSMQ also imposes its own maximum message size, it is substantially larger (4 MB per message).
The result of these differences is that the buffers do not provide true queued calls over the cloud, but rather, they provide for elasticity in the connection, where the calls are somewhere between queued calls and fire-and-forget asynchronous calls.
There are two scenarios in which buffers are useful. The first is an application where the client and the service are interacting over a somewhat shaky connection, and dropping the connection and picking it up again is tolerated as long as the messages are buffered during the short offline period. The second (and more pervasive) scenario is a client issuing asynchronous one-way calls while utilizing a response buffer (described later in the chapter) to handle the result of the call. Figuratively speaking, such interaction is akin to viewing the network connection more as a bungee cord than a rigid network wire that has no storage capacity.
The buffer address must be unique; that is, you can only
have a single buffer associated with that address, and the address
cannot already be in use by a buffer or a service. However, multiple
parties can retrieve messages from the same buffer. In addition, the
buffer address must use either HTTP or HTTPS for the transport scheme.
To send and retrieve messages from the buffer, the service bus offers
an API similar to that of System.
Messaging
, requiring you to interact with raw messages. The
service bus administrator manages the buffers independently of
services or clients. Each buffer must have a policy governing its
behavior and lifetime. The service bus administrator must perform
programmatic calls out of the box to create and manage buffers.
Each buffer policy is expressed via an instance of MessageBufferPolicy
, defined as:
[DataContract] public class MessageBufferPolicy : ... { public MessageBufferPolicy(); public MessageBufferPolicy(MessageBufferPolicy policyToCopy); public DiscoverabilityPolicy Discoverability {get;set;} public TimeSpan ExpiresAfter {get;set;} public int MaxMessageCount {get;set;} public OverflowPolicy OverflowPolicy {get;set;} public AuthorizationPolicy Authorization {get;set;} public TransportProtectionPolicy TransportProtection {get;set;} }
The Discoverability
policy
property is an enum of the type DiscoverabilityPolicy
, controlling whether
or not the buffer is included in the ATOM feed of the service
namespace:
public enum DiscoverabilityPolicy { Managers, ManagersListeners, ManagersListenersSenders, Public }
Discoverability
defaults to
DiscoverabilityPolicy.Managers
,
meaning it requires a managed authorization claim. Setting it to
DiscoverabilityPolicy.Public
publishes it to the feed without any authorization.
The ExpiresAfter
property
controls the lifetime of messages in the buffer. The default is 5
minutes, the minimum value is 1 minute, and the maximum value is 10
minutes. Attempts to configure a longer expiration result in a value
of 10 minutes. Every time a service tries to retrieve a message off
the buffer, it resets the countdown timeout.
The MaxMessageCount
property caps the buffer size. The policy defaults to 10 messages
and the minimum value is, of course, set to 1. As mentioned, the
maximum allowed buffer size is 50 and attempts to configure a larger
size will result in a buffer size of 50 messages.
The OverflowPolicy
property
is an enum with a single value defined as:
public enum OverflowPolicy { RejectIncomingMessage }
OverflowPolicy
controls
what to do with the message once the buffer is maxed out, that is,
full to capacity (defined by MaxMessageCount
). The only possible option
is to reject the message (send it back with an error to the
sender).
The single-value enum serves as a placeholder for future options, such as discarding the message without informing the sender or discarding previous messages from the buffer and accepting the new message.
The final two properties are responsible for security
configuration. The AuthorizationPolicy
property instructs the
service bus whether or not to authorize the client’s token:
public enum AuthorizationPolicy
{
NotRequired,
RequiredToSend,
RequiredToReceive,
Required
}
The default value of AuthorizationPolicy.Required
is to require
authorizing both sending and receiving clients.
Finally, the TransportProtection
property stipulates
the minimum level of transfer security
of the message to the buffer using an enum of the type Transport
Protection
Policy
:
public enum TransportProtectionPolicy
{
None,
AllPaths
}
Transport security via TransportProtectionPolicy.AllPaths
is the
default of all buffer policies, which, in turn, mandates the use of
an HTTPS address.
You use the MessageBufferClient
class to administer your buffer:
public sealed class MessageBufferClient { public Uri MessageBufferUri {get;} public static MessageBufferClient CreateMessageBuffer( TransportClientEndpointBehavior credential, Uri messageBufferUri,MessageBufferPolicy policy); public static MessageBufferClient GetMessageBuffer( TransportClientEndpointBehavior credential, Uri messageBufferUri); public MessageBufferPolicy GetPolicy(); public void DeleteMessageBuffer(); //More members }
You use the static methods of MessageBufferClient
to obtain an
authenticated instance of MessageBufferClient
by providing the
static methods with the service bus credentials (of the type
TransportClientEndpointBehavior
).
I will discuss service bus security (and TransportClientEndpointBehavior
in
particular) at length later in this chapter. When you use MessageBufferClient
, you typically need to
check whether the buffer already exists in the service bus by
calling the GetMessageBuffer()
method. If there is no buffer, GetMessageBuffer()
throws an
exception.
Example 11-4 demonstrates creating a buffer programmatically.
Example 11-4. Creating a buffer programmatically
Uri bufferAddress = new Uri("https://MyNamespace.servicebus.windows.net/MyBuffer/"); TransportClientEndpointBehavior credential = ... MessageBufferPolicy bufferPolicy = new MessageBufferPolicy(); bufferPolicy.MaxMessageCount = 12; bufferPolicy.ExpiresAfter = TimeSpan.FromMinutes(3); bufferPolicy.Discoverability = DiscoverabilityPolicy.Public; MessageBufferClient.CreateMessageBuffer(credential,bufferAddress,bufferPolicy);
The example instantiates a buffer policy object and sets the
policy to some desired values. All it takes to install the buffer is
to call the CreateMessageBuffer()
method of MessageBufferClient
with the policy and some valid credentials.
As an alternative to programmatic calls, you can use my Service Bus Explorer to both view and modify buffers. Figure 11-13 shows how to create a new buffer by specifying its address and various policy properties. You can also delete all buffers in the service namespace.
You can also review and modify the policies of existing buffers, purge messages from the buffer, and even delete a buffer by selecting the buffer in the service namespace tree and interacting with the buffer properties in the right pane, as shown in Figure 11-14.
When creating buffers, it is best to maximize both the buffer size and its expiration to enable more decoupled interaction on the time axis—this allows for a less fleeting window of opportunity for the clients and services to interact. In addition, it is a good idea to make the buffer discoverable so that you can view it on the service bus registry. When it comes to using the buffer, both the client and the service should verify the buffer already exists and, if not, proceed to create it.
To automate these steps, I added a number of methods to
ServiceBusHelper
:
public static partial class ServiceBusHelper { public static void CreateBuffer(string bufferAddress,string secret); public static void CreateBuffer(string bufferAddress,string issuer, string secret); public static void VerifyBuffer(string bufferAddress,string secret); public static void VerifyBuffer(string bufferAddress,string issuer, string secret); public static void PurgeBuffer(Uri bufferAddress, TransportClientEndpointBehavior credential); public static void DeleteBuffer(Uri bufferAddress, TransportClientEndpointBehavior credential); }
The CreateBuffer()
method
creates a new discoverable buffer with a maximized capacity of 50 messages and expiration
of 10 minutes. If the buffer already exists, CreateBuffer()
deletes the old buffer. The VerifyBuffer()
method verifies a buffer
exists and will create a new buffer otherwise. PurgeBuffer()
is useful in purging all
buffered messages during diagnostics or debugging. DeleteBuffer()
simply deletes the
buffer.
Example 11-5 shows partial listing of the implementation of these methods.
Example 11-5. Partial listing of the buffer helper methods
public static partial class ServiceBusHelper { public static void CreateBuffer(string bufferAddress,string issuer, string secret) { TransportClientEndpointBehavior credentials = ...; CreateBuffer(bufferAddress,credentials); } static void CreateBuffer(string bufferAddress, TransportClientEndpointBehavior credentials) { MessageBufferPolicy policy = CreateBufferPolicy(); CreateBuffer(bufferAddress,policy,credentials); } static internal MessageBufferPolicy CreateBufferPolicy() { MessageBufferPolicy policy = new MessageBufferPolicy(); policy.Discoverability = DiscoverabilityPolicy.Public; policy.ExpiresAfter = TimeSpan.FromMinutes(10); policy.MaxMessageCount = 50; return policy; } public static void PurgeBuffer(Uri bufferAddress, TransportClientEndpointBehavior credentials) { Debug.Assert(BufferExists(bufferAddress,credentials)); MessageBufferClient client = MessageBufferClient. GetMessageBuffer(credentials,bufferAddress); MessageBufferPolicy policy = client.GetPolicy(); client.DeleteMessageBuffer(); MessageBufferClient.CreateMessageBuffer(credentials,bufferAddress,policy); } public static void VerifyBuffer(string bufferAddress,string issuer, string secret) { TransportClientEndpointBehavior credentials = ...; VerifyBuffer(bufferAddress,credentials); } internal static void VerifyBuffer(string bufferAddress, TransportClientEndpointBehavior credentials) { if(BufferExists(bufferAddress,credentials)) { return; } CreateBuffer(bufferAddress,credentials); } internal static bool BufferExists(Uri bufferAddress, TransportClientEndpointBehavior credentials) { try { MessageBufferClient client = MessageBufferClient. GetMessageBuffer(credentials,bufferAddress); client.GetPolicy(); return true; } catch(FaultException) {} return false; } static void CreateBuffer(string bufferAddress, MessageBufferPolicy policy,TransportClientEndpointBehavior credentials) { Uri address = new Uri(bufferAddress); if(BufferExists(address,credentials)) { MessageBufferClient client = MessageBufferClient. GetMessageBuffer(credentials,address); client.DeleteMessageBuffer(); } MessageBufferClient.CreateMessageBuffer(credentials,address,policy); } }
The BufferExists()
method
uses the GetPolicy()
method of
MessageBufferClient
to determine
if a buffer exists, and it interprets an error as an indication that
the buffer does not exist. The PurgeBuffer()
method first copies the
buffer policy, then deletes the buffer, and creates a new buffer
(with the same address) with the old policy.
As mentioned already, the service bus buffers require
interactions with raw WCF messages. Do this with the Send()
and Retrieve()
methods of MessageBufferClient
(obtained when creating or
getting a buffer):
public sealed class MessageBufferClient { public void Send(Message message); public void Send(Message message,TimeSpan timeout); public Message Retrieve(); public Message Retrieve(TimeSpan timeout); //More members }
Both methods are subject to a timeout, which defaults to one minute for the parameterless versions. For the sender, the timeout specifies how long to wait if the buffer is full. For the retriever, the timeout specifies how long to wait if the buffer is empty.
Example 11-6 shows the sender-side code.
Example 11-6. Sending raw messages to the buffer
TransportClientEndpointBehavior credential = ...; Uri bufferUri = new Uri("sb://MyNamespace.servicebus.windows.net/MyBuffer/"); MessageBufferClient client = MessageBufferClient.GetMessageBuffer(credential,bufferUri); Message message = Message.CreateMessage(MessageVersion.Default,"Hello"); client.Send(message,TimeSpan.MaxValue);
The sender first creates a credentials object and uses it to
obtain an instance of Message
Buffer
Client
. The sender then creates a WCF
message and sends it to the buffer. Example 11-7 shows the
retrieving-side code.
Example 11-7. Retrieving raw messages from the buffer
TransportClientEndpointBehavior credential = ...; Uri bufferUri = new Uri("sb://MyNamespace.servicebus.windows.net/MyBuffer/"); MessageBufferClient client = MessageBufferClient.GetMessageBuffer(credential,bufferUri); Message message = client.Retrieve(); Debug.Assert(message.Headers.Action == "Hello");
When securely posting or retrieving raw WCF messages to the messaging junctions, you are restricted to using only Transport security.
Using raw WCF messages, as in Example 11-6 and Example 11-7 is what the
service bus has to offer. And yet, such a programming model leaves
much to be desired. It is cumbersome and tedious, and it is not
structured, nor type-safe. It is a throwback to the days before WCF
itself with explicit programming against MSMQ with the API of
System.Messaging
; you
need to parse the message content and switch on its elements.
Fortunately, you can improve on this basic offering. Instead of
interacting with raw messages, you should elevate the interaction to
structured calls between clients and services. While this requires a
considerable degree of low-level advanced work, I was able to
encapsulate it with a small set of helper classes.
To provide for structured buffered calls on the service side,
I wrote BufferedServiceBusHost<T>
, defined
as:
public class BufferedServiceBusHost<T> : ServiceHost<T>,... { public BufferedServiceBusHost(params Uri[] bufferAddresses); public BufferedServiceBusHost(T singleton,params Uri[] bufferAddresses); /* Additional constructors */ }
I modeled BufferedServiceBusHost<T>
after
using WCF with the MSMQ binding. You need to provide its constructor
with the address or addresses of the buffers from which to retrieve
messages. The rest is just as with a regular WCF service
host:
Uri address = new Uri("https://MyNamespace.servicebus.windows.net/MyBuffer");
ServiceHost host = new BufferedServiceBusHost<MyService>(address);
host.Open();
Note that you can provide the constructors with multiple
buffer addresses to monitor, just as a WCF service host can open
multiple endpoints with different queues. There is no need (or a
way) to provide any of these buffer addresses in the service
endpoint section in the config file (although the buffer addresses
can come from the app settings section if you so design). While the
actual communication with the service bus buffer is done with raw WCF messages, that work is
encapsulated. Buffered
Service
Bus
Host<T>
will verify that the
buffers provided actually exist and, if they do not, will create
them using the buffer policy of ServiceBusHelper.VerifyBuffer()
shown in
Example 11-5.
BufferedServiceBusHost<T>
will use the default transfer security of securing all paths.
BufferedServiceBusHost<T>
will verify that all the contracts of the provided service generic
type parameter T
are all one-way,
that is, they all have only one-way operations (just as the one-way
relay binding does). One last feature is when closing the host, in
debug builds only, BufferedServiceBusHost<T>
will purge
all its buffers to ensure a smooth start for the next debug session.
Recall that I provided a similar feature for ServiceHost<T>
with MSMQ-based
queued calls.
BufferedServiceBusHost<T>
operates
by hosting the specified service locally. For each service contract
on the type parameter T
, BufferedServiceBusHost<T>
adds an
endpoint over IPC. The IPC binding to those endpoints is configured to never
time out.
While IPC always has a transport session, to mimic the MSMQ
behavior, even per-session services are treated as per-call
services. Each dequeued WCF message is played to a new instance of
the service, potentially concurrently with previous messages, just
as with the MSMQ binding. If the provided service type is a
singleton, Buffered
Service
Bus
Host<T>
will send all messages
across all buffers and endpoints to the same service instance,
just as with the MSMQ
binding.
To throttle the buffered service, use the throttling extension methods described in Chapter 4.
BufferedServiceBusHost<T>
monitors
each specified buffer on the separate background worker thread. When a message is deposited in
the buffer, Buffered
Service
Bus
Host<T>
retrieves it and
converts the raw WCF message into a call to the appropriate endpoint
over IPC.
Example 11-8
provides a partial listing of BufferedServiceBusHost<T>
, with most
of the error handling and security removed.
Example 11-8. Partial listing of BufferedServiceBusHost<T>
public class BufferedServiceBusHost<T> : ServiceHost<T>,IServiceBusProperties { Uri[] m_BufferAddresses; List<Thread> m_RetrievingThreads; IChannelFactory<IDuplexSessionChannel> m_Factory; Dictionary<string,IDuplexSessionChannel> m_Proxies; const string CloseAction = "BufferedServiceBusHost.CloseThread"; public BufferedServiceBusHost(params Uri[] bufferAddresses) { m_BufferAddresses = bufferAddresses; Binding binding = new NetNamedPipeBinding(); binding.SendTimeout = TimeSpan.MaxValue; Type[] interfaces = typeof(T).GetInterfaces(); foreach(Type interfaceType in interfaces) { VerifyOneway(interfaceType); string address = "net.pipe://localhost/" + Guid.NewGuid(); AddServiceEndpoint(interfaceType,binding,address); } m_Factory = binding.BuildChannelFactory<IDuplexSessionChannel>(); m_Factory.Open(); } protected override void OnOpening() { ConfigureServiceBehavior(); base.OnOpening(); } protected override void OnOpened() { CreateProxies(); CreateListeners(); base.OnOpened(); } protected override void OnClosing() { CloseListeners(); foreach(IDuplexSessionChannel proxy in m_Proxies.Values) { proxy.Close(); } m_Factory.Close(); PurgeBuffers(); base.OnClosing(); } //Verify all operations are one-way static void VerifyOneway(Type interfaceType) {...} void ConfigureServiceBehavior() { ServiceBehaviorAttribute behavior = Description.Behaviors.Find<ServiceBehaviorAttribute>(); if(behavior.InstanceContextMode != InstanceContextMode.Single) { behavior.InstanceContextMode = InstanceContextMode.PerCall; behavior.ConcurrencyMode = ConcurrencyMode.Multiple; foreach(ServiceEndpoint endpoint in Description.Endpoints) { foreach(OperationDescription operation in endpoint.Contract.Operations) { OperationBehaviorAttribute attribute = operation.Behaviors.Find<OperationBehaviorAttribute>(); if(attribute.TransactionScopeRequired == true) { behavior.ReleaseServiceInstanceOnTransactionComplete = false; return; } } } } } void CreateProxies() { m_Proxies = new Dictionary<string,IDuplexSessionChannel>(); foreach(ServiceEndpoint endpoint in Description.Endpoints) { IDuplexSessionChannel channel = m_Factory.CreateChannel(endpoint.Address); channel.Open(); m_Proxies[endpoint.Contract.Name] = channel; } } void CreateListeners() { m_RetrievingThreads = new List<Thread>(); foreach(Uri bufferAddress in m_BufferAddresses) { ServiceBusHelper.VerifyBuffer(bufferAddress.AbsoluteUri,...); Thread thread = new Thread(Dequeue); m_RetrievingThreads.Add(thread); thread.IsBackground = true; thread.Start(bufferAddress); } } void Dequeue(object arg) { Uri bufferAddress = arg as Uri; MessageBufferClient bufferClient = MessageBufferClient.GetMessageBuffer(...,bufferAddress); while(true) { Message message = bufferClient.Retrieve(TimeSpan.MaxValue); if(message.Headers.Action == CloseAction) { return; } else { Dispatch(message); } } } void Dispatch(Message message) { string contract = ExtractContract(message); m_Proxies[contract].Send(message); } static string ExtractContract(Message message) { string[] elements = message.Headers.Action.Split('/'), return elements[elements.Length-2]; } void SendCloseMessages() { foreach(Uri bufferAddress in m_BufferAddresses) { MessageBufferClient bufferClient = MessageBufferClient.GetMessageBuffer(...,bufferAddress); Message message = Message.CreateMessage(MessageVersion.Default,CloseAction); bufferClient.Send(message); } } void CloseListeners() { SendCloseMessages(); foreach(Thread thread in m_RetrievingThreads) { thread.Join(); } } [Conditional("DEBUG")] void PurgeBuffers() { foreach(Uri bufferAddress in m_BufferAddresses) { ServiceBusHelper.PurgeBuffer(bufferAddress,...); } } }
BufferedServiceBusHost<T>
stores the
proxies to the locally-hosted IPC endpoints in a dictionary called
m_Proxies
:
Dictionary<string,IDuplexSessionChannel> m_Proxies;
The key into the dictionary is the endpoints’ contract type name.
The constructors store the provided buffer addresses and use
reflection to obtain a collection of all the service contracts on
the service type. For each service contract, BufferedServiceBusHost<T>
verifies
it has only one-way operations, then calls the base AddServiceEndpoint()
to add an endpoint
for that contract type. The address is an IPC address using a GUID
for the pipe’s name. The constructors use the IPC binding to build a
channel factory of the type IChannelFactory<IDuplexSessionChannel>
.
IChannelFactory<T>
is used
to create a non-strongly-typed channel over the binding:
public interface IChannelFactory<T> : IChannelFactory { T CreateChannel(EndpointAddress to); //More members }
The OnOpening()
method
BufferedServiceBusHost<T>
configures the behavior of the service. It will convert a
per-session service into a per-call service and will configure both
per-call and per-session services to use concurrent calls. This is
required because there is no way to turn off the transport session
with IPC, and with a transport session in place, even with per-call,
the calls will be played out one at a time. With a transactional
per-call service configured with concurrent access, OnOpening()
must avoid releasing the
instance in every transactional operation, as explained in Chapter 8. After opening the internal host
with all its IPC endpoints, the OnOpened()
method
creates the internal proxies to those endpoints and creates the
buffered listeners. These two steps are the heart of BufferedServiceBusHost<T>
. To create
the proxies, it iterates over the collection of endpoints. From each
endpoint, it obtains its address and uses the IChannel
Factory
<IDuplex
Session
Channel>
to create
a channel against that address. That channel (or proxy) is then
stored in the dictionary. The CreateListeners()
method iterates over the
specified buffer addresses. For each address, it verifies the buffer
and creates a worker thread to dequeue its messages.
The Dequeue()
method
uses a MessageBufferClient
to
retrieve the messages in an infinite loop and dispatch them using
the Dispatch()
method. Dispatch()
extracts the target contract
name from the message and uses it to look up the IDuplexChannel
from the proxies dictionary
and send the message over IPC. IDuplexChannel
is supported by the
underlying IPC channel and provides a way to send raw
messages:
public interface IOutputChannel : ... { void Send(Message message,TimeSpan timeout); //More members } public interface IDuplexSessionChannel : IOutputChannel,... {}
If an error occurs during the IPC call, BufferedServiceBusHost<T>
will
recreate the channel it manages against that endpoint (not shown in
Example 11-8). When
you close the host, you need to close the proxies thus gracefully
waiting for the calls in progress to complete. The problem is how to
gracefully close all the retrieving threads, since MessageBufferClient.Retrieve()
is a
blocking operation and there is no built-in way to abort it. The
solution is to post to each monitored buffer a special private
message whose action will signal the retrieving thread to exit. This
is what the SendCloseMessages()
method does. The CloseListeners()
method posts that private message to the buffers and waits for all
the listening threads to terminate by joining them. Closing the
listening threads stops feeding messages to the internal proxies,
and once the proxies are closed (when all current calls in progress
have returned), the host is ready to shut down. BufferedServiceBusHost<T>
also
supports an ungraceful Abort()
method, which just aborts all threads (not shown in Example 11-8).
Finally, note that BufferedServiceBusHost<T>
supports
the IServiceBusProperties
interface, which I defined as:
public interface IServiceBusProperties { TransportClientEndpointBehavior Credential {get;set;} Uri[] Addresses {get;} }
In the interest of decoupling, I needed such an interface in a few places, especially in streamlining buffering.
For completeness sake, all my nonstatic service bus helper classes support this simple interface, so you can use it for your needs.
For the client, I wrote the class BufferedServiceBusClient<T>
, defined
as:
public abstract class BufferedServiceBusClient<T> : HeaderClientBase<T,ResponseContext>,IServiceBusProperties { //Buffer address from config public BufferedServiceBusClient() {} //No need for config file public BufferedServiceBusClient(Uri bufferAddress); /* Additional constructors with different credentials */ protected virtual void Enqueue(Action action); }
BufferedServiceBusClient<T>
derives
from my HeaderClientBase<T,H>
(mentioned in
previous chapters and detailed in Appendix B). The purpose of that base class
is to support a response service, as discussed in the following
section. For a plain client of a buffered service, that derivation
is immaterial.
You can use BufferedServiceBusClient<T>
with or
without a client config file. The constructors that accept the
buffer address do not require a config file. The parameterless
constructor or the constructors that accept the endpoint name expect
the config file to contain an endpoint matching the contract type
with the one-way relay binding (although BufferedServiceBusClient<T>
completely ignores that binding).
When deriving your proxy from BufferedServiceBusClient<T>
, you
will need to use the protected Enqueue()
method instead of using the
Channel
property directly:
[ServiceContract]
interface IMyContract
{
[OperationContract(IsOneWay = true)]
void MyMethod(int number);
}
class MyContractClient : BufferedServiceBusClient<IMyContract>,IMyContract
{
public void MyMethod(int number)
{
Enqueue(()=>Channel.MyMethod(number));
}
}
Enqueue()
accepts a
delegate (or a Lambda expression) that wraps the use of the Channel
property. The
result is still type-safe. Example 11-9 shows a
partial listing of the BufferedServiceBusClient<T>
class.
Example 11-9. Partial listing of BufferedServiceBusClient<T>
public abstract class BufferedServiceBusClient<T> :
HeaderClientBase<T,ResponseContext>,
IServiceBusProperties
where T : class
{
MessageBufferClient m_BufferClient;
public BufferedServiceBusClient(Uri bufferAddress) :
base(new NetOnewayRelayBinding(),
new EndpointAddress(bufferAddress))
{}
protected virtual void Enqueue(Action action)
{
try
{
action();
}
catch(InvalidOperationException exception)
{
Debug.Assert(exception.Message == "This message cannot support the
operation because it has been written.");
}
}
protected override T CreateChannel()
{
ServiceBusHelper.VerifyBuffer(Endpoint.Address.Uri.AbsoluteUri,Credential);
m_BufferClient =
MessageBufferClient.GetMessageBuffer(Credential,
Endpoint.Address.Uri.AbsoluteUri);
return base.CreateChannel();
}
protected override void PreInvoke(ref Message request)
{
base.PreInvoke(ref request);
m_BufferClient.Send(request);
}
protected TransportClientEndpointBehavior Credential
{
get
{...}
set
{...}
}
}
The constructors of BufferedServiceBusClient<T>
supply
its base constructor with the buffer address and the binding (always
a one-way relay binding to enforce the one-way operations
validation). The CreateChannel()
method verifies the target buffer exists and obtains a MessageBufferClient
representing it. The
heart of Buffered
Service
Bus
Client<T>
is the PreInvoke()
method.
PreInvoke()
is a virtual method
provided by InterceptorClientBase<T>
, the base
class of HeaderClientBase<T,H>
:
public abstract class InterceptorClientBase<T> : ClientBase<T> where T : class { protected virtual void PreInvoke(ref Message request); //Rest of the implementation } public abstract class HeaderClientBase<T,H> : InterceptorClientBase<T> where T : class {...}
InterceptorClientBase<T>
is
described in Appendix E. Briefly, it is
part of a generic interception framework I implemented that allows
you to easily process the WCF messages before and after the client
dispatches them. BufferedServiceBusClient<T>
overrides PreInvoke()
and uses
the buffer client to send the message to the buffer. This way,
the client maintains a structured
programming model and Buffered
Service
Bus
Client<T>
encapsulates the
interaction with the WCF message. The downside is that the message
can only be sent once, and when the root class of ClientBase
tries to send it, it throws an
InvalidOperationException
. This is where
Enqueue()
comes in handy by
snuffing out that exception.
As discussed in Chapter 9, the only way to receive the result (or errors) of a queued call is to use a queued response service. The same design pattern holds true when dealing with buffers. The client needs to provide a dedicated response buffer for the service to buffer the response to. The client also needs to pass the response address and the method ID in the message headers, just as with the MSMQ-based calls, and you can use many of the supporting types from Chapter 9. The main difference between the MSMQ-based response service and the service bus is that the response buffer must also reside in the service bus, as shown in Figure 11-15.
To streamline the client side, I wrote the class ClientBufferResponseBase<T>
, defined
as:
public abstract class ClientBufferResponseBase<T> : BufferedServiceBusClient<T> where T : class { public readonly Uri ResponseAddress; public ClientBufferResponseBase(Uri responseAddress); /* Additional constructors with different credentials and parameters */ protected virtual string GenerateMethodId(); }
ClientBufferResponseBase<T>
is a
specialized subclass of Buffered
Service
Bus
Client<T>
and it adds the
response context to the message headers (this is why I made BufferedServiceBusClient<T>
derive
from HeaderClientBase<T,H>
and not merely from InterceptorClientBase<T>
). You will
need to derive your specific proxies from ClientBufferResponseBase<T>
and use
the Enqueue()
method, for
example:
[ServiceContract] interface ICalculator { [OperationContract(IsOneWay = true)] void Add(int number1,int number2); } class CalculatorClient : ClientBufferResponseBase<ICalculator>,ICalculator { public CalculatorClient(Uri responseAddress) : base(responseAddress) {} public void Add(int number1,int number2) { Enqueue(()=>Channel.Add(number1,number2)); } }
Using the subclass of ClientBufferResponseBase<T>
is
straightforward:
Uri resposeAddress = new Uri("sb://MyNamespace.servicebus.windows.net/MyResponseBuffer/"); CalculatorClient proxy = new CalculatorClient(responseAddress); proxy.Add(2,3); proxy.Close();
As with ClientResponseBase<T>
, something
that is very handy when managing the responses on the client side to
have the invoking client obtain the method ID used to dispatch the
call. You can do this easily with the Header
property:
CalculatorClient proxy = new CalculatorClient(responseAddress);
proxy.Add(2,3);
string methodId = proxy.Header.MethodId;
proxy.Close();
Example 11-10
lists the implementation of ClientBufferResponseBase<T>
.
Example 11-10. Implementing ClientBufferResponseBase <T>
public abstract class ClientBufferResponseBase<T> : BufferedServiceBusClient<T> where T : class { protected readonly Uri ResponseAddress; public ClientBufferResponseBase(Uri responseAddress) { ResponseAddress = responseAddress; } /* More Constructors */ protected override void PreInvoke(ref Message request) { string methodId = GenerateMethodId(); Header = new ResponseContext(ResponseAddress.AbsoluteUri,methodId); base.PreInvoke(ref request); } protected virtual string GenerateMethodId() { return Guid.NewGuid().ToString(); } //Rest of the implementation }
ClientBufferResponseBase<T>
overrides the PreInvoke()
method
of HeaderClientBase<T,H>
so
that it can generate a new method ID for each call and set it into
the headers, just like ClientResponseBase<T>
of Example 9-23.
Mimicking again the concepts and solutions of the MSMQ-based
calls, to streamline the work required by the buffered service to
call the response service, I wrote the class ServiceBufferResponseBase<T>
, shown
in Example 11-11.
Example 11-11. The ServiceBufferResponseBase<T> class
public abstract class ServiceBufferResponseBase<T> : BufferedServiceBusClient<T> where T : class { public ServiceBufferResponseBase() : base(new Uri(ResponseContext.Current.ResponseAddress)) { Header = ResponseContext.Current; //Grab the credentials the host was using IServiceBusProperties properties = OperationContext.Current.Host as IServiceBusProperties; Credential = properties.Credential; } }
While the service can use a plain BufferedServiceBusClient<T>
to
enqueue the response, you will need to extract the response buffer
address out of the headers and somehow obtain the credentials to log
into the service bus buffer. You will also need to provide the
headers of the outgoing call with the response context. You can
streamline all of these steps with ServiceBufferResponseBase<T>
.
ServiceBufferResponseBase<T>
provides its base constructor with the address out of the response
context, and it also sets that context into the outgoing headers.
Another simplifying assumption ServiceBufferResponseBase<T>
makes
is that the responding service can use the same credentials its host
used (to retrieve messages from its own buffer) to send messages to
the response buffer. To that end, ServiceBufferResponseBase<T>
obtains
a reference to its own host from the operation context and reads the
credentials using the IServiceBusProperties
implementation of
the host. ServiceBufferResponseBase<T>
copies
those credentials for its own use (done inside BufferedServiceBusClient<T>
). This,
of course, mandates the use of BufferedServiceBusHost<T>
to host
the service in the first place.
Your service needs to derive a proxy class from ServiceBufferResponseBase<T>
and use
it to respond. For example, given this response contract:
[ServiceContract] interface ICalculatorResponse { [OperationContract(IsOneWay = true)] void OnAddCompleted(int result,ExceptionDetail error); }
The definition of the proxy to the response service will be:
class CalculatorResponseClient : ServiceBufferResponseBase<ICalculatorResponse>,ICalculatorResponse { public void OnAddCompleted(int result,ExceptionDetail error) { Enqueue(()=>Channel.OnAddCompleted(result,error)); } }
Example 11-12 shows a simple buffered service responding to its client.
Example 11-12. Using ServiceBufferResponseBase<T>
class MyCalculator : ICalculator { [OperationBehavior(TransactionScopeRequired = true)] public void Add(int number1,int number2) { int result = 0; ExceptionDetail error = null; try { result = number1 + number2; } //Don't rethrow catch(Exception exception) { error = new ExceptionDetail(exception); } finally { CalculatorResponseClient proxy = new CalculatorResponseClient(); proxy.OnAddCompleted(result,error); proxy.Close(); } } }
Compare Example 11-12 to Example 9-25—they are virtually identical.
All the response service needs is to access the method ID from the message headers as shown in Example 11-13.
Example 11-13. Implementing a response service
class MyCalculatorResponse : ICalculatorResponse
{
[OperationBehavior(TransactionScopeRequired = true)]
public void OnAddCompleted(int result,ExceptionDetail error)
{
string methodId = ResponseContext.Current.MethodId
;
...
}
}
Example 11-13 is identical to Example 9-27.
3.144.17.91