Service Bus Buffers

In the service bus, every URI in the service namespace is actually an addressable messaging junction. The client can send a message to that junction, and the junction can relay it to the services. However, each junction can also function as a buffer (see Figure 11-12).

Buffers in the service bus

Figure 11-12. Buffers in the service bus

Unlike the buffers used with the one-way relay binding, the messages are stored in the buffer for a configurable period of time, even when no service is monitoring the buffer. Note that multiple services can monitor the buffer, but only one of them will be able to retrieve the message.

The client is decoupled from the services behind the buffer, and the client or the service need not be running at the same time. Since the client interacts with a buffer and not with an actual service endpoint, all the messages are sent one-way, and there is no way (out of the box) to obtain the results of the message invocation or any errors.

Buffers Versus Queues

The service bus buffers should not be equated with queues (such as MSMQ queues, discussed in Chapter 9), as there are a number of crucial differences:

  • The service bus buffers are not durable and the messages are stored in memory. This implies the risk of losing messages in the (somewhat unlikely) event of a catastrophic failure of the service bus itself.

  • The service bus buffers are not transactional and both sending and retrieving messages cannot be part of a transaction.

  • The buffers cannot handle long-lasting messages. The service must retrieve the message from the buffer within 10 minutes (at the most) or the message is discarded. While the MSMQ-based messages discussed in Chapter 9 also feature a time-to-live, that timeout is much longer, defaulting to one day. This enables a far broader range of truly disjointed operations and disconnected applications.

  • The buffers are limited in size; they cannot hold more than 50 messages at the most.

  • The buffered messages are capped in size, presently at 64 KB each. While MSMQ also imposes its own maximum message size, it is substantially larger (4 MB per message).

The result of these differences is that the buffers do not provide true queued calls over the cloud, but rather, they provide for elasticity in the connection, where the calls are somewhere between queued calls and fire-and-forget asynchronous calls.

There are two scenarios in which buffers are useful. The first is an application where the client and the service are interacting over a somewhat shaky connection, and dropping the connection and picking it up again is tolerated as long as the messages are buffered during the short offline period. The second (and more pervasive) scenario is a client issuing asynchronous one-way calls while utilizing a response buffer (described later in the chapter) to handle the result of the call. Figuratively speaking, such interaction is akin to viewing the network connection more as a bungee cord than a rigid network wire that has no storage capacity.

Working with Buffers

The buffer address must be unique; that is, you can only have a single buffer associated with that address, and the address cannot already be in use by a buffer or a service. However, multiple parties can retrieve messages from the same buffer. In addition, the buffer address must use either HTTP or HTTPS for the transport scheme. To send and retrieve messages from the buffer, the service bus offers an API similar to that of System.Messaging, requiring you to interact with raw messages. The service bus administrator manages the buffers independently of services or clients. Each buffer must have a policy governing its behavior and lifetime. The service bus administrator must perform programmatic calls out of the box to create and manage buffers.

The buffer policy

Each buffer policy is expressed via an instance of MessageBufferPolicy, defined as:

[DataContract]
public class MessageBufferPolicy : ...
{
   public MessageBufferPolicy();
   public MessageBufferPolicy(MessageBufferPolicy policyToCopy);

   public DiscoverabilityPolicy Discoverability
   {get;set;}

   public TimeSpan ExpiresAfter
   {get;set;}

   public int MaxMessageCount

   {get;set;}

   public OverflowPolicy OverflowPolicy
   {get;set;}

   public AuthorizationPolicy Authorization
   {get;set;}

   public TransportProtectionPolicy TransportProtection
   {get;set;}
}

The Discoverability policy property is an enum of the type DiscoverabilityPolicy, controlling whether or not the buffer is included in the ATOM feed of the service namespace:

public enum DiscoverabilityPolicy
{
   Managers,
   ManagersListeners,
   ManagersListenersSenders,
   Public
}

Discoverability defaults to DiscoverabilityPolicy.Managers, meaning it requires a managed authorization claim. Setting it to DiscoverabilityPolicy.Public publishes it to the feed without any authorization.

The ExpiresAfter property controls the lifetime of messages in the buffer. The default is 5 minutes, the minimum value is 1 minute, and the maximum value is 10 minutes. Attempts to configure a longer expiration result in a value of 10 minutes. Every time a service tries to retrieve a message off the buffer, it resets the countdown timeout.

The MaxMessageCount property caps the buffer size. The policy defaults to 10 messages and the minimum value is, of course, set to 1. As mentioned, the maximum allowed buffer size is 50 and attempts to configure a larger size will result in a buffer size of 50 messages.

The OverflowPolicy property is an enum with a single value defined as:

public enum OverflowPolicy
{
   RejectIncomingMessage
}

OverflowPolicy controls what to do with the message once the buffer is maxed out, that is, full to capacity (defined by MaxMessageCount). The only possible option is to reject the message (send it back with an error to the sender).

The single-value enum serves as a placeholder for future options, such as discarding the message without informing the sender or discarding previous messages from the buffer and accepting the new message.

The final two properties are responsible for security configuration. The AuthorizationPolicy property instructs the service bus whether or not to authorize the client’s token:

public enum AuthorizationPolicy
{
   NotRequired,
   RequiredToSend,
   RequiredToReceive,
   Required
}

The default value of AuthorizationPolicy.Required is to require authorizing both sending and receiving clients.

Finally, the TransportProtection property stipulates the minimum level of transfer security of the message to the buffer using an enum of the type TransportProtectionPolicy:

public enum TransportProtectionPolicy
{
   None,
   AllPaths
}

Transport security via TransportProtectionPolicy.AllPaths is the default of all buffer policies, which, in turn, mandates the use of an HTTPS address.

Administering the buffer

You use the MessageBufferClient class to administer your buffer:

public sealed class MessageBufferClient
{
   public Uri MessageBufferUri
   {get;}

   public static MessageBufferClient CreateMessageBuffer(
                                       TransportClientEndpointBehavior credential,
                                 Uri messageBufferUri,MessageBufferPolicy policy);

   public static MessageBufferClient GetMessageBuffer(
                                       TransportClientEndpointBehavior credential,
                                                            Uri messageBufferUri);
   public MessageBufferPolicy GetPolicy();
   public void DeleteMessageBuffer();

   //More members
}

You use the static methods of MessageBufferClient to obtain an authenticated instance of MessageBufferClient by providing the static methods with the service bus credentials (of the type TransportClientEndpointBehavior). I will discuss service bus security (and TransportClientEndpointBehavior in particular) at length later in this chapter. When you use MessageBufferClient, you typically need to check whether the buffer already exists in the service bus by calling the GetMessageBuffer() method. If there is no buffer, GetMessageBuffer() throws an exception.

Example 11-4 demonstrates creating a buffer programmatically.

Example 11-4. Creating a buffer programmatically

Uri bufferAddress =
                  new Uri("https://MyNamespace.servicebus.windows.net/MyBuffer/");

TransportClientEndpointBehavior credential = ...

MessageBufferPolicy bufferPolicy = new MessageBufferPolicy();

bufferPolicy.MaxMessageCount = 12;
bufferPolicy.ExpiresAfter = TimeSpan.FromMinutes(3);
bufferPolicy.Discoverability = DiscoverabilityPolicy.Public;

MessageBufferClient.CreateMessageBuffer(credential,bufferAddress,bufferPolicy);

The example instantiates a buffer policy object and sets the policy to some desired values. All it takes to install the buffer is to call the CreateMessageBuffer() method of MessageBufferClient with the policy and some valid credentials.

As an alternative to programmatic calls, you can use my Service Bus Explorer to both view and modify buffers. Figure 11-13 shows how to create a new buffer by specifying its address and various policy properties. You can also delete all buffers in the service namespace.

Creating a buffer using the Service Bus Explorer

Figure 11-13. Creating a buffer using the Service Bus Explorer

You can also review and modify the policies of existing buffers, purge messages from the buffer, and even delete a buffer by selecting the buffer in the service namespace tree and interacting with the buffer properties in the right pane, as shown in Figure 11-14.

A buffer in the Service Bus Explorer

Figure 11-14. A buffer in the Service Bus Explorer

Streamlining administration

When creating buffers, it is best to maximize both the buffer size and its expiration to enable more decoupled interaction on the time axis—this allows for a less fleeting window of opportunity for the clients and services to interact. In addition, it is a good idea to make the buffer discoverable so that you can view it on the service bus registry. When it comes to using the buffer, both the client and the service should verify the buffer already exists and, if not, proceed to create it.

To automate these steps, I added a number of methods to ServiceBusHelper:

public static partial class ServiceBusHelper
{
   public static void CreateBuffer(string bufferAddress,string secret);
   public static void CreateBuffer(string bufferAddress,string issuer,
                                                                   string secret);

   public static void VerifyBuffer(string bufferAddress,string secret);
   public static void VerifyBuffer(string bufferAddress,string issuer,
                                                                   string secret);
   public static void PurgeBuffer(Uri bufferAddress,
                                  TransportClientEndpointBehavior credential);
   public static void DeleteBuffer(Uri bufferAddress,
                                   TransportClientEndpointBehavior credential);
}

The CreateBuffer() method creates a new discoverable buffer with a maximized capacity of 50 messages and expiration of 10 minutes. If the buffer already exists, CreateBuffer() deletes the old buffer. The VerifyBuffer() method verifies a buffer exists and will create a new buffer otherwise. PurgeBuffer() is useful in purging all buffered messages during diagnostics or debugging. DeleteBuffer() simply deletes the buffer.

Example 11-5 shows partial listing of the implementation of these methods.

Example 11-5. Partial listing of the buffer helper methods

public static partial class ServiceBusHelper
{
   public static void CreateBuffer(string bufferAddress,string issuer,
                                                                    string secret)
   {
      TransportClientEndpointBehavior credentials = ...;
      CreateBuffer(bufferAddress,credentials);
   }
   static void CreateBuffer(string bufferAddress,
                            TransportClientEndpointBehavior credentials)
   {
      MessageBufferPolicy policy = CreateBufferPolicy();
      CreateBuffer(bufferAddress,policy,credentials);
   }
   static internal MessageBufferPolicy CreateBufferPolicy()
   {
      MessageBufferPolicy policy = new MessageBufferPolicy();
      policy.Discoverability = DiscoverabilityPolicy.Public;
      policy.ExpiresAfter = TimeSpan.FromMinutes(10);
      policy.MaxMessageCount = 50;

      return policy;
   }
   public static void PurgeBuffer(Uri bufferAddress,
                                  TransportClientEndpointBehavior credentials)
   {
      Debug.Assert(BufferExists(bufferAddress,credentials));
      MessageBufferClient client = MessageBufferClient.
                                      GetMessageBuffer(credentials,bufferAddress);
      MessageBufferPolicy policy = client.GetPolicy();
      client.DeleteMessageBuffer();
      MessageBufferClient.CreateMessageBuffer(credentials,bufferAddress,policy);
   }
   public static void VerifyBuffer(string bufferAddress,string issuer,
                                                                    string secret)
   {
      TransportClientEndpointBehavior credentials = ...;
      VerifyBuffer(bufferAddress,credentials);
   }
   internal static void VerifyBuffer(string bufferAddress,
                                     TransportClientEndpointBehavior credentials)
   {
      if(BufferExists(bufferAddress,credentials))
      {
         return;
      }
      CreateBuffer(bufferAddress,credentials);
   }
   internal static bool BufferExists(Uri bufferAddress,
                                     TransportClientEndpointBehavior credentials)
   {
      try
      {
         MessageBufferClient client = MessageBufferClient.
                                      GetMessageBuffer(credentials,bufferAddress);
         client.GetPolicy();
         return true;
      }
      catch(FaultException)
      {}

      return false;
   }
   static void CreateBuffer(string bufferAddress,
           MessageBufferPolicy policy,TransportClientEndpointBehavior credentials)
   {
      Uri address = new Uri(bufferAddress);
      if(BufferExists(address,credentials))
      {
         MessageBufferClient client = MessageBufferClient.
                                     GetMessageBuffer(credentials,address);
         client.DeleteMessageBuffer();
      }
      MessageBufferClient.CreateMessageBuffer(credentials,address,policy);
   }

}

The BufferExists() method uses the GetPolicy() method of MessageBufferClient to determine if a buffer exists, and it interprets an error as an indication that the buffer does not exist. The PurgeBuffer() method first copies the buffer policy, then deletes the buffer, and creates a new buffer (with the same address) with the old policy.

Sending and Retrieving Messages

As mentioned already, the service bus buffers require interactions with raw WCF messages. Do this with the Send() and Retrieve() methods of MessageBufferClient (obtained when creating or getting a buffer):

public sealed class MessageBufferClient
{
   public void Send(Message message);
   public void Send(Message message,TimeSpan timeout);

   public Message Retrieve();
   public Message Retrieve(TimeSpan timeout);

   //More members
}

Both methods are subject to a timeout, which defaults to one minute for the parameterless versions. For the sender, the timeout specifies how long to wait if the buffer is full. For the retriever, the timeout specifies how long to wait if the buffer is empty.

Example 11-6 shows the sender-side code.

Example 11-6. Sending raw messages to the buffer

TransportClientEndpointBehavior credential = ...;
Uri bufferUri = new Uri("sb://MyNamespace.servicebus.windows.net/MyBuffer/");

MessageBufferClient client =
                       MessageBufferClient.GetMessageBuffer(credential,bufferUri);

Message message = Message.CreateMessage(MessageVersion.Default,"Hello");

client.Send(message,TimeSpan.MaxValue);

The sender first creates a credentials object and uses it to obtain an instance of MessageBufferClient. The sender then creates a WCF message and sends it to the buffer. Example 11-7 shows the retrieving-side code.

Example 11-7. Retrieving raw messages from the buffer

TransportClientEndpointBehavior credential = ...;
Uri bufferUri = new Uri("sb://MyNamespace.servicebus.windows.net/MyBuffer/");

MessageBufferClient client =
                       MessageBufferClient.GetMessageBuffer(credential,bufferUri);
Message message = client.Retrieve();

Debug.Assert(message.Headers.Action == "Hello");

Note

When securely posting or retrieving raw WCF messages to the messaging junctions, you are restricted to using only Transport security.

Buffered Services

Using raw WCF messages, as in Example 11-6 and Example 11-7 is what the service bus has to offer. And yet, such a programming model leaves much to be desired. It is cumbersome and tedious, and it is not structured, nor type-safe. It is a throwback to the days before WCF itself with explicit programming against MSMQ with the API of System.Messaging; you need to parse the message content and switch on its elements. Fortunately, you can improve on this basic offering. Instead of interacting with raw messages, you should elevate the interaction to structured calls between clients and services. While this requires a considerable degree of low-level advanced work, I was able to encapsulate it with a small set of helper classes.

Buffered service host

To provide for structured buffered calls on the service side, I wrote BufferedServiceBusHost<T>, defined as:

public class BufferedServiceBusHost<T> : ServiceHost<T>,...
{
   public BufferedServiceBusHost(params Uri[] bufferAddresses);
   public BufferedServiceBusHost(T singleton,params Uri[] bufferAddresses);

   /* Additional constructors */
}

I modeled BufferedServiceBusHost<T> after using WCF with the MSMQ binding. You need to provide its constructor with the address or addresses of the buffers from which to retrieve messages. The rest is just as with a regular WCF service host:

Uri address = new Uri("https://MyNamespace.servicebus.windows.net/MyBuffer");
ServiceHost host = new BufferedServiceBusHost<MyService>(address);
host.Open();

Note that you can provide the constructors with multiple buffer addresses to monitor, just as a WCF service host can open multiple endpoints with different queues. There is no need (or a way) to provide any of these buffer addresses in the service endpoint section in the config file (although the buffer addresses can come from the app settings section if you so design). While the actual communication with the service bus buffer is done with raw WCF messages, that work is encapsulated. BufferedServiceBusHost<T> will verify that the buffers provided actually exist and, if they do not, will create them using the buffer policy of ServiceBusHelper.VerifyBuffer() shown in Example 11-5. BufferedServiceBusHost<T> will use the default transfer security of securing all paths. BufferedServiceBusHost<T> will verify that all the contracts of the provided service generic type parameter T are all one-way, that is, they all have only one-way operations (just as the one-way relay binding does). One last feature is when closing the host, in debug builds only, BufferedServiceBusHost<T> will purge all its buffers to ensure a smooth start for the next debug session. Recall that I provided a similar feature for ServiceHost<T> with MSMQ-based queued calls.

BufferedServiceBusHost<T> operates by hosting the specified service locally. For each service contract on the type parameter T, BufferedServiceBusHost<T> adds an endpoint over IPC. The IPC binding to those endpoints is configured to never time out.

While IPC always has a transport session, to mimic the MSMQ behavior, even per-session services are treated as per-call services. Each dequeued WCF message is played to a new instance of the service, potentially concurrently with previous messages, just as with the MSMQ binding. If the provided service type is a singleton, BufferedServiceBusHost<T> will send all messages across all buffers and endpoints to the same service instance, just as with the MSMQ binding.

Note

To throttle the buffered service, use the throttling extension methods described in Chapter 4.

BufferedServiceBusHost<T> monitors each specified buffer on the separate background worker thread. When a message is deposited in the buffer, BufferedServiceBusHost<T> retrieves it and converts the raw WCF message into a call to the appropriate endpoint over IPC.

Example 11-8 provides a partial listing of BufferedServiceBusHost<T>, with most of the error handling and security removed.

Example 11-8. Partial listing of BufferedServiceBusHost<T>

public class BufferedServiceBusHost<T> : ServiceHost<T>,IServiceBusProperties
{
   Uri[] m_BufferAddresses;
   List<Thread> m_RetrievingThreads;
   IChannelFactory<IDuplexSessionChannel> m_Factory;
   Dictionary<string,IDuplexSessionChannel> m_Proxies;

   const string CloseAction = "BufferedServiceBusHost.CloseThread";

   public BufferedServiceBusHost(params Uri[] bufferAddresses)
   {
      m_BufferAddresses = bufferAddresses;
      Binding binding = new NetNamedPipeBinding();
      binding.SendTimeout = TimeSpan.MaxValue;

      Type[] interfaces = typeof(T).GetInterfaces();

      foreach(Type interfaceType in interfaces)
      {
         VerifyOneway(interfaceType);

         string address = "net.pipe://localhost/" + Guid.NewGuid();
         AddServiceEndpoint(interfaceType,binding,address);
      }
      m_Factory = binding.BuildChannelFactory<IDuplexSessionChannel>();
      m_Factory.Open();
   }
   protected override void OnOpening()
   {
      ConfigureServiceBehavior();
      base.OnOpening();
   }
   protected override void OnOpened()
   {
      CreateProxies();
      CreateListeners();
      base.OnOpened();
   }
   protected override void OnClosing()
   {
      CloseListeners();

      foreach(IDuplexSessionChannel proxy in m_Proxies.Values)
      {
         proxy.Close();
      }

      m_Factory.Close();

      PurgeBuffers();

      base.OnClosing();
   }

   //Verify all operations are one-way
   static void VerifyOneway(Type interfaceType)
   {...}

   void ConfigureServiceBehavior()
   {
      ServiceBehaviorAttribute behavior =
                           Description.Behaviors.Find<ServiceBehaviorAttribute>();
      if(behavior.InstanceContextMode != InstanceContextMode.Single)
      {
         behavior.InstanceContextMode = InstanceContextMode.PerCall;
         behavior.ConcurrencyMode = ConcurrencyMode.Multiple;

         foreach(ServiceEndpoint endpoint in Description.Endpoints)
         {
            foreach(OperationDescription operation in endpoint.Contract.Operations)
            {
               OperationBehaviorAttribute attribute =
                           operation.Behaviors.Find<OperationBehaviorAttribute>();
               if(attribute.TransactionScopeRequired == true)
               {
                  behavior.ReleaseServiceInstanceOnTransactionComplete = false;
                  return;
               }
            }
         }
      }
   }
   void CreateProxies()
   {
      m_Proxies = new Dictionary<string,IDuplexSessionChannel>();

      foreach(ServiceEndpoint endpoint in Description.Endpoints)
      {
         IDuplexSessionChannel channel = m_Factory.CreateChannel(endpoint.Address);
         channel.Open();
         m_Proxies[endpoint.Contract.Name] = channel;
      }
   }

   void CreateListeners()
   {
      m_RetrievingThreads = new List<Thread>();

      foreach(Uri bufferAddress in m_BufferAddresses)
      {
         ServiceBusHelper.VerifyBuffer(bufferAddress.AbsoluteUri,...);

         Thread thread = new Thread(Dequeue);

         m_RetrievingThreads.Add(thread);
         thread.IsBackground = true;
         thread.Start(bufferAddress);
      }
   }

   void Dequeue(object arg)
   {
      Uri bufferAddress = arg as Uri;

      MessageBufferClient bufferClient =
                          MessageBufferClient.GetMessageBuffer(...,bufferAddress);
      while(true)
      {
         Message message = bufferClient.Retrieve(TimeSpan.MaxValue);
         if(message.Headers.Action == CloseAction)
         {
            return;
         }
         else
         {
            Dispatch(message);
         }
      }
   }

   void Dispatch(Message message)
   {
      string contract = ExtractContract(message);
      m_Proxies[contract].Send(message);
   }

   static string ExtractContract(Message message)
   {
      string[] elements = message.Headers.Action.Split('/'),
      return elements[elements.Length-2];
   }

   void SendCloseMessages()
   {
      foreach(Uri bufferAddress in m_BufferAddresses)
      {
         MessageBufferClient bufferClient =
                          MessageBufferClient.GetMessageBuffer(...,bufferAddress);
         Message message =
                        Message.CreateMessage(MessageVersion.Default,CloseAction);
         bufferClient.Send(message);
      }
   }

   void CloseListeners()
   {
      SendCloseMessages();

      foreach(Thread thread in m_RetrievingThreads)
      {
         thread.Join();
      }
   }

   [Conditional("DEBUG")]
   void PurgeBuffers()
   {
      foreach(Uri bufferAddress in m_BufferAddresses)
      {
         ServiceBusHelper.PurgeBuffer(bufferAddress,...);
      }
   }
}

BufferedServiceBusHost<T> stores the proxies to the locally-hosted IPC endpoints in a dictionary called m_Proxies:

Dictionary<string,IDuplexSessionChannel> m_Proxies;

The key into the dictionary is the endpoints’ contract type name.

The constructors store the provided buffer addresses and use reflection to obtain a collection of all the service contracts on the service type. For each service contract, BufferedServiceBusHost<T> verifies it has only one-way operations, then calls the base AddServiceEndpoint() to add an endpoint for that contract type. The address is an IPC address using a GUID for the pipe’s name. The constructors use the IPC binding to build a channel factory of the type IChannelFactory<IDuplexSessionChannel>. IChannelFactory<T> is used to create a non-strongly-typed channel over the binding:

public interface IChannelFactory<T> : IChannelFactory
{
   T CreateChannel(EndpointAddress to);
   //More members
}

The OnOpening() method BufferedServiceBusHost<T> configures the behavior of the service. It will convert a per-session service into a per-call service and will configure both per-call and per-session services to use concurrent calls. This is required because there is no way to turn off the transport session with IPC, and with a transport session in place, even with per-call, the calls will be played out one at a time. With a transactional per-call service configured with concurrent access, OnOpening() must avoid releasing the instance in every transactional operation, as explained in Chapter 8. After opening the internal host with all its IPC endpoints, the OnOpened() method creates the internal proxies to those endpoints and creates the buffered listeners. These two steps are the heart of BufferedServiceBusHost<T>. To create the proxies, it iterates over the collection of endpoints. From each endpoint, it obtains its address and uses the IChannelFactory<IDuplexSessionChannel> to create a channel against that address. That channel (or proxy) is then stored in the dictionary. The CreateListeners() method iterates over the specified buffer addresses. For each address, it verifies the buffer and creates a worker thread to dequeue its messages.

The Dequeue() method uses a MessageBufferClient to retrieve the messages in an infinite loop and dispatch them using the Dispatch() method. Dispatch() extracts the target contract name from the message and uses it to look up the IDuplexChannel from the proxies dictionary and send the message over IPC. IDuplexChannel is supported by the underlying IPC channel and provides a way to send raw messages:

public interface IOutputChannel : ...
{
   void Send(Message message,TimeSpan timeout);
   //More members
}
public interface IDuplexSessionChannel : IOutputChannel,...
{}

If an error occurs during the IPC call, BufferedServiceBusHost<T> will recreate the channel it manages against that endpoint (not shown in Example 11-8). When you close the host, you need to close the proxies thus gracefully waiting for the calls in progress to complete. The problem is how to gracefully close all the retrieving threads, since MessageBufferClient.Retrieve() is a blocking operation and there is no built-in way to abort it. The solution is to post to each monitored buffer a special private message whose action will signal the retrieving thread to exit. This is what the SendCloseMessages() method does. The CloseListeners() method posts that private message to the buffers and waits for all the listening threads to terminate by joining them. Closing the listening threads stops feeding messages to the internal proxies, and once the proxies are closed (when all current calls in progress have returned), the host is ready to shut down. BufferedServiceBusHost<T> also supports an ungraceful Abort() method, which just aborts all threads (not shown in Example 11-8).

Finally, note that BufferedServiceBusHost<T> supports the IServiceBusProperties interface, which I defined as:

public interface IServiceBusProperties
{
   TransportClientEndpointBehavior Credential
   {get;set;}

   Uri[] Addresses
   {get;}
}

In the interest of decoupling, I needed such an interface in a few places, especially in streamlining buffering.

For completeness sake, all my nonstatic service bus helper classes support this simple interface, so you can use it for your needs.

Buffered client base

For the client, I wrote the class BufferedServiceBusClient<T>, defined as:

public abstract class BufferedServiceBusClient<T> :
                         HeaderClientBase<T,ResponseContext>,IServiceBusProperties
{
   //Buffer address from config
   public BufferedServiceBusClient()
   {}
   //No need for config file
   public BufferedServiceBusClient(Uri bufferAddress);


   /* Additional constructors with different credentials */
   protected virtual void Enqueue(Action action);
}

BufferedServiceBusClient<T> derives from my HeaderClientBase<T,H> (mentioned in previous chapters and detailed in Appendix B). The purpose of that base class is to support a response service, as discussed in the following section. For a plain client of a buffered service, that derivation is immaterial.

You can use BufferedServiceBusClient<T> with or without a client config file. The constructors that accept the buffer address do not require a config file. The parameterless constructor or the constructors that accept the endpoint name expect the config file to contain an endpoint matching the contract type with the one-way relay binding (although BufferedServiceBusClient<T> completely ignores that binding).

When deriving your proxy from BufferedServiceBusClient<T>, you will need to use the protected Enqueue() method instead of using the Channel property directly:

[ServiceContract]
interface IMyContract
{
   [OperationContract(IsOneWay = true)]
   void MyMethod(int number);
}

class MyContractClient : BufferedServiceBusClient<IMyContract>,IMyContract
{
   public void MyMethod(int number)
   {
      Enqueue(()=>Channel.MyMethod(number));
   }
}

Enqueue() accepts a delegate (or a Lambda expression) that wraps the use of the Channel property. The result is still type-safe. Example 11-9 shows a partial listing of the BufferedServiceBusClient<T> class.

Example 11-9. Partial listing of BufferedServiceBusClient<T>

public abstract class BufferedServiceBusClient<T> :
                                              HeaderClientBase<T,ResponseContext>,
                                              IServiceBusProperties
                                              where T : class
{
   MessageBufferClient m_BufferClient;

   public BufferedServiceBusClient(Uri bufferAddress) :
                                                base(new NetOnewayRelayBinding(),
                                                new EndpointAddress(bufferAddress))
   {}

   protected virtual void Enqueue(Action action)
   {
      try
      {
         action();
      }
      catch(InvalidOperationException exception)
      {
         Debug.Assert(exception.Message == "This message cannot support the
                                         operation because it has been written.");
      }
   }
   protected override T CreateChannel()
   {
      ServiceBusHelper.VerifyBuffer(Endpoint.Address.Uri.AbsoluteUri,Credential);

      m_BufferClient =
                 MessageBufferClient.GetMessageBuffer(Credential,
                                                Endpoint.Address.Uri.AbsoluteUri);

      return base.CreateChannel();
   }
   protected override void PreInvoke(ref Message request)
   {
      base.PreInvoke(ref request);

      m_BufferClient.Send(request);
   }
   protected TransportClientEndpointBehavior Credential
   {
      get
      {...}
      set
      {...}
   }
}

The constructors of BufferedServiceBusClient<T> supply its base constructor with the buffer address and the binding (always a one-way relay binding to enforce the one-way operations validation). The CreateChannel() method verifies the target buffer exists and obtains a MessageBufferClient representing it. The heart of BufferedServiceBusClient<T> is the PreInvoke() method. PreInvoke() is a virtual method provided by InterceptorClientBase<T>, the base class of HeaderClientBase<T,H>:

public abstract class InterceptorClientBase<T> : ClientBase<T> where T : class
{
   protected virtual void PreInvoke(ref Message request);
   //Rest of the implementation
}

public abstract class HeaderClientBase<T,H> : InterceptorClientBase<T>
                                                                   where T : class
{...}

InterceptorClientBase<T> is described in Appendix E. Briefly, it is part of a generic interception framework I implemented that allows you to easily process the WCF messages before and after the client dispatches them. BufferedServiceBusClient<T> overrides PreInvoke() and uses the buffer client to send the message to the buffer. This way, the client maintains a structured programming model and BufferedServiceBusClient<T> encapsulates the interaction with the WCF message. The downside is that the message can only be sent once, and when the root class of ClientBase tries to send it, it throws an InvalidOperationException. This is where Enqueue() comes in handy by snuffing out that exception.

Response Service

As discussed in Chapter 9, the only way to receive the result (or errors) of a queued call is to use a queued response service. The same design pattern holds true when dealing with buffers. The client needs to provide a dedicated response buffer for the service to buffer the response to. The client also needs to pass the response address and the method ID in the message headers, just as with the MSMQ-based calls, and you can use many of the supporting types from Chapter 9. The main difference between the MSMQ-based response service and the service bus is that the response buffer must also reside in the service bus, as shown in Figure 11-15.

Service bus buffered response service

Figure 11-15. Service bus buffered response service

Client side

To streamline the client side, I wrote the class ClientBufferResponseBase<T>, defined as:

public abstract class ClientBufferResponseBase<T> :
                                       BufferedServiceBusClient<T> where T : class
{
   public readonly Uri ResponseAddress;

   public ClientBufferResponseBase(Uri responseAddress);

   /* Additional constructors with different credentials and parameters */

   protected virtual string GenerateMethodId();
}

ClientBufferResponseBase<T> is a specialized subclass of BufferedServiceBusClient<T> and it adds the response context to the message headers (this is why I made BufferedServiceBusClient<T> derive from HeaderClientBase<T,H> and not merely from InterceptorClientBase<T>). You will need to derive your specific proxies from ClientBufferResponseBase<T> and use the Enqueue() method, for example:

[ServiceContract]
interface ICalculator
{
   [OperationContract(IsOneWay = true)]
   void Add(int number1,int number2);
}

class CalculatorClient : ClientBufferResponseBase<ICalculator>,ICalculator
{
   public CalculatorClient(Uri responseAddress) : base(responseAddress)
   {}

   public void Add(int number1,int number2)
   {
      Enqueue(()=>Channel.Add(number1,number2));
   }
}

Using the subclass of ClientBufferResponseBase<T> is straightforward:

Uri resposeAddress =
             new Uri("sb://MyNamespace.servicebus.windows.net/MyResponseBuffer/");

CalculatorClient proxy = new CalculatorClient(responseAddress);
proxy.Add(2,3);
proxy.Close();

As with ClientResponseBase<T>, something that is very handy when managing the responses on the client side to have the invoking client obtain the method ID used to dispatch the call. You can do this easily with the Header property:

CalculatorClient proxy = new CalculatorClient(responseAddress);
proxy.Add(2,3);
string methodId = proxy.Header.MethodId;
proxy.Close();

Example 11-10 lists the implementation of ClientBufferResponseBase<T>.

Example 11-10. Implementing ClientBufferResponseBase <T>

public abstract class ClientBufferResponseBase<T> :
                                       BufferedServiceBusClient<T> where T : class
{
   protected readonly Uri ResponseAddress;

   public ClientBufferResponseBase(Uri responseAddress)
   {
      ResponseAddress = responseAddress;
   }

   /* More Constructors */

   protected override void PreInvoke(ref Message request)
   {
      string methodId = GenerateMethodId();
      Header = new ResponseContext(ResponseAddress.AbsoluteUri,methodId);
      base.PreInvoke(ref request);
   }
   protected virtual string GenerateMethodId()
   {
      return Guid.NewGuid().ToString();
   }

   //Rest of the implementation
}

ClientBufferResponseBase<T> overrides the PreInvoke() method of HeaderClientBase<T,H> so that it can generate a new method ID for each call and set it into the headers, just like ClientResponseBase<T> of Example 9-23.

Service side

Mimicking again the concepts and solutions of the MSMQ-based calls, to streamline the work required by the buffered service to call the response service, I wrote the class ServiceBufferResponseBase<T>, shown in Example 11-11.

Example 11-11. The ServiceBufferResponseBase<T> class

public abstract class ServiceBufferResponseBase<T> : BufferedServiceBusClient<T>
                                                                   where T : class
{
   public ServiceBufferResponseBase() :
                            base(new Uri(ResponseContext.Current.ResponseAddress))
   {
      Header = ResponseContext.Current;

      //Grab the credentials the host was using
      IServiceBusProperties properties =
                           OperationContext.Current.Host as IServiceBusProperties;
      Credential = properties.Credential;
   }
}

While the service can use a plain BufferedServiceBusClient<T> to enqueue the response, you will need to extract the response buffer address out of the headers and somehow obtain the credentials to log into the service bus buffer. You will also need to provide the headers of the outgoing call with the response context. You can streamline all of these steps with ServiceBufferResponseBase<T>. ServiceBufferResponseBase<T> provides its base constructor with the address out of the response context, and it also sets that context into the outgoing headers. Another simplifying assumption ServiceBufferResponseBase<T> makes is that the responding service can use the same credentials its host used (to retrieve messages from its own buffer) to send messages to the response buffer. To that end, ServiceBufferResponseBase<T> obtains a reference to its own host from the operation context and reads the credentials using the IServiceBusProperties implementation of the host. ServiceBufferResponseBase<T> copies those credentials for its own use (done inside BufferedServiceBusClient<T>). This, of course, mandates the use of BufferedServiceBusHost<T> to host the service in the first place.

Your service needs to derive a proxy class from ServiceBufferResponseBase<T> and use it to respond. For example, given this response contract:

[ServiceContract]
interface ICalculatorResponse
{
   [OperationContract(IsOneWay = true)]
   void OnAddCompleted(int result,ExceptionDetail error);
}

The definition of the proxy to the response service will be:

class CalculatorResponseClient :
                ServiceBufferResponseBase<ICalculatorResponse>,ICalculatorResponse
{
   public void OnAddCompleted(int result,ExceptionDetail error)
   {
      Enqueue(()=>Channel.OnAddCompleted(result,error));
   }
}

Example 11-12 shows a simple buffered service responding to its client.

Example 11-12. Using ServiceBufferResponseBase<T>

class MyCalculator : ICalculator
{
   [OperationBehavior(TransactionScopeRequired = true)]
   public void Add(int number1,int number2)
   {
      int result = 0;
      ExceptionDetail error = null;
      try
      {
         result = number1 + number2;
      }
      //Don't rethrow
      catch(Exception exception)
      {
         error = new ExceptionDetail(exception);
      }
      finally
      {
         CalculatorResponseClient proxy = new CalculatorResponseClient();
         proxy.OnAddCompleted(result,error);
         proxy.Close();
      }
   }
}

Compare Example 11-12 to Example 9-25—they are virtually identical.

Response service

All the response service needs is to access the method ID from the message headers as shown in Example 11-13.

Example 11-13. Implementing a response service

class MyCalculatorResponse : ICalculatorResponse
{
   [OperationBehavior(TransactionScopeRequired = true)]
   public void OnAddCompleted(int result,ExceptionDetail error)
   {
      string methodId = ResponseContext.Current.MethodId;
      ...
   }
}

Example 11-13 is identical to Example 9-27.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.144.17.91