IObservable interface

The IObservable interface, the opposite of the IObserver interface, has the task of handling message production and the observer subscription. It routes right messages to the OnNext message handler and errors to the OnError message handler. As its life cycle ends, it acknowledges all the observers on the OnComplete message handler.

To create a valid reactive observable interface, we must write something that is not locking against user input or any other external system input data. The observable object acts as an infinite message generator, something like an infinite enumerable of messages; although in such cases, there is no enumeration.

Once a new message is available somehow, observer routes it to all the subscribers.

In the following example, we will try creating a console application to ask the user for an integer number and then route such a number to all the subscribers. Otherwise, if the given input is not a number, an error will be routed to all the subscribers.

This is observer similar to the one already seen in the previous example. Take a look at the following codes:

/// <summary> 
/// Consumes numeric values that divides without rest by a given number 
/// </summary> 
public class IntegerConsumer : IObserver<int> 
{ 
    readonly int validDivider; 
    //the costructor asks for a divider 
    public IntegerConsumer(int validDivider) 
    { 
        this.validDivider = validDivider; 
    } 
 
    private bool finished = false; 
    public void OnCompleted() 
    { 
        if (finished) 
            OnError(new Exception("This consumer already finished it's lifecycle")); 
        else 
        { 
            finished = true; 
            Console.WriteLine("{0}: END", GetHashCode()); 
        } 
    } 
 
    public void OnError(Exception error) 
    { 
        Console.WriteLine("{0}: {1}", GetHashCode(), error.Message); 
    } 
 
    public void OnNext(int value) 
    { 
        if (finished) 
            OnError(new Exception("This consumer finished its lifecycle")); 
 
        //the simple business logic is made by checking divider result 
        else if (value % validDivider == 0) 
            Console.WriteLine("{0}: {1} divisible by {2}", GetHashCode(), value, validDivider); 
    } 
} 

This observer consumes integer numeric messages, but it requires that the number is divisible by another one without producing any rest value. This logic, because of the encapsulation principle, is within the observer object. The observable interface, instead, only has the logic of the message sending of valid or error messages.

This filtering logic is made within the receiver itself. Although that is not something wrong, in more complex applications, specific filtering features are available in the publish-subscribe communication pipeline. In other words, another object will be available between observable (publisher) and observer (subscriber) that will act as a message filter.

Back to our numeric example, here we have the observable implementation made using an inner Task method that does the main job of parsing input text and sending messages. In addition, a cancellation token is available to handle the user cancellation request and an eventual observable dispose:

//Observable able to parse strings from the Console 
//and route numeric messages to all subscribers 
public class ConsoleIntegerProducer : IObservable<int>, IDisposable 
{ 
    //the subscriber list 
    private readonly List<IObserver<int>> subscriberList = new List<IObserver<int>>(); 
 
    //the cancellation token source for starting stopping 
    //inner observable working thread 
    private readonly CancellationTokenSource cancellationSource; 
    //the cancellation flag 
    private readonly CancellationToken cancellationToken; 
    //the running task that runs the inner running thread 
    private readonly Task workerTask; 
    public ConsoleIntegerProducer() 
    { 
        cancellationSource = new CancellationTokenSource(); 
        cancellationToken = cancellationSource.Token; 
        workerTask = Task.Factory.StartNew(OnInnerWorker, cancellationToken); 
    } 
             
    //add another observer to the subscriber list 
    public IDisposable Subscribe(IObserver<int> observer) 
    { 
        if (subscriberList.Contains(observer)) 
            throw new ArgumentException("The observer is already subscribed to this observable"); 
 
        Console.WriteLine("Subscribing for {0}", observer.GetHashCode()); 
        subscriberList.Add(observer); 
 
        return null; 
    } 
 
    //this code executes the observable infinite loop 
    //and routes messages to all observers on the valid 
    //message handler 
    private void OnInnerWorker() 
    { 
        while (!cancellationToken.IsCancellationRequested) 
        { 
            var input = Console.ReadLine(); 
            int value; 
 
            foreach (var observer in subscriberList) 
                if (string.IsNullOrEmpty(input)) 
                    break; 
                else if (input.Equals("EXIT")) 
                { 
                    cancellationSource.Cancel(); 
                    break; 
                } 
                else if (!int.TryParse(input, out value)) 
                    observer.OnError(new FormatException("Unable to parse given 
                    value")); 
                else 
                    observer.OnNext(value); 
        } 
        cancellationToken.ThrowIfCancellationRequested(); 
    } 
 
 
    //cancel main task and ack all observers 
    //by sending the OnCompleted message 
    public void Dispose() 
    { 
        if (!cancellationSource.IsCancellationRequested) 
        { 
            cancellationSource.Cancel(); 
            while (!workerTask.IsCanceled) 
                Thread.Sleep(100); 
        } 
 
        cancellationSource.Dispose(); 
        workerTask.Dispose(); 
 
        foreach (var observer in subscriberList) 
            observer.OnCompleted(); 
    } 
 
    //wait until the main task completes or went cancelled 
    public void Wait() 
    { 
        while (!(workerTask.IsCompleted || workerTask.IsCanceled)) 
            Thread.Sleep(100); 
    } 
} 

To complete the example, here there is the program Main:

static void Main(string[] args) 
{ 
    //this is the message observable responsible of producing messages 
    using (var observer = new ConsoleIntegerProducer()) 
    //those are the message observer that consume messages 
    using (var consumer1 = observer.Subscribe(new IntegerConsumer(2))) 
    using (var consumer2 = observer.Subscribe(new IntegerConsumer(3))) 
    using (var consumer3 = observer.Subscribe(new IntegerConsumer(5))) 
        observer.Wait(); 
 
    Console.WriteLine("END"); 
    Console.ReadLine(); 
} 

Tip

The cancellationToken.ThrowIfCancellationRequested may raise an exception in your Visual Studio when debugging. Simply go next by pressing F5 , or test such a code example without the attached debugger by starting the test with Ctrl + F5 instead of F5 alone.

The application simply creates an observable variable, which is able to parse user data. Then, register three observers specifying to each observer variable the required valid divider value.

Then, the observable variable will start reading user data from the console and valid or error messages will flow to all the observers. Each observer will apply its internal logic of showing the message when it divides for the related divider.

Here is the result of executing the application:

IObservable interface

Observables and observers in action

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.216.255.250