The IObservable interface, the counterpart of the IObserver interface, handles message production and observer subscription. It routes valid messages to the OnNext message handler and errors to the OnError message handler. When its life cycle ends, it notifies all the observers through the OnCompleted message handler.
To create a valid reactive observable, we must write something that does not block on user input or on input data from any other external system. The observable object acts as an infinite message generator, something like an infinite enumerable of messages, although in this case nothing is actually enumerated. As soon as a new message becomes available, the observable routes it to all the subscribers.
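The routing rules described above can be sketched in isolation before console input is involved. The following is a minimal sketch, not the implementation used later in this section: ManualIntegerSource, Publish, Complete, and RecordingObserver are hypothetical names introduced only for illustration. As in this section's example, an error notification does not end the stream.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical push-style source (not part of the example in this section).
// Callers hand it raw text; it routes the outcome to every subscriber.
public sealed class ManualIntegerSource : IObservable<int>
{
    private readonly List<IObserver<int>> subscribers = new List<IObserver<int>>();

    public IDisposable Subscribe(IObserver<int> observer)
    {
        if (observer == null) throw new ArgumentNullException("observer");
        subscribers.Add(observer);
        return new NoOpToken(); // unsubscription is left out of this sketch
    }

    // A parsable number goes to OnNext; anything else goes to OnError.
    public void Publish(string text)
    {
        int value;
        bool isNumber = int.TryParse(text, out value);
        foreach (var observer in subscribers.ToArray())
        {
            if (isNumber) observer.OnNext(value);
            else observer.OnError(new FormatException("Unable to parse given value"));
        }
    }

    // End of the life cycle: notify every observer once, then forget them.
    public void Complete()
    {
        foreach (var observer in subscribers.ToArray())
            observer.OnCompleted();
        subscribers.Clear();
    }

    private sealed class NoOpToken : IDisposable
    {
        public void Dispose() { }
    }
}

// Hypothetical observer that records each notification as text.
public sealed class RecordingObserver : IObserver<int>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(int value) { Log.Add("next:" + value); }
    public void OnError(Exception error) { Log.Add("error:" + error.Message); }
    public void OnCompleted() { Log.Add("completed"); }
}
```

With two RecordingObserver instances subscribed, calling Publish("12"), Publish("abc"), and Complete() in that order leaves both logs with the same three entries: a value, an error, and a completion.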
In the following example, we will create a console application that asks the user for an integer number and routes that number to all the subscribers. If the given input is not a number, an error is routed to all the subscribers instead.
The observer is similar to the one already seen in the previous example. Take a look at the following code:
using System;

/// <summary>
/// Consumes numeric values that divide by a given number without a remainder
/// </summary>
public class IntegerConsumer : IObserver<int>
{
    readonly int validDivider;

    //the constructor asks for a divider
    public IntegerConsumer(int validDivider)
    {
        this.validDivider = validDivider;
    }

    private bool finished = false;

    public void OnCompleted()
    {
        if (finished)
            OnError(new Exception("This consumer already finished its lifecycle"));
        else
        {
            finished = true;
            Console.WriteLine("{0}: END", GetHashCode());
        }
    }

    public void OnError(Exception error)
    {
        Console.WriteLine("{0}: {1}", GetHashCode(), error.Message);
    }

    public void OnNext(int value)
    {
        if (finished)
            OnError(new Exception("This consumer finished its lifecycle"));
        //the simple business logic is made by checking divider result
        else if (value % validDivider == 0)
            Console.WriteLine("{0}: {1} divisible by {2}", GetHashCode(), value, validDivider);
    }
}
This observer consumes integer messages, but it only reports numbers that are divisible by its divider without a remainder. Because of the encapsulation principle, this logic lives within the observer object. The observable, by contrast, contains only the logic for sending valid or error messages.
This filtering logic is implemented within the receiver itself. Although there is nothing wrong with that, more complex applications provide specific filtering features in the publish-subscribe communication pipeline. In other words, another object sits between the observable (publisher) and the observer (subscriber) and acts as a message filter.
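To make the idea of an intermediate filter concrete, here is a minimal sketch of such an object, assuming it is written by hand rather than provided by a library. IntegerFilter and CollectingObserver are hypothetical names that do not appear in the example of this section. The filter implements both interfaces: it is an observer of the publisher and an observable for the subscriber.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical filter stage (illustrative names). It is an observer of the
// upstream source and an observable for the downstream consumers.
public sealed class IntegerFilter : IObserver<int>, IObservable<int>
{
    private readonly Func<int, bool> predicate;
    private readonly List<IObserver<int>> subscribers = new List<IObserver<int>>();

    public IntegerFilter(Func<int, bool> predicate)
    {
        if (predicate == null) throw new ArgumentNullException("predicate");
        this.predicate = predicate;
    }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        if (observer == null) throw new ArgumentNullException("observer");
        subscribers.Add(observer);
        return new NoOpToken(); // unsubscription is left out of this sketch
    }

    // Only values accepted by the predicate are forwarded downstream.
    public void OnNext(int value)
    {
        if (!predicate(value)) return;
        foreach (var observer in subscribers.ToArray()) observer.OnNext(value);
    }

    // Errors and completion pass through unchanged.
    public void OnError(Exception error)
    {
        foreach (var observer in subscribers.ToArray()) observer.OnError(error);
    }

    public void OnCompleted()
    {
        foreach (var observer in subscribers.ToArray()) observer.OnCompleted();
    }

    private sealed class NoOpToken : IDisposable
    {
        public void Dispose() { }
    }
}

// Hypothetical consumer with no filtering logic of its own.
public sealed class CollectingObserver : IObserver<int>
{
    public readonly List<int> Values = new List<int>();
    public bool Completed { get; private set; }
    public Exception LastError { get; private set; }

    public void OnNext(int value) { Values.Add(value); }
    public void OnError(Exception error) { LastError = error; }
    public void OnCompleted() { Completed = true; }
}
```

A filter created with v => v % 3 == 0 would be subscribed to the publisher, and the consumer would subscribe to the filter instead of the publisher. The divisibility check then lives in the pipeline rather than in the receiver.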
Back to our numeric example, here is the observable implementation, which uses an inner Task to do the main job of parsing the input text and sending messages. In addition, a cancellation token is available to handle the user's cancellation request and the eventual disposal of the observable:
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

//Observable able to parse strings from the Console
//and route numeric messages to all subscribers
public class ConsoleIntegerProducer : IObservable<int>, IDisposable
{
    //the subscriber list
    private readonly List<IObserver<int>> subscriberList = new List<IObserver<int>>();

    //the cancellation token source for starting and stopping
    //the inner observable working thread
    private readonly CancellationTokenSource cancellationSource;
    //the cancellation flag
    private readonly CancellationToken cancellationToken;
    //the task that runs the inner working thread
    private readonly Task workerTask;

    public ConsoleIntegerProducer()
    {
        cancellationSource = new CancellationTokenSource();
        cancellationToken = cancellationSource.Token;
        workerTask = Task.Factory.StartNew(OnInnerWorker, cancellationToken);
    }

    //add another observer to the subscriber list
    public IDisposable Subscribe(IObserver<int> observer)
    {
        if (subscriberList.Contains(observer))
            throw new ArgumentException("The observer is already subscribed to this observable");

        Console.WriteLine("Subscribing for {0}", observer.GetHashCode());
        subscriberList.Add(observer);

        //no unsubscription token is provided in this example
        return null;
    }

    //this code executes the observable infinite loop
    //and routes messages to all observers on the valid
    //message handler
    private void OnInnerWorker()
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            var input = Console.ReadLine();

            //a null input means the input stream was closed;
            //EXIT stops the loop even when nobody is subscribed
            if (input == null || input.Equals("EXIT"))
            {
                cancellationSource.Cancel();
                break;
            }

            //empty lines are ignored
            if (input.Length == 0)
                continue;

            //parse once, then route the outcome to every observer
            int value;
            bool isNumber = int.TryParse(input, out value);

            foreach (var observer in subscriberList)
                if (isNumber)
                    observer.OnNext(value);
                else
                    observer.OnError(new FormatException("Unable to parse given value"));
        }

        cancellationToken.ThrowIfCancellationRequested();
    }

    //cancel main task and ack all observers
    //by sending the OnCompleted message
    public void Dispose()
    {
        if (!cancellationSource.IsCancellationRequested)
        {
            cancellationSource.Cancel();
            //IsCompleted is also true for a canceled or faulted task
            while (!workerTask.IsCompleted)
                Thread.Sleep(100);
        }

        cancellationSource.Dispose();
        workerTask.Dispose();

        foreach (var observer in subscriberList)
            observer.OnCompleted();
    }

    //wait until the main task completes or is cancelled
    public void Wait()
    {
        while (!(workerTask.IsCompleted || workerTask.IsCanceled))
            Thread.Sleep(100);
    }
}
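The cooperative cancellation used by the worker task is easier to follow when seen on its own. The following is a minimal sketch under stated assumptions: CancellableWorker is a hypothetical class, its loop sleeps instead of reading the console, and it waits for the task with Task.Wait rather than polling with Thread.Sleep as the listing above does.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical worker (illustrative name) that loops until cancellation is requested.
public sealed class CancellableWorker : IDisposable
{
    private readonly CancellationTokenSource source = new CancellationTokenSource();
    private readonly Task task;
    private bool disposed;

    public CancellableWorker()
    {
        var token = source.Token;
        task = Task.Factory.StartNew(() =>
        {
            while (!token.IsCancellationRequested)
                Thread.Sleep(10); // stand-in for real work such as reading input

            // Throwing with the same token that was given to StartNew moves
            // the task to the Canceled state instead of Faulted.
            token.ThrowIfCancellationRequested();
        }, token);
    }

    public bool IsCanceled { get { return task.IsCanceled; } }

    // Requests cancellation and blocks until the worker has stopped.
    public void Stop()
    {
        if (disposed) return;
        source.Cancel();
        try
        {
            task.Wait();
        }
        catch (AggregateException ex)
        {
            // Waiting on a canceled task throws; only cancellation is swallowed.
            ex.Handle(e => e is OperationCanceledException);
        }
    }

    public void Dispose()
    {
        if (disposed) return;
        Stop();
        disposed = true;
        source.Dispose();
    }
}
```

The token is only a request: the loop stops because it checks IsCancellationRequested on each pass. A worker blocked inside a call such as Console.ReadLine cannot observe the request until that call returns.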
To complete the example, here is the program's Main method:
static void Main(string[] args)
{
    //this is the message observable responsible for producing messages
    using (var observable = new ConsoleIntegerProducer())
    //these are the message observers that consume messages
    using (var consumer1 = observable.Subscribe(new IntegerConsumer(2)))
    using (var consumer2 = observable.Subscribe(new IntegerConsumer(3)))
    using (var consumer3 = observable.Subscribe(new IntegerConsumer(5)))
        observable.Wait();

    Console.WriteLine("END");
    Console.ReadLine();
}
The application simply creates an observable variable, which is able to parse user data. It then registers three observers, specifying for each one the divider it requires.
The observable reads user data from the console, and valid or error messages flow to all the observers. Each observer applies its internal logic and shows the message only when the number is divisible by its divider.
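In this example, Subscribe returns null. A using statement tolerates a null resource, so the using blocks in Main never unsubscribe anything. A minimal sketch of a subscription token that does remove the observer could look like the following. Unsubscriber, TokenReturningSource, and SilentObserver are hypothetical names introduced for illustration, and the sketch ignores thread synchronization.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical subscription token (illustrative name): disposing it removes
// the observer from the subscriber collection it was registered in.
public sealed class Unsubscriber<T> : IDisposable
{
    private readonly ICollection<IObserver<T>> subscribers;
    private readonly IObserver<T> observer;

    public Unsubscriber(ICollection<IObserver<T>> subscribers, IObserver<T> observer)
    {
        if (subscribers == null) throw new ArgumentNullException("subscribers");
        if (observer == null) throw new ArgumentNullException("observer");
        this.subscribers = subscribers;
        this.observer = observer;
    }

    public void Dispose()
    {
        // Remove returns false when the observer is already gone,
        // so calling Dispose more than once is harmless.
        subscribers.Remove(observer);
    }
}

// Hypothetical observable showing where the token would be returned.
public sealed class TokenReturningSource<T> : IObservable<T>
{
    private readonly List<IObserver<T>> subscribers = new List<IObserver<T>>();

    public int SubscriberCount { get { return subscribers.Count; } }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null) throw new ArgumentNullException("observer");
        if (!subscribers.Contains(observer)) subscribers.Add(observer);
        return new Unsubscriber<T>(subscribers, observer);
    }
}

// Observer that ignores everything; it exists only so the sketch can be exercised.
public sealed class SilentObserver<T> : IObserver<T>
{
    public void OnNext(T value) { }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

With a token like this, leaving a using block would remove the corresponding observer. Because the worker task in the example enumerates the subscriber list on another thread, a real version would also need a lock or a copy of the list while routing messages.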
Here is the result of executing the application: