Chapter 2. Reactive Programming with C#

In the previous chapter, we gave an overall introduction to reactive programming and related languages and frameworks.

In this chapter, we will work through a practical example of reactive programming in pure C#.

The following topics will be discussed here:

  • IObserver interface
  • IObservable interface
  • Subscription life cycle
  • Sourcing events
  • Filtering events
  • Correlating events
  • Sourcing from CLR streams
  • Sourcing from CLR enumerables

IObserver interface

This core-level interface is part of the Base Class Library (BCL) of .NET 4.0 and is available for the older 3.5 as an add-on.

Its use is simple: the goal is to provide a standard way of handling the most basic features of any reactive message consumer.

As already seen in the previous chapter, reactive messages flow from a producer to a consumer that subscribes for some messages. The IObserver C# interface is available to construct message receivers that comply with the reactive programming layout by implementing the three main message-oriented events: a message received, an error received, and a task completed message.

The IObserver interface has the following signature and description:

    // Summary: 
    //     Provides a mechanism for receiving push-based notifications. 
    // 
    // Type parameters: 
    //   T: 
    //     The object that provides notification information. This type parameter is 
    //     contravariant. That is, you can use either the type you specified or any 
    //     type that is less derived. For more information about covariance and contravariance, 
    //     see Covariance and Contravariance in Generics. 
    public interface IObserver<in T> 
    { 
        // Summary: 
        //     Notifies the observer that the provider has finished sending push-based notifications. 
        void OnCompleted(); 
        // 
        // Summary: 
        //     Notifies the observer that the provider has experienced an error condition. 
        // 
        // Parameters: 
        //   error: 
        //     An object that provides additional information about the error. 
        void OnError(Exception error); 
        // 
        // Summary: 
        //     Provides the observer with new data. 
        // 
        // Parameters: 
        //   value: 
        //     The current notification information. 
        void OnNext(T value); 
    } 
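The contravariance mentioned in the comments above can be illustrated with a minimal sketch. The ObjectLogger and ContravarianceDemo classes are hypothetical names introduced only for this illustration; they are not part of the BCL. The sketch shows that an observer of a less derived type (object) can be used wherever an observer of string is expected:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer, introduced only for this sketch: it accepts any object.
public class ObjectLogger : IObserver<object>
{
    private readonly List<string> log = new List<string>();

    // Everything the observer has received so far, in order.
    public List<string> Log { get { return log; } }

    public void OnNext(object value) { log.Add("next: " + value); }
    public void OnError(Exception error) { log.Add("error: " + error.Message); }
    public void OnCompleted() { log.Add("completed"); }
}

public static class ContravarianceDemo
{
    public static void Main()
    {
        var logger = new ObjectLogger();

        // Allowed because T is declared as "in T": an IObserver<object>
        // can stand in for an IObserver<string>.
        IObserver<string> stringObserver = logger;

        stringObserver.OnNext("hello");
        stringObserver.OnCompleted();

        // Prints "next: hello" and then "completed".
        foreach (var line in logger.Log)
        {
            Console.WriteLine(line);
        }
    }
}
```

The reverse assignment (an IObserver<string> used as an IObserver<object>) does not compile, because such an observer could then be handed values that are not strings.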

Any new message flowing to a receiver that implements this interface reaches the OnNext method. Any error reaches the OnError method, while the task completed acknowledgement message reaches the OnCompleted method.
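As a rough sketch of this routing, the following code pushes the items of an in-memory sequence to an observer. The Push helper, the Faulty source, and the RecordingObserver class are assumptions made up for this illustration, not BCL members. A failure in the source is routed to OnError; otherwise the sequence ends with OnCompleted:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer, introduced only for this sketch: it records each call.
public class RecordingObserver : IObserver<int>
{
    public readonly List<string> Calls = new List<string>();

    public void OnNext(int value) { Calls.Add("OnNext(" + value + ")"); }
    public void OnError(Exception error) { Calls.Add("OnError(" + error.Message + ")"); }
    public void OnCompleted() { Calls.Add("OnCompleted"); }
}

public static class NotificationDemo
{
    // Hypothetical helper (not part of the BCL): sends every item to OnNext,
    // then either OnError (the source failed) or OnCompleted (the source ended).
    public static void Push(IEnumerable<int> source, IObserver<int> observer)
    {
        using (var enumerator = source.GetEnumerator())
        {
            while (true)
            {
                int item;
                try
                {
                    // Only failures of the source itself are caught here.
                    if (!enumerator.MoveNext()) break;
                    item = enumerator.Current;
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                    return;
                }
                observer.OnNext(item);
            }
        }
        observer.OnCompleted();
    }

    // A source that fails after producing its first item.
    public static IEnumerable<int> Faulty()
    {
        yield return 1;
        throw new InvalidOperationException("source failed");
    }

    public static void Main()
    {
        var ok = new RecordingObserver();
        Push(new[] { 1, 2, 3 }, ok);
        // Prints: OnNext(1) OnNext(2) OnNext(3) OnCompleted
        Console.WriteLine(string.Join(" ", ok.Calls));

        var bad = new RecordingObserver();
        Push(Faulty(), bad);
        // Prints: OnNext(1) OnError(source failed)
        Console.WriteLine(string.Join(" ", bad.Calls));
    }
}
```

Note that in both runs exactly one terminal call is made: either OnError or OnCompleted, never both.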

The use of an interface means that we cannot use generic premade objects from the BCL. We need to implement any receiver from scratch by using such an interface as a service contract. In Chapter 3, Reactive Extension Programming, we will be able to use subjects that will spare us from implementing such interfaces every time, but for now, let's play this way.

Let's see an example, because talking about a code example is always simpler than talking about something theoretical. The following example shows how to read commands typed by a user into a console application in a reactive way:

using System; 

class Program 
{ 
    static void Main(string[] args) 
    { 
        //creates a new console input consumer 
        var consumer = new ConsoleTextConsumer(); 
 
        while (true) 
        { 
            Console.WriteLine("Write some text and press ENTER to send a message\nPress ENTER to exit"); 
            //read console input 
            var input = Console.ReadLine(); 
 
            //check for empty message to exit 
            if (string.IsNullOrEmpty(input)) 
            { 
                //job completed 
                consumer.OnCompleted(); 
 
                Console.WriteLine("Task completed. Any further message will generate an error"); 
            } 
            else 
            { 
                //route the message to the consumer 
                consumer.OnNext(input); 
            } 
        } 
    } 
} 
public class ConsoleTextConsumer : IObserver<string> 
{ 
    private bool finished = false; 
    public void OnCompleted() 
    { 
        if (finished) 
        { 
            OnError(new Exception("This consumer already finished its lifecycle")); 
            return; 
        } 
 
        finished = true; 
        Console.WriteLine("<- END"); 
    } 
 
    public void OnError(Exception error) 
    { 
        Console.WriteLine("<- ERROR"); 
        Console.WriteLine("<- {0}", error.Message); 
    } 
 
    public void OnNext(string value) 
    { 
        if (finished) 
        { 
            OnError(new Exception("This consumer finished its lifecycle")); 
            return; 
        } 
 
        //shows the received message 
        Console.WriteLine("-> {0}", value); 
        //do something 
 
        //ack the caller 
        Console.WriteLine("<- OK"); 
    } 
} 

The preceding example shows the IObserver interface in use within the ConsoleTextConsumer class. The program asks a command console (DOS-like) for the user input text and routes it to the consumer to do something. In this implementation, the consumer simply writes out the input text because we only want to look at the reactive implementation.

The first important concept here is that a message consumer knows nothing about how messages are produced. The consumer simply reacts to one of the three events (not CLR events). Besides this, the consumer itself can hold some logic that spans these events. In the preceding example, the consumer simply echoes any received message on the console. However, once a complete message puts the consumer in a finished state (by setting the finished flag), any other message that arrives at the OnNext method is automatically routed to the OnError method. Likewise, any further complete message that reaches the consumer once it is already in the finished state produces another error.
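To make both points concrete, here is a minimal sketch in which the same consumer logic is fed from an in-memory array instead of the console. The TextConsumer class is a hypothetical variant of ConsoleTextConsumer, introduced only for this illustration: its logic is the same, but the output target is passed in, so the behavior does not depend on a console being present.

```csharp
using System;
using System.IO;

// Hypothetical variant of ConsoleTextConsumer: same logic, injectable output.
public class TextConsumer : IObserver<string>
{
    private readonly TextWriter output;
    private bool finished = false;

    public TextConsumer(TextWriter output)
    {
        this.output = output;
    }

    public void OnCompleted()
    {
        if (finished)
        {
            OnError(new Exception("This consumer already finished its lifecycle"));
            return;
        }

        finished = true;
        output.WriteLine("<- END");
    }

    public void OnError(Exception error)
    {
        output.WriteLine("<- ERROR");
        output.WriteLine("<- {0}", error.Message);
    }

    public void OnNext(string value)
    {
        if (finished)
        {
            OnError(new Exception("This consumer finished its lifecycle"));
            return;
        }

        output.WriteLine("-> {0}", value);
        output.WriteLine("<- OK");
    }
}

public static class FinishedStateDemo
{
    public static void Main()
    {
        var consumer = new TextConsumer(Console.Out);

        // The producer here is a plain array; the consumer cannot tell the difference.
        foreach (var message in new[] { "first", "second" })
        {
            consumer.OnNext(message);
        }

        consumer.OnCompleted();   // writes "<- END"
        consumer.OnNext("late");  // routed to OnError: the consumer is finished
        consumer.OnCompleted();   // a second completion is also an error
    }
}
```

Running the sketch writes two acknowledged messages, then "<- END", followed by two "<- ERROR" reports, one for the late message and one for the repeated completion.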
