Chapter 10. Publish/Subscribe Systems

In This Chapter

In “publish/subscribe systems […] processes can subscribe to messages containing information on specific subjects, while other processes produce (i.e. publish) such messages” (Tanenbaum and van Steen 2002, 701). Publish/subscribe systems are required in many scenarios. In the financial industry, subscriptions to prices are needed. Subscriptions to sensor data and to information about other equipment are required in manufacturing. In computer systems administration, the administrators need to subscribe to information about the security and states of the systems.

Web Services Eventing and Web Services Notification are competing protocols pertinent to publish/subscribe systems. Both specify formats for subscription messages and for publication messages. Neither protocol is likely to become very important until both International Business Machines (IBM) and Microsoft, at the least, endorse the same one.

The Web Services Notification specification provides a handy description of some of the various ways in which publishers can provide updates to subscribers (Graham, Hull and Murray 2005, 24). In push-style notification, subscribers send subscription messages to publishers, who then send publication messages to the subscribers. In pull-style notification, subscribers send subscription messages to publishers, who send publication messages to a pull-point that is known to the subscribers, from which the subscribers retrieve the publication messages. In brokered notification, subscribers send subscription messages to brokers, who retrieve publication messages sent by publishers and make them available to the subscribers.
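The difference between the push and pull styles can be sketched in a few lines of plain C#. This is only an illustration under stated assumptions: every type and member name below (INotificationConsumer, PushStylePublisher, PullPoint, TryRetrieve) is hypothetical, and none of them comes from the Windows Communication Foundation or from the Web Services Notification specification.

```csharp
using System;
using System.Collections.Generic;

// All names here are hypothetical; nothing below comes from WCF or WS-Notification.
public interface INotificationConsumer
{
    void Notify(string topic, string payload);
}

// Push style: the publisher keeps its subscribers and calls each one directly.
public sealed class PushStylePublisher
{
    private readonly List<INotificationConsumer> subscribers =
        new List<INotificationConsumer>();

    public void Subscribe(INotificationConsumer subscriber)
    {
        subscribers.Add(subscriber);
    }

    public void Publish(string topic, string payload)
    {
        foreach (INotificationConsumer subscriber in subscribers)
        {
            subscriber.Notify(topic, payload);
        }
    }
}

// Pull style: the publisher delivers to a pull-point, which holds each message
// until the subscriber chooses to retrieve it.
public sealed class PullPoint : INotificationConsumer
{
    private readonly Queue<string> pending = new Queue<string>();

    public void Notify(string topic, string payload)
    {
        pending.Enqueue(topic + ": " + payload);
    }

    // Returns false, with a null message, when nothing is waiting.
    public bool TryRetrieve(out string message)
    {
        if (pending.Count > 0)
        {
            message = pending.Dequeue();
            return true;
        }
        message = null;
        return false;
    }
}
```

Registering a PullPoint with the PushStylePublisher turns the same publisher into a pull-style one from the subscriber's point of view. A brokered arrangement would place an intermediary with the same Subscribe and Publish shape between the publishers and the subscribers.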

There are various ways of constructing publish/subscribe systems with the Windows Communication Foundation. This chapter describes several of them.

Publish/Subscribe Using Callback Contracts

A simple way of building a publish/subscribe system with the Windows Communication Foundation is to use callback contracts. This service contract, IPublisher,

[ServiceContract(Session=true,CallbackContract=typeof(ISubscriber))]
public interface IPublisher
{
    [OperationContract]
    KnownDataPoint[] GetKnownDataPoints();

    [OperationContract]
    void Subscribe(KnownDataPoint[] dataPoints, out bool subscriptionAccepted);
}

identifies a callback contract, ISubscriber, that clients of the service are required to implement:

[ServiceContract]
public interface ISubscriber
{
    [OperationContract(IsOneWay=true)]
    void Notify(Guid dataPointIdentifier, byte[] value);
}

Because all clients of a service that implements IPublisher must implement ISubscriber, which exposes a one-way operation called Notify(), the service can rely on being able to use the client’s Notify() operation to publish data to the client. The callback contract can include any number of operations, but they must all be one-way operations.

Note that the IPublisher service contract also has the value of the Session parameter of the ServiceContract attribute set to true. That signifies that the messages exchanged between a client and a service for the duration of a connection between them will be grouped together by the Windows Communication Foundation into a session, and that the Windows Communication Foundation will maintain some information about the state of each session. Having the Windows Communication Foundation do that is a prerequisite for using callback contracts.

Using callback contracts requires not only the obvious task of specifying a callback contract for a service contract, but also the task of selecting a binding for the service by which the service can initiate transmissions to the client. For a binding to allow for that possibility, it must incorporate the composite duplex binding element, CompositeDuplexBindingElement. Two standard bindings that incorporate that binding element are WSDualHttpBinding and NetTcpBinding.

To use the operations of the callback contract implemented by the client, the service requires a proxy for communicating with the client. The service obtains that proxy by using the GetCallbackChannel<T>() generic method of the Windows Communication Foundation’s OperationContext class, introduced in Chapter 2, “The Fundamentals”:

ISubscriber proxy = OperationContext.Current.GetCallbackChannel<ISubscriber>();
proxy.Notify(...);

To see how to use the ISubscriber callback contracts in a publish/subscribe solution, follow these steps:

  1. Copy the code associated with this chapter that you downloaded from www.samspublishing.com to the folder C:\WCFHandsOn. The code is all in a folder called PublishSubscribe. After you have unzipped the code, there should be a folder that looks like the one shown in Figure 10.1.

    Figure 10.1. PublishSubscribe folder.

  2. Open the solution C:\WCFHandsOn\PublishSubscribe\Callbacks\CallbackContract.sln.

    The solution consists of six projects:

    • The RandomDataPoint project is for building a class library with a class called RandomDataPoint that represents the source of the information that subscribers want to receive.

    • The DataPoint project is for building a class library that provides the DataPoint class, from which the RandomDataPoint class derives.

    • The PublisherService project is for building a class library incorporating the IPublisher service contract, which has ISubscriber as a callback contract. The class library also includes the PublisherService class, a service type that implements the IPublisher service contract.

    • The PublisherServiceHost project provides a console application to serve as the host for the PublisherService service type.

    • SubscriberOne and SubscriberTwo are both console applications with clients of the PublisherService service that implement the ISubscriber callback contract.

  3. Examine the IPublisher service contract in the IPublisher.cs module of the PublisherService project in the CallbackContract solution:

    [ServiceContract(Session=true,CallbackContract=typeof(ISubscriber))]
    public interface IPublisher
    {
        [OperationContract]
        KnownDataPoint[] GetKnownDataPoints();
    
        [OperationContract]
        void Subscribe(KnownDataPoint[] dataPoints,
                         out bool subscriptionAccepted);
    }

    The IPublisher interface is a Windows Communication Foundation service contract that designates ISubscriber as its callback contract. The service contract provides the GetKnownDataPoints() operation for retrieving the identifiers of the data items about which a service that implements the contract can publish information. The Subscribe() operation is provided for clients to subscribe to information about one or more of those data items.

  4. Look at the ISubscriber service contract in the ISubscriber.cs module of the PublisherService project:

    [ServiceContract]
    public interface ISubscriber
    {
        [OperationContract(IsOneWay=true)]
        void Notify(Guid dataPointIdentifier, byte[] value);
    }

    ISubscriber is a service contract, all the operations of which are one-way operations. Actually, there is just one operation, called Notify(), by which the service can push the current values of a data item to the client.

  5. Examine the PublisherService service type’s implementation of the IPublisher contract’s Subscribe() method, in the PublisherService.cs module of the PublisherService project:

    void IPublisher.Subscribe(
        KnownDataPoint[] dataPoints, out bool subscriptionAccepted)
    {
        Console.WriteLine("Received subscription request.");
        subscriptionAccepted = false;
        string dataPointIdentifier = null;
        if (dataPoints.Length == 1)
        {
            dataPointIdentifier = dataPoints[0].Identifier;
            this.ValidateDataPoint(dataPointIdentifier, out subscriptionAccepted);
        }
        if (subscriptionAccepted)
        {
            if (!(this.randomDataPoint.Active))
            {
                this.randomDataPoint.Active = true;
            }
            lock (this.subscribersLock)
            {
                this.subscribers.Add(
                    OperationContext.Current.GetCallbackChannel<ISubscriber>());
            }
        }
    }

    After confirming that the subscription request is for information about a data item of which the service is aware, the method retrieves a proxy for communicating with the subscriber using the Windows Communication Foundation’s OperationContext class. Then it adds that proxy to a list of subscriber proxies.

  6. Study the NextValueHandler() method of the PublisherService service type, which is also in the PublisherService.cs module of the PublisherService project:

    private void NextValueHandler(IDataPoint sender, byte[] newValue)
    {
        lock(this.subscribersLock)
        {
            for(int index = this.subscribers.Count - 1; index >= 0; index--)
            {
                try
                {
                     this.subscribers[index].Notify(sender.Identifier, newValue);
                }
                catch (Exception exception)
                {
                     Console.WriteLine(
                         "Removing subscriber due to exception {0}.",
                         exception.ToString());
                     this.subscribers.RemoveAt(index);
                }
                if (this.subscribers.Count <= 0)
                {
                    this.randomDataPoint.Active = false;
                }
            }
        }
    }

    This method is the one by which the service type is notified of a change in the value of the data item about which it publishes information. The service type iterates through the list of subscriber proxies, using each proxy to publish a message concerning the fluctuation in the value of the data item to a subscriber.

  7. Look at the subscribers’ implementation of the Notify() operation of the ISubscriber callback contract, which is in the Subscriber.cs module of the SubscriberOne project of the CallbackContract solution. It simply outputs the content of messages published by the service to the console:

    void ISubscriber.Notify(Guid dataPointIdentifier, byte[] value)
    {
        Console.WriteLine(
            "Notified of value {0} of data point {1}.",
                BitConverter.ToInt32(value,0),
                dataPointIdentifier.ToString());
    }

  8. Start debugging the solution. Console windows for the PublisherServiceHost and for the two subscribers should appear.

  9. When there is activity in the console of the PublisherServiceHost, enter a keystroke into the console windows of both subscribers.

    After a few moments, the service should begin publishing messages about fluctuations in the value of a data item to both of the subscribers, as shown in Figure 10.2. It may take a moment after the first published message is received by the first subscriber before the first published message is received by the second subscriber.

    Figure 10.2. Publish/Subscribe using callback contracts.

  10. Stop debugging the solution.

Callback contracts provide a very easy way of implementing publish/subscribe with the Windows Communication Foundation. As is true of push-style notification solutions generally (Graham, Hull and Murray 2005, 24), the technique presupposes the network being configured to allow the publisher to transmit messages to the client.
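The bookkeeping that the publisher does in the steps above, keeping a locked list of subscriber proxies, walking it from the end, and dropping any proxy that throws, can be tried out without the Windows Communication Foundation. The following is a minimal sketch under that assumption: ISubscriberSketch, SubscriberRegistry, RecordingSubscriber, and FailingSubscriber are hypothetical stand-ins, not types from the chapter's solution.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the callback contract; no WCF types are involved.
public interface ISubscriberSketch
{
    void Notify(Guid dataPointIdentifier, byte[] value);
}

public sealed class SubscriberRegistry
{
    private readonly object subscribersLock = new object();
    private readonly List<ISubscriberSketch> subscribers =
        new List<ISubscriberSketch>();

    public void Add(ISubscriberSketch subscriber)
    {
        lock (subscribersLock)
        {
            subscribers.Add(subscriber);
        }
    }

    public int Count
    {
        get { lock (subscribersLock) { return subscribers.Count; } }
    }

    // Notifies every subscriber and returns how many notifications succeeded.
    // Iterating from the end keeps the remaining indexes valid after RemoveAt().
    public int Publish(Guid dataPointIdentifier, byte[] value)
    {
        int delivered = 0;
        lock (subscribersLock)
        {
            for (int index = subscribers.Count - 1; index >= 0; index--)
            {
                try
                {
                    subscribers[index].Notify(dataPointIdentifier, value);
                    delivered++;
                }
                catch (Exception)
                {
                    // A subscriber that cannot be reached is dropped.
                    subscribers.RemoveAt(index);
                }
            }
        }
        return delivered;
    }
}

// Counts the notifications it receives.
public sealed class RecordingSubscriber : ISubscriberSketch
{
    public int Received;

    public void Notify(Guid dataPointIdentifier, byte[] value)
    {
        Received++;
    }
}

// Simulates a subscriber whose channel has failed.
public sealed class FailingSubscriber : ISubscriberSketch
{
    public void Notify(Guid dataPointIdentifier, byte[] value)
    {
        throw new InvalidOperationException("Simulated channel failure.");
    }
}
```

With two RecordingSubscriber instances and one FailingSubscriber registered, the first call to Publish() reaches the two healthy subscribers and removes the failing one, so later calls no longer pay the cost of the exception.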

Publish/Subscribe Using MSMQ Pragmatic Multicasting

Version 3 of Microsoft Message Queuing (MSMQ), a technology provided free of charge with Microsoft Windows operating systems, added support for the Pragmatic General Multicast (PGM) protocol, referred to in this chapter as pragmatic multicasting. As shown in Figure 10.3, a nontransactional queue can be associated with a PGM address, and any number of queues can be associated with the same PGM address.

Figure 10.3. Associating a PGM address with an MSMQ queue.

As Anand Rajagopalan points out, this new facility of MSMQ provides a simple way of doing publish/subscribe with pull-style notification (Rajagopalan 2005). A publisher can direct publication messages to a PGM address via MSMQ, which will result in those messages being added to all the subscriber queues associated with that address. Subscribers can then pull the messages from their respective queues. Because, as Rajagopalan further points out, the Windows Communication Foundation provides the MsmqIntegrationBinding for exchanging messages with MSMQ applications, this way of doing publish/subscribe can also be implemented with the Windows Communication Foundation:

  1. Open the solution C:\WCFHandsOn\PublishSubscribe\MSMQPragmaticMulticasting\MSMQPragmaticMulticasting.sln.

    The solution consists of four projects:

    • The Order project is for building a class library with a class called PurchaseOrder.

    • The Publisher project provides a console application that publishes information about incoming purchase orders to a PGM address via MSMQ, using the Windows Communication Foundation’s MsmqIntegrationBinding.

    • SubscriberOne and SubscriberTwo are both console applications that subscribe to notifications of incoming purchase orders, using the Windows Communication Foundation’s MsmqIntegrationBinding to pull the notifications from queues associated with the PGM address to which the Publisher sends the notifications.

  2. Look at the PurchaseOrder class in the Order.cs module of the Order project in the MSMQPragmaticMulticasting solution, reproduced in Listing 10.1. The class declares itself to be serializable by having the Serializable attribute. It overrides the ToString() method of the base class, Object, to provide an informative representation of itself as a string. It is instances of this class that the publisher in this solution sends to the subscribers.

    Listing 10.1. Notification Class

    [Serializable]
    public class PurchaseOrder
    {
        public string orderIdentifier;
        public string customerIdentifier;
        public PurchaseOrderLineItem[] orderLineItems;
        private OrderStates orderStatus;
    
        public float TotalCost
        {
            get
            {
                float totalCost = 0;
                foreach (PurchaseOrderLineItem lineItem in orderLineItems)
                    totalCost += lineItem.TotalCost;
                return totalCost;
            }
       }
    
        public OrderStates Status
        {
            get
            {
                return orderStatus;
            }
            set
            {
                orderStatus = value;
            }
        }
        public override string ToString()
        {
            StringBuilder buffer =
                new StringBuilder("Purchase Order: " +
                    orderIdentifier + "\n");
            buffer.Append("\tCustomer: " + customerIdentifier + "\n");
            buffer.Append("\tOrderDetails\n");
            foreach (PurchaseOrderLineItem lineItem in orderLineItems)
            {
                buffer.Append("\t\t" + lineItem.ToString());
            }

            buffer.Append("\tTotal cost of this order: $" + TotalCost + "\n");
            buffer.Append("\tOrder status: " + Status + "\n");
            return buffer.ToString();
        }
    }

  3. Examine the IOrderSubscriber interface in the Publisher.cs module of the Publisher project, and in the Subscriber.cs module of the SubscriberOne project:

    [ServiceContract(Namespace = "http://Microsoft.ServiceModel.Samples")]
    [KnownType(typeof(PurchaseOrder))]
    public interface IOrderSubscriber
    {
    
        [OperationContract(IsOneWay = true, Action = "*")]
        void Notify(MsmqMessage<PurchaseOrder> message);
    }

    This .NET interface is designated as a Windows Communication Foundation service contract by the ServiceContract attribute. It includes a single operation, Notify(), that accepts a single parameter of the type MsmqMessage<PurchaseOrder>. MsmqMessage<T> is a generic type provided by the Windows Communication Foundation for which any serializable type can serve as the type argument. It allows data to be marshaled in and out of MSMQ messages sent or received via the MSMQ integration binding.

    In Chapter 4, “Security,” it was explained that the value of the Action parameter of the OperationContract attribute is used to correlate messages with operations. A value usually does not have to be provided for that parameter, because the Windows Communication Foundation automatically and invisibly supplies appropriate default values.

    However, the value "*" is provided for the Action parameter of the OperationContract attribute on the IOrderSubscriber contract’s Notify() operation. Specifying Action="*" as the parameter to the OperationContract attribute signifies that the operation with that attribute is the unmatched message handler, which means that operation will be used to process all messages not matched with another operation. All messages received via the MSMQ integration binding are dispatched to the unmatched message handler of the receiving service. In this case all such messages will be dispatched to the method by which the IOrderSubscriber contract’s Notify() operation is implemented.

  4. Study the static Main() method of the Publisher class in the Publisher.cs module of the Publisher project:

    static void Main(string[] args)
    {
        [...]
        PurchaseOrder order = new PurchaseOrder();
        order.customerIdentifier = "somecustomer.com";
        order.orderIdentifier = Guid.NewGuid().ToString();
    
        PurchaseOrderLineItem firstLineItem = new PurchaseOrderLineItem();
        [...]
    
        PurchaseOrderLineItem secondLineItem = new PurchaseOrderLineItem();
        [...]
    
        order.orderLineItems =
            new PurchaseOrderLineItem[] { firstLineItem, secondLineItem };
    
        IOrderSubscriber proxy =
            new ChannelFactory<IOrderSubscriber>(
                  "OrderPullPoint").CreateChannel();
    
        proxy.Notify(new MsmqMessage<PurchaseOrder>(order));
        ((IChannel)proxy).Close();
    
        [...]
    }

    The method sends notification of a purchase order to the subscribers using a proxy that is obtained in a customary way, using the Windows Communication Foundation’s ChannelFactory<T> generic. The Publisher code simply invokes the proxy’s Notify() operation, passing an instance of MsmqMessage<PurchaseOrder> created from the purchase order about which it wants to notify the subscribers.

  5. Look at the configuration of the Publisher in the App.Config file of the Publisher project to see the OrderPullPoint configuration referred to in the construction of the proxy:

    <?xml version="1.0" encoding="utf-8" ?>
    <configuration>
        <system.serviceModel>
            <client>
                <endpoint name="OrderPullPoint"
                          address="msmq.formatname:MULTICAST=224.0.255.1:80"
                          binding="msmqIntegrationBinding"
                          bindingConfiguration="OrderPublicationBinding"
                          contract
                    ="Microsoft.ServiceModel.Samples.IOrderSubscriber">
                </endpoint>
            </client>
            <bindings>
                <msmqIntegrationBinding>
                    <binding name="OrderPublicationBinding" exactlyOnce="false" >
                        <security mode="None" />
                    </binding>
                </msmqIntegrationBinding>
            </bindings>
        </system.serviceModel>
    </configuration>

    That configuration selects the Windows Communication Foundation’s standard MsmqIntegrationBinding as the binding that the publisher uses to send its notifications. The settings of that standard binding are modified so as to not require the assurance of messages being delivered exactly once. That assurance, which is provided by default by the MsmqIntegrationBinding, is not possible in this case, because the destination queues are not transactional queues. They are not transactional queues because MSMQ queues associated with PGM addresses cannot be transactional.

    The address provided as the destination of the messages is msmq.formatname:MULTICAST=224.0.255.1:80. In that address, msmq is the scheme associated with the MSMQ-integration transport protocol by the MSMQ integration binding. The expression formatname:MULTICAST signifies that the destination for messages is to be identified by a PGM address. The PGM address given is 224.0.255.1. The component 80 of the address is a port number.

  6. Compare the configuration of the Publisher with the configuration of a subscriber, such as the configuration of the first subscriber, in the App.Config file of the SubscriberOne project:

    <?xml version="1.0" encoding="utf-8" ?>
    <configuration>
        <appSettings>
            <add key="orderQueueName" value=".\private$\WCFHandsOnOne" />
        </appSettings>
        <system.serviceModel>
        <services>
          <service
            type="Microsoft.ServiceModel.Samples.OrderSubscriber">
            <endpoint address="msmq.formatname:DIRECT=OS:.\private$\WCFHandsOnOne"
                      binding="msmqIntegrationBinding"
                      bindingConfiguration="OrderSubscriptionBinding"
                      contract="Microsoft.ServiceModel.Samples.IOrderSubscriber">
    
            </endpoint>
          </service>
        </services>
        <bindings>
          <msmqIntegrationBinding>
            <binding name="OrderSubscriptionBinding" exactlyOnce="false" >
              <security mode="None" />
            </binding>
          </msmqIntegrationBinding>
        </bindings>
      </system.serviceModel >
    </configuration>

    The subscriber configuration defines the configuration of a Windows Communication Foundation service that receives messages via MSMQ. The selection and configuration of the binding corresponds exactly with the selection and configuration of the binding for the publisher. Whereas the address provided as the destination of the publisher’s messages was a PGM address, the address provided as the source of messages for the subscriber service is the name of an MSMQ queue associated with that PGM address.

  7. Examine the static Main() method of the OrderSubscriber class of one of the subscribers in the Subscriber.cs module of the SubscriberOne project:

    public static void Main()
    {
        string queueName = ConfigurationManager.AppSettings["orderQueueName"];
    
        if (!(MessageQueue.Exists(queueName)))
        {
            MessageQueue.Create(queueName);
            MessageQueue queue = new MessageQueue(queueName);
            queue.MulticastAddress =
                ConfigurationManager.AppSettings["multicastAddress"];
        }
    
        using (ServiceHost serviceHost = new ServiceHost(typeof(OrderSubscriber)))
        {
            serviceHost.Open();
    
            Console.WriteLine("The service is ready.");
            Console.WriteLine("Press any key to terminate the service.");
            Console.ReadLine();
    
            serviceHost.Close();
        }
    }

    The method creates the queue that serves as the subscriber’s pull-point if it does not already exist. In creating the queue, it associates the queue with the PGM address to which the publisher directs its messages, reading that address from an application setting named multicastAddress.

    An instance of the OrderSubscriber class, which implements the IOrderSubscriber service contract, is then loaded into an application domain using an instance of the Windows Communication Foundation’s ServiceHost class. Then the Open() method of the ServiceHost instance is invoked, whereupon the Windows Communication Foundation’s channel layer will begin watching for messages delivered to the queue specified in the subscriber’s configuration file. Such messages will be dispatched, by the Windows Communication Foundation, to the implementation of the unmatched message handler, the Notify() operation, of the IOrderSubscriber service contract.

  8. Look at the OrderSubscriber class’s implementation of the Notify() operation of the IOrderSubscriber contract:

    public void Notify(MsmqMessage<PurchaseOrder> message)
    {
         PurchaseOrder order = (PurchaseOrder)message.Body;
         Random statusIndexer = new Random();
         order.Status = (OrderStates)statusIndexer.Next(3);
         Console.WriteLine("Processing {0} ", order);
    }

    Recall that the Notify() operation is designated as the unmatched message handler of the IOrderSubscriber contract, and also that all messages received via the MSMQ integration binding are dispatched to the method that implements the unmatched message handler. In this case, that method is the Notify() method of the OrderSubscriber class. The received messages are dispatched to the Notify() method as instances of the MsmqMessage<PurchaseOrder> type, from which instances of the PurchaseOrder class are extracted with this simple statement:

    PurchaseOrder order = (PurchaseOrder)message.Body;

  9. Start debugging the MSMQPragmaticMulticasting solution. Console windows for the two subscriber applications should appear, as well as the console window of the publisher.

  10. When there is activity in both of the subscriber applications’ console windows, enter a keystroke into the console window of the publisher. The results should appear as shown in Figure 10.4. Notifications of incoming purchase orders are published to the subscribers’ pull-points by the publisher, from which they are retrieved by the subscribers.

    Figure 10.4. Publish/Subscribe using MSMQ PGM.

  11. Stop debugging the application.

Generally, when Windows Communication Foundation applications send messages to other Windows Communication Foundation applications via MSMQ queues, one uses the Windows Communication Foundation’s standard NetMsmqBinding, rather than the MsmqIntegrationBinding. The NetMsmqBinding has the virtue of being more flexible: it does not require messages to be sent and received in the form of instances of MsmqMessage<T> types, and it allows messages to be dispatched to operations other than the unmatched message handler. Usually, one need resort to the MsmqIntegrationBinding only when a Windows Communication Foundation application must communicate with a non–Windows Communication Foundation application via MSMQ. In this case, however, all the applications communicating via MSMQ are Windows Communication Foundation applications, so what is the reason for using the MsmqIntegrationBinding rather than the NetMsmqBinding? The reason is that the implementation of the PGM protocol in MSMQ represents, in effect, a non–Windows Communication Foundation application interposed between the Windows Communication Foundation applications.
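The fan-out behavior that this section relies on, in which one message sent to a multicast address lands in every queue associated with that address and each subscriber then pulls from its own queue, can be modeled in memory. The sketch below is an assumption-laden illustration only: MulticastSketch, CreateQueue(), and Send() are hypothetical names, and the code involves neither MSMQ nor the PGM protocol.

```csharp
using System;
using System.Collections.Generic;

// In-memory model of multicast fan-out; hypothetical, with no MSMQ or PGM involved.
public sealed class MulticastSketch
{
    // Maps each multicast address to the queues associated with it.
    private readonly Dictionary<string, List<Queue<string>>> queuesByAddress =
        new Dictionary<string, List<Queue<string>>>();

    // Creates a subscriber queue and associates it with the given address.
    public Queue<string> CreateQueue(string multicastAddress)
    {
        List<Queue<string>> queues;
        if (!queuesByAddress.TryGetValue(multicastAddress, out queues))
        {
            queues = new List<Queue<string>>();
            queuesByAddress[multicastAddress] = queues;
        }
        Queue<string> queue = new Queue<string>();
        queues.Add(queue);
        return queue;
    }

    // Copies the message into every queue associated with the address and
    // returns the number of queues that received it.
    public int Send(string multicastAddress, string message)
    {
        List<Queue<string>> queues;
        if (!queuesByAddress.TryGetValue(multicastAddress, out queues))
        {
            return 0;
        }
        foreach (Queue<string> queue in queues)
        {
            queue.Enqueue(message);
        }
        return queues.Count;
    }
}
```

The publisher never learns how many subscribers exist or who they are; it knows only the address. That decoupling is what distinguishes this arrangement from the callback approach, in which the publisher holds a proxy for every subscriber.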

Publish/Subscribe Using Streaming

In using either callback contracts or MSMQ PGM to do publish/subscribe with the Windows Communication Foundation, there is the shortcoming of incurring the cost of sending an entire message with each notification from the publisher to the subscribers. That price is more acceptable when the size of the notification in proportion to the total size of the messages is larger, and when notifications are required less frequently. However, the requirement to publish frequent notifications of small items of information is commonplace. One can use the Windows Communication Foundation’s streamed transfer mode to avoid having to create an entire message for each notification in such cases.
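The underlying idea, many small notifications carried as records within one long-lived stream rather than one message apiece, can be illustrated independently of the Windows Communication Foundation. The following is a rough sketch under stated assumptions: the 20-byte record layout (a 16-byte identifier followed by a 4-byte value) and the NotificationStreamSketch class are hypothetical and are not the format used by the chapter's solution.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical record framing over a plain System.IO.Stream; not a WCF API.
public static class NotificationStreamSketch
{
    private const int IdentifierLength = 16;
    private const int RecordLength = IdentifierLength + sizeof(int);

    // Appends one notification to the stream: the identifier, then the value.
    public static void WriteNotification(
        Stream stream, Guid dataPointIdentifier, int value)
    {
        byte[] identifier = dataPointIdentifier.ToByteArray();
        stream.Write(identifier, 0, identifier.Length);
        byte[] valueBytes = BitConverter.GetBytes(value);
        stream.Write(valueBytes, 0, valueBytes.Length);
    }

    // Yields each notification as soon as its complete record has been read,
    // without waiting for the stream to end.
    public static IEnumerable<KeyValuePair<Guid, int>> ReadNotifications(
        Stream stream)
    {
        byte[] record = new byte[RecordLength];
        while (ReadExactly(stream, record))
        {
            byte[] identifier = new byte[IdentifierLength];
            Array.Copy(record, 0, identifier, 0, IdentifierLength);
            int value = BitConverter.ToInt32(record, IdentifierLength);
            yield return new KeyValuePair<Guid, int>(new Guid(identifier), value);
        }
    }

    // Fills the buffer completely, or returns false if the stream ends first.
    private static bool ReadExactly(Stream stream, byte[] buffer)
    {
        int offset = 0;
        while (offset < buffer.Length)
        {
            int read = stream.Read(buffer, offset, buffer.Length - offset);
            if (read == 0)
            {
                return false;
            }
            offset += read;
        }
        return true;
    }
}
```

Each notification here costs 20 bytes on the wire, with no per-notification envelope. The reader works with any Stream, so the same loop would serve whether the bytes come from a MemoryStream in a test or from a stream handed back by a service operation.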

The Streamed Transfer Mode

The Windows Communication Foundation uses a buffered transfer mode by default. That means that the entire contents of an outgoing message must have been written into a buffer before the message is sent, and that the entire contents of an incoming message must be read from a buffer before the message is dispatched for processing. However, the Windows Communication Foundation provides the option of a streamed transfer mode by which the content of an incoming message may be dispatched for processing by the receiver even before the entire content of the message has been formulated by the source:

  1. Open the solution C:\WCFHandsOn\PublishSubscribe\Streaming\Streaming.sln. It consists of two projects. The Client project is for building a Windows Forms application that displays an image retrieved from a Windows Communication Foundation service. That service is built using the other project in the solution, called Service.

  2. Examine the interface IPictureServer in the Program.cs module of the Service project. It is designated as a Windows Communication Foundation service contract, of which the only notable feature is that its sole operation, GetPicture(), is defined as returning a Stream object:

    [ServiceContract]
    public interface IPictureServer
    {
        [OperationContract]
        Stream GetPicture(string pictureName);
    }

  3. Look at the PictureServer class, which is a service type that implements the IPictureServer contract. It returns the image requested by a client as a FileStream object:

    internal class PictureServer: IPictureServer
    {
        Stream IPictureServer.GetPicture(string pictureName)
        {
            try
            {
               return new FileStream(pictureName, FileMode.Open);
            }
            catch (Exception)
            {
                return null;
            }
        }
    
    }

  4. See how the service is configured in the App.Config file of the Service project:

    <?xml version="1.0" encoding="utf-8" ?>
    <configuration>
        <system.serviceModel>
            <services>
                <service type="Server.PictureServer">
                    <endpoint   address="http://localhost:8000/Picture/Server"
                               binding="basicHttpBinding"
                               bindingConfiguration="StreamedHttp"
                               contract="Server.IPictureServer,Server"/>
                </service>
            </services>
            <bindings>
                <basicHttpBinding>
                    <binding
                        name="StreamedHttp"
                        transferMode="StreamedResponse"
                        maxMessageSize="9223372036854775807"/>
                </basicHttpBinding>
            </bindings>
        </system.serviceModel>
    </configuration>

    The standard Windows Communication Foundation BasicHttpBinding is selected for the service, but the value of the transfer mode property of that binding is set to StreamedResponse. Note that the value of the maxMessageSize property is set to a very large number, which happens to be the maximum value.

  5. Examine the client application’s use of the GetPicture() operation of the service in the RetrievePicture() method of the MainForm.cs module of the Client project:

    private void RetrievePicture(object state)
    {
        if (this.InvokeRequired)
        {
            IPictureServer pictureServer =
                new ChannelFactory<IPictureServer>("PictureServer")
                    .CreateChannel();
            Stream pictureStream =
                pictureServer.GetPicture(
                    ConfigurationManager.AppSettings["PictureName"]);
            ((IChannel)pictureServer).Close();
    
            this.Invoke(
                new RetrievePictureDelegate(this.RetrievePicture),
                new object[]{pictureStream});
        }
        else
        {
           Bitmap bitMap = new Bitmap((Stream)state);
           this.Picture.Image = bitMap;
        }
    }

    The Stream object retrieved from the service via the GetPicture() operation is marshaled onto the user interface thread. Then it is displayed in the PictureBox control of the client application’s form.

  6. Start debugging the application. The console window of the service should appear, along with the client application’s form.

  7. When there is activity in the console window of the service, click the Get the Picture! button on the client application’s form. After a moment, a picture, retrieved from the service, should appear on the client application’s form, as shown in Figure 10.5.

    Figure 10.5. Retrieving a picture from a service using the streamed transfer mode.

  8. Stop debugging the application.

  9. Alter the App.Config file of the Service project as shown in Listing 10.2, thereby adding the mechanism introduced in Chapter 2 for logging the messages sent and received by the service.

    Example 10.2. Streamed Transfer Configuration with Message Logging

    <?xml version="1.0" encoding="utf-8" ?>
    <configuration>
        <system.diagnostics>
            <sources>
                <source
                    name="System.ServiceModel.MessageLogging"
                    switchValue="Verbose">
                    <listeners>
                        <add
                            name="xml"
                            type="System.Diagnostics.XmlWriterTraceListener"
                            initializeData="C:\WCFHandsOn\PublishSubscribe\Streaming\message.log" />
                    </listeners>
                </source>
            </sources>
            <trace autoflush="true" />
        </system.diagnostics>
         <system.serviceModel>
             <diagnostics>
                 <messageLogging logEntireMessage="true"
                                 maxMessagesToLog="300"
                                 logMessagesAtServiceLevel="false"
                                 logMalformedMessages="true"
                                 logMessagesAtTransportLevel="true" />
             </diagnostics>
             <services>
                 <service type="Server.PictureServer">
                     <endpoint address="http://localhost:8000/Picture/Server"
                               binding="basicHttpBinding"
                               bindingConfiguration="StreamedHttp"
                               contract="Server.IPictureServer,Server"/>
                 </service>
             </services>
             <bindings>
                 <basicHttpBinding>
                     <binding
                         name="StreamedHttp"
                         transferMode="StreamedResponse"
                         maxMessageSize="9223372036854775807"/>
                 </basicHttpBinding>
             </bindings>
         </system.serviceModel>
    </configuration>

  10. Now repeat steps 6, 7, and 8.

  11. Open the file C:\WCFHandsOn\PublishSubscribe\Streaming\message.log.

    That file should contain the following record of the message by which the service replied to the client, showing that the stream object was incorporated in the body of a SOAP message:

    <s:Envelope
        xmlns:a="http://schemas.xmlsoap.org/ws/2004/08/addressing"
        xmlns:s="http://schemas.xmlsoap.org/soap/envelope/">
        <s:Header>
            <a:Action s:mustUnderstand="1">
            http://tempuri.org/IPictureServer/GetPictureResponse</a:Action>
            <a:To s:mustUnderstand="1">
            http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous
            </a:To>
        </s:Header>
        <s:Body>... stream ...</s:Body>
    </s:Envelope>

  12. Alter the App.Config file of the Client project to omit the adjustment to the maximum message size of the binding:

    <?xml version="1.0" encoding="utf-8" ?>
    <configuration>
        <appSettings>
            <add key="PictureName" value="Pony.jpg"/>
        </appSettings>
        <system.serviceModel>
            <client>
                <endpoint name="PictureServer"
                          address="http://localhost:8000/Picture/Server"
                          binding="basicHttpBinding"
                          bindingConfiguration="StreamedHttp"
                          contract="Client.IPictureServer,Client"/>
            </client>
            <bindings>
                <basicHttpBinding>
                    <binding name="StreamedHttp"
                             transferMode="StreamedResponse"/>
                </basicHttpBinding>
            </bindings>
        </system.serviceModel>
    </configuration>

  13. Repeat, once again, steps 6, 7, and 8.

On this occasion, the client application should throw an exception, as shown in Figure 10.6.

Figure 10.6. Exceeding the maximum message size.

This solution has demonstrated how to select the streamed transfer mode for the BasicHttpBinding. It has also shown that one can transmit a Stream object using the Windows Communication Foundation. By logging and examining the messages transmitted by the service, it became apparent that, by default, the Windows Communication Foundation sends streams embedded within SOAP messages. When the size of the stream causes the size of the message in which it is embedded to exceed the maximum message size of the binding, a trappable error is thrown.

However, the effect of the streamed transfer mode has remained mostly invisible. It has not yet been made apparent that the initial content of the stream was available to the client before the entire content of the stream was received.

Most important, this crucial line of code by which the service returned the stream to the client,

return new FileStream(pictureName, FileMode.Open);

does not reveal how individual data items can be sent progressively via a stream. That is what would be required in order to implement publish/subscribe using the Windows Communication Foundation’s streamed transfer mode.

Transmitting a Custom Stream with the Streamed Transfer Mode

To see how individual data items can be fed through a stream, follow these steps:

  1. Open the solution C:\WCFHandsOn\PublishSubscribe\CustomStream\CustomStream.sln. It consists of two projects. The Client project is for building a console application that retrieves an image from a Windows Communication Foundation service. The service is built using the other project in the solution, called Service.

  2. Examine the interface IPictureServer in the Program.cs module of the Service project. It represents the same service contract that was used previously, with a single operation, GetPicture(), that returns a stream object:

    [ServiceContract]
    public interface IPictureServer
    {
        [OperationContract]
        Stream GetPicture(string pictureName);
    }

  3. See, however, that the PictureServer class that implements the IPictureServer contract is slightly altered from the earlier version. This time the stream that it returns is an instance of the CustomStream class:

    internal class PictureServer: IPictureServer
    {
        Stream IPictureServer.GetPicture(string pictureName)
        {
            try
            {
                 CustomStream customStream = new CustomStream(pictureName);
                 return customStream;
            }
            catch (Exception)
            {
                 return null;
            }
        }
    }

  4. Study the definition of the CustomStream class in the CustomStream.cs module of the Service project (see Listing 10.3).

    Example 10.3. A Custom Stream Class

    public class CustomStream: Stream
    {
        private string backingStore = null;
        private FileStream backingStream = null;
        private bool initialRead = true;
        private DateTime startRead;
        private long totalBytes = 0;
        private CustomStream()
        {
        }
    
        public CustomStream(string fileName)
        {
            this.backingStore = fileName;
        }
    
        [...]
    
        public override int Read(byte[] buffer, int offset, int count)
        {
            TimeSpan duration;
    
            if (this.initialRead)
            {
                this.startRead = DateTime.Now;
                this.initialRead = false;
            }
            else
            {
                Thread.Sleep(100);
            }
    
            Console.WriteLine(string.Format(
                "Reading {0} bytes from backing store.", count));
    
            if (this.backingStream == null)
            {
                this.backingStream = new FileStream(
                    this.backingStore,
                    FileMode.Open);
            }
    
            int bytesRead = this.backingStream.Read(buffer, offset, count);
    
            if (bytesRead <= 0)
            {
                this.backingStream.Close();
            }

            this.totalBytes += bytesRead;

            duration = (DateTime.Now - this.startRead);

            Console.WriteLine(
                "Sent {0} bytes in {1}:{2}.",
                this.totalBytes,
                duration.Seconds,
                duration.Milliseconds);

            return bytesRead;
        }
    
        [...]
    }

    The CustomStream class derives from the abstract Stream class. Although it is required to override all the latter’s abstract methods, it really only provides a substantive override for the Read() method. What the CustomStream class’s Read() method does is return a chunk of the image requested by the client application, the maximum size of the chunk being specified by a parameter passed to the Read() method.

  5. Start debugging the application. The console window of the service application should appear, followed by the console window of the client application.

  6. When there is activity in the console window of the service application, enter a keystroke into the console window of the client application. The results should be as shown in Figure 10.7: As chunks of the image requested by the client are still being retrieved from the CustomStream object within the service, the chunks already transmitted to the client are being retrieved from the CustomStream object within the client.

    Figure 10.7. Using the streamed transfer mode with a custom stream class.

  7. Stop debugging the solution.

This makes the effect of the streamed transfer mode vividly apparent. In response to a single request from a client, data is being transmitted to the client in chunks. The chunks received by the client are immediately available for processing, before all the chunks have been sent by the service.
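
For readers who want to experiment outside the sample solution, the following is a minimal sketch of a forward-only, read-only stream class of the same general shape as the one in Listing 10.3. It is not the book's CustomStream class: the name ChunkedReadOnlyStream, the maxChunkSize parameter, and the TotalBytesRead property are assumptions made for illustration. Only the Read() method does substantive work; the other overrides merely declare that the stream cannot seek or write.

```csharp
using System;
using System.IO;

// Hypothetical illustration, not the book's CustomStream class: a stream that
// hands out the bytes of another stream in chunks of a bounded size.
public sealed class ChunkedReadOnlyStream : Stream
{
    private readonly Stream inner;      // where the bytes really come from
    private readonly int maxChunkSize;  // upper bound on bytes returned per Read()
    private long totalBytes;            // running count of bytes handed out

    public ChunkedReadOnlyStream(Stream inner, int maxChunkSize)
    {
        if (inner == null) throw new ArgumentNullException("inner");
        if (maxChunkSize <= 0) throw new ArgumentOutOfRangeException("maxChunkSize");
        this.inner = inner;
        this.maxChunkSize = maxChunkSize;
    }

    public long TotalBytesRead { get { return this.totalBytes; } }

    // The only substantive override: return at most maxChunkSize bytes per call.
    public override int Read(byte[] buffer, int offset, int count)
    {
        int bytesRead = this.inner.Read(
            buffer, offset, Math.Min(count, this.maxChunkSize));
        this.totalBytes += bytesRead;
        return bytesRead;  // zero tells the caller that the stream has ended
    }

    // The remaining abstract members describe a forward-only, read-only stream.
    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return false; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }
    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }
    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException();
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing) this.inner.Dispose();  // release the wrapped stream too
        base.Dispose(disposing);
    }
}
```

Wrapping a ten-byte MemoryStream with a maximum chunk size of four, for instance, yields reads of four, four, two, and then zero bytes, whatever the size of the buffer the caller offers.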

In the service’s implementation of the operation used by the client to retrieve the picture,

internal class PictureServer: IPictureServer
{
    Stream IPictureServer.GetPicture(string pictureName)
    {
       try
       {
           CustomStream customStream = new CustomStream(pictureName);
           return customStream;
       }
       catch (Exception)
       {
           return null;
       }
    }
}

this single line of code,

return customStream;

causes the Windows Communication Foundation to send the initial parts of the response message to the client,

<s:Envelope
    xmlns:a="http://schemas.xmlsoap.org/ws/2004/08/addressing"
    xmlns:s="http://schemas.xmlsoap.org/soap/envelope/">
    <s:Header>
        <a:Action s:mustUnderstand="1">
        http://tempuri.org/IPictureServer/GetPictureResponse</a:Action>
        <a:To s:mustUnderstand="1">
        http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous
        </a:To>
    </s:Header>
    <s:Body>

and then calls the Read() method of the stream iteratively, requesting up to one kilobyte of data from it on each iteration. Each chunk of data retrieved in that manner is transmitted to the client:

... stream ...

When the Read() method returns zero bytes, the Windows Communication Foundation closes the stream and transmits the remainder of the message:

    </s:Body>
</s:Envelope>
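
The read loop just described can be sketched in a few lines. This is only an illustration of the pattern, not the Windows Communication Foundation's actual implementation: the StreamPump class and its Pump() method are hypothetical names, and the destination stream merely stands in for the transport.

```csharp
using System;
using System.IO;

// Hypothetical sketch of the pattern described in the text; the real
// transport code inside the Windows Communication Foundation is not shown
// in this chapter.
public static class StreamPump
{
    // Asks the source for up to bufferSize bytes at a time, forwards whatever
    // comes back, and stops when Read() returns zero bytes.
    // Returns the number of chunks that were forwarded.
    public static int Pump(Stream source, Stream destination, int bufferSize)
    {
        byte[] buffer = new byte[bufferSize];
        int chunks = 0;
        try
        {
            int bytesRead;
            while ((bytesRead = source.Read(buffer, 0, buffer.Length)) > 0)
            {
                // Each chunk is passed on as soon as it has been read, so the
                // receiver can start work before the source is exhausted.
                destination.Write(buffer, 0, bytesRead);
                destination.Flush();
                chunks++;
            }
        }
        finally
        {
            // A zero-byte read ends the transfer, and the source is closed.
            source.Dispose();
        }
        return chunks;
    }
}
```

With a buffer size of 1024, a source holding 2,500 bytes is forwarded in three chunks. A source whose Read() method blocks between chunks, as the CustomStream class does with its call to Thread.Sleep(), simply slows the loop down without changing its shape.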

Implementing Publish/Subscribe Using the Streamed Transfer Mode and a Custom Stream

Now it should be evident how to use the Windows Communication Foundation’s streamed transfer mode to implement publish/subscribe. When the data to be published consists of small data items, and the subscribers require notifications with minimal delay, the publisher can send a stream to each of its subscribers using the Windows Communication Foundation’s streamed transfer mode. The streams should be custom streams. The Windows Communication Foundation will invoke the Read() methods of the custom streams iteratively, requesting kilobytes of data to transmit to the subscribers. If the custom stream objects have updates available, they can provide those to the Windows Communication Foundation to publish to the subscribers. If no updates are available, the Read() methods of the streams can sleep until updates occur, or until some configurable timeout expires. If the timeout expires, zero bytes can be returned to the Windows Communication Foundation, which will close the stream. The subscriber can then choose to renew the subscription. The publisher buffers updates pertinent to the subscriber for a configurable period so that if the subscription is renewed, updates that occurred between the closing of the initial stream and the renewal of the subscription can be sent to the subscriber immediately upon the renewal.

If updates continue to be available, so that the custom streams continue to supply data to the Windows Communication Foundation as it iteratively calls their Read() methods, the maximum size of the message into which the Windows Communication Foundation is embedding that data will eventually be exceeded. So there should be logic in the custom streams that detects when the maximum message size is about to be exceeded. That logic will have the Windows Communication Foundation close the current stream, after which the publisher immediately opens a new stream to the subscriber.
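
The behavior described in the preceding two paragraphs can be sketched as follows. This is a hypothetical illustration and not the code of the library introduced next: the UpdateStream class, its constructor parameters, and the choice of a BlockingCollection as the source of updates are all assumptions. The messageCapacity value is treated as a soft budget: once it has been reached, no further update is begun, so it should be set below the binding's real maximum message size by at least the size of one update.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;

// Hypothetical sketch, not the StreamingPublicationSubscription library.
public sealed class UpdateStream : Stream
{
    private readonly BlockingCollection<byte[]> updates; // filled by the publisher
    private readonly TimeSpan idleTimeout;     // longest wait for a new update
    private readonly long messageCapacity;     // soft byte budget for one message
    private byte[] pending;                    // update not yet fully handed out
    private int pendingOffset;
    private long totalBytes;

    public UpdateStream(
        BlockingCollection<byte[]> updates, TimeSpan idleTimeout, long messageCapacity)
    {
        if (updates == null) throw new ArgumentNullException("updates");
        this.updates = updates;
        this.idleTimeout = idleTimeout;
        this.messageCapacity = messageCapacity;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (count == 0) return 0;
        while (this.pending == null)
        {
            // Budget used up: end this stream so that a new one can be opened.
            if (this.totalBytes >= this.messageCapacity) return 0;

            // Sleep until an update arrives or the timeout expires.
            byte[] next;
            if (!this.updates.TryTake(out next, this.idleTimeout)) return 0;

            // Skip empty updates, which would otherwise look like end-of-stream.
            if (next != null && next.Length > 0)
            {
                this.pending = next;
                this.pendingOffset = 0;
            }
        }

        // Hand out as much of the current update as the caller asked for.
        int n = Math.Min(count, this.pending.Length - this.pendingOffset);
        Buffer.BlockCopy(this.pending, this.pendingOffset, buffer, offset, n);
        this.pendingOffset += n;
        this.totalBytes += n;
        if (this.pendingOffset == this.pending.Length) this.pending = null;
        return n;
    }

    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return false; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }
    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }
    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException();
    }
}
```

Because the queue of updates lives outside the stream, a publisher can create a fresh UpdateStream over the same queue each time one ends. Updates that were queued in the interval are then delivered as soon as the new stream is read, which corresponds to the buffering of updates between the closing of one stream and the renewal of the subscription.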

All of these capabilities are implemented in a reusable library called StreamingPublicationSubscription that is included in the solution C:\WCFHandsOn\PublishSubscribe\StreamedPublishSubscribe\StreamedPublishSubcribe.sln. The key classes that it provides are the BufferedSubscriptionManager class and the NotificationStreamWriter class. The former buffers, for a configurable period, the data items for which subscriptions have been received, whereas the latter retrieves data items from the BufferedSubscriptionManager and makes them available to the Windows Communication Foundation. A third class, NotificationStreamReader, reads the streams output by the NotificationStreamWriter class.

To see these classes in action, and to understand how to use them to implement publish/subscribe solutions, follow these steps:

  1. Open the solution C:\WCFHandsOn\PublishSubscribe\StreamedPublishSubscribe\StreamedPublishSubcribe.sln. Besides the project for building the StreamingPublicationSubscription library, the solution also has the Subscriber project, for building a subscriber console application, and the PublisherServiceHost project, for building a console application to host the publisher built from the PublisherService project.

  2. Examine the ISubscriber interface in the ISubscriber.cs module of the PublisherService project. That interface defines the service contract that all subscribers are expected to implement. It defines a single operation, Notify(), that takes a Stream object as a parameter:

    [ServiceContract]
    public interface ISubscriber
    {
         [OperationContract(IsOneWay=true)]
         void Notify(Stream stream);
    }

  3. Look at the Subscribe() method of the PublisherService class in the PublisherService.cs module of the PublisherService project. That method, shown in Listing 10.4, executes when subscribers submit subscription requests. After validating the subscription and the subscriber, the method invokes the Activate() method of the PublishingAgent class on a background thread.

    Example 10.4. Method for Processing Subscription Requests

    void IPublisher.Subscribe(
      KnownDataPoint[] dataPoints,
      out bool subscriptionAccepted)
    {
        subscriptionAccepted = false;
        string dataPointIdentifier = null;
        if (dataPoints.Length == 1)
        {
            dataPointIdentifier = dataPoints[0].Identifier;
            this.ValidateDataPoint(dataPointIdentifier, out subscriptionAccepted);
        }
    
        string configuration = null;
        if (subscriptionAccepted)
        {
            this.ValidateSubscriber(
                OperationContext.Current.ServiceSecurityContext.WindowsIdentity.Name,
                out subscriptionAccepted,
                out configuration);
        }

        if (subscriptionAccepted)
        {
            ThreadPool.QueueUserWorkItem(
                new WaitCallback(
                    ((IPublishingAgent)new PublishingAgent(
                        configuration,
                        dataPointIdentifier)).Activate),
                null);
        }
    }

  4. See what is done by the Activate() method of the PublishingAgent class, in the PublishingAgent.cs module of the PublisherService project. The method is reproduced in Listing 10.5.

    Example 10.5. Activating a Publication Agent

    void IPublishingAgent.Activate(object state)
    {
        this.randomDataPoint.Active = true;
    
        NotificationStreamWriter writer = null;
    
    
        IBufferedSubscriptionManager bufferedDataSubscriptionManager
            = new BufferedSubscriptionManager(this.subscriberConfiguration, 100);
        bufferedDataSubscriptionManager.AddSubscription(this.randomDataPoint);
    
        while (true)
        {
            using (SubscriberProxy subscriberProxy
                = new SubscriberProxy(this.subscriberConfiguration))
            {
                ISubscriber subscriber = (ISubscriber)subscriberProxy;
                writer = new NotificationStreamWriter(
                    bufferedDataSubscriptionManager,
                    long.Parse(ConfigurationManager.AppSettings["MessageCapacity"]),
                    new TimeSpan(
                        0,
                        0,
                        0,
                        int.Parse(
                            ConfigurationManager.AppSettings["UpdateFrequencyInSeconds"])),
                    new TimeSpan(
                        0,
                        0,
                        0,
                        0,
                        int.Parse(
                            ConfigurationManager.AppSettings["DataSourceTimeoutInMilliseconds"])));
                subscriber.Notify(writer);
                subscriberProxy.Close();
                Console.WriteLine("Batch completed.");
             }
        }
    }

    The method adds details of the subscription to an instance of the BufferedSubscriptionManager class, which will begin buffering updates to the values of the data point to which the subscription pertains. Then the method invokes the subscriber’s Notify() operation, passing an instance of a NotificationStreamWriter, which will then proceed to read updates from the BufferedSubscriptionManager and pass them to the Windows Communication Foundation for transmission. When a stream ends, the enclosing loop creates a new proxy and a new NotificationStreamWriter, so the subscriber keeps receiving updates in successive batches.

  5. Start debugging the solution. The console windows of the Subscriber application and the PublisherServiceHost application should appear.

  6. When there is activity in the console window of the PublisherServiceHost application, enter a keystroke into the console of the Subscriber application. After a moment, updates from the publisher will begin to be registered in the console window of the subscriber.

  7. Watch for notification in the console window of the subscriber that the maximum size of a message incorporating a stream was about to be exceeded, resulting in that stream being closed and a new one automatically being provided by the publisher. This effect is shown in Figure 10.8.

    Figure 10.8. Implementing publish/subscribe with the StreamingPublicationSubscription library.

  8. Stop debugging the solution.

Summary

There are various ways of implementing publish/subscribe solutions with the Windows Communication Foundation. Callback contracts and MSMQ PGM are suitable for scenarios in which the size of notifications is larger and the required frequency for updates is lower. When the notifications are smaller and more frequent, one can use the streamed transfer mode with a custom stream class to stream the notifications to the subscribers.

References
