Sourcing from a finite collection is of little use in reactive programming. However, some enumerable collections are perfect for reactive use: mutable collections that support change notifications by implementing the INotifyCollectionChanged (System.Collections.Specialized) interface, such as the ObservableCollection<T> (System.Collections.ObjectModel) class, and any infinite collection that supports the enumerator pattern through the yield keyword.
The ObservableCollection<T> class lets us observe, in an event-based way, any change made to the content of the collection. Note that changes to the properties of the items it contains are outside the scope of the collection. This means that we are notified only of changes to the collection itself, such as those produced by the Add or Remove methods. A change inside a single item does not alter the collection, so it is not notified at all.
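The following is a minimal sketch of the point above, not part of the original listing. The Person type and the ItemChangeDemo class are hypothetical names introduced only for illustration. The sketch records every action notified by the collection and shows that changing a property of an item raises no notification.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;

// Hypothetical item type, used only for this illustration.
public sealed class Person
{
    public string Name { get; set; }
}

public static class ItemChangeDemo
{
    // Returns the names of the actions notified by the collection.
    public static List<string> Run()
    {
        var notified = new List<string>();
        var people = new ObservableCollection<Person>();
        people.CollectionChanged += (sender, e) => notified.Add(e.Action.ToString());

        var person = new Person { Name = "Ann" };
        people.Add(person);     // notified as Add
        person.Name = "Bob";    // not notified: the collection itself is unchanged
        people.Remove(person);  // notified as Remove

        return notified;
    }

    public static void Main()
    {
        foreach (var action in Run())
            Console.WriteLine(action);
    }
}
```

If item-level changes matter, the item type would have to raise its own notifications (for instance through INotifyPropertyChanged), and they would have to be observed separately from the collection.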
Here's a generic (non-reactive) example:
static void Main(string[] args)
{
    //the observable collection
    var collection = new ObservableCollection<string>();
    //register a handler to catch collection changes
    collection.CollectionChanged += OnCollectionChanged;

    collection.Add("ciao");
    collection.Add("hahahah");
    collection.Insert(0, "new first line");
    collection.RemoveAt(0);

    Console.WriteLine("Press RETURN to EXIT");
    Console.ReadLine();
}

private static void OnCollectionChanged(object sender, NotifyCollectionChangedEventArgs e)
{
    var collection = sender as ObservableCollection<string>;
    if (e.NewStartingIndex >= 0) //adding new items
        Console.WriteLine("-> {0} {1}", e.Action, collection[e.NewStartingIndex]);
    else //removing items
        Console.WriteLine("-> {0} at {1}", e.Action, e.OldStartingIndex);
}
As shown, the collection notifies every add operation, which lets us read the new message. The Insert method also signals an Add action; the difference is that Insert lets us choose the index at which the value is stored in the collection. In both cases, e.NewStartingIndex contains the index at which the new item was placed. For the Remove action, the event reports the index of the removed element (e.OldStartingIndex), but because the event is raised after the removal has already occurred, the original message can no longer be read from the collection at that index. The event arguments still expose the removed items through e.OldItems.
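As a hedged variation on the handler above, the sketch below reads the affected items from the event arguments instead of from the collection. NewItems and OldItems are properties of NotifyCollectionChangedEventArgs; the ChangedItemsDemo class and its log format are assumptions made for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;

public static class ChangedItemsDemo
{
    // Returns one log line per item added to or removed from the collection.
    public static List<string> Run()
    {
        var log = new List<string>();
        var collection = new ObservableCollection<string>();

        collection.CollectionChanged += (sender, e) =>
        {
            if (e.Action == NotifyCollectionChangedAction.Add)
            {
                // NewItems holds the items that were just added
                foreach (string item in e.NewItems)
                    log.Add("Add " + item + " at " + e.NewStartingIndex);
            }
            else if (e.Action == NotifyCollectionChangedAction.Remove)
            {
                // OldItems holds the items that were just removed
                foreach (string item in e.OldItems)
                    log.Add("Remove " + item + " at " + e.OldStartingIndex);
            }
        };

        collection.Add("ciao");
        collection.Insert(0, "new first line");
        collection.RemoveAt(0);
        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
    }
}
```

Reading from the event arguments avoids indexing into a collection that has already changed by the time the handler runs.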
In a real-world reactive application, the most interesting operation against ObservableCollection is the Add operation. Here's an example (console observer omitted for better readability):
class Program
{
    static void Main(string[] args)
    {
        //the observable collection
        var collection = new ObservableCollection<string>();

        using (var observable = new NotifiableCollectionObservable(collection))
        using (var observer = observable.Subscribe(new ConsoleStringObserver()))
        {
            collection.Add("ciao");
            collection.Add("hahahah");
            collection.Insert(0, "new first line");
            collection.RemoveAt(0);

            Console.WriteLine("Press RETURN to EXIT");
            Console.ReadLine();
        }
    }

    public sealed class NotifiableCollectionObservable : IObservable<string>, IDisposable
    {
        private readonly ObservableCollection<string> collection;

        public NotifiableCollectionObservable(ObservableCollection<string> collection)
        {
            this.collection = collection;
            this.collection.CollectionChanged += collection_CollectionChanged;
        }

        //the handler is not part of the original listing;
        //it is reconstructed here so that only Add actions are routed
        private void collection_CollectionChanged(object sender, NotifyCollectionChangedEventArgs e)
        {
            if (e.Action == NotifyCollectionChangedAction.Add)
                foreach (string item in e.NewItems)
                    foreach (var observer in observerList)
                        observer.OnNext(item);
        }

        private readonly List<IObserver<string>> observerList = new List<IObserver<string>>();

        public IDisposable Subscribe(IObserver<string> observer)
        {
            observerList.Add(observer);
            //subscription lifecycle missing
            //for readability purpose
            return null;
        }

        public void Dispose()
        {
            this.collection.CollectionChanged -= collection_CollectionChanged;
            foreach (var observer in observerList)
                observer.OnCompleted();
        }
    }
}
The result is the same as in the previous ObservableCollection example, which did not use reactive objects. The only difference is that observable routes messages only when the Action value is Add.
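The listing above leaves the subscription lifecycle out and returns null from Subscribe. One possible way to fill that gap is sketched below; it is an assumption, not the original implementation. The Unsubscriber, StringSource, and RecordingObserver names are hypothetical: the token returned by Subscribe simply removes the observer from the list when it is disposed.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical subscription token: disposing it removes the observer.
public sealed class Unsubscriber<T> : IDisposable
{
    private readonly List<IObserver<T>> observers;
    private readonly IObserver<T> observer;

    public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
    {
        this.observers = observers;
        this.observer = observer;
    }

    public void Dispose()
    {
        observers.Remove(observer);
    }
}

// Hypothetical minimal observable that returns a real subscription token.
public sealed class StringSource : IObservable<string>
{
    private readonly List<IObserver<string>> observerList = new List<IObserver<string>>();

    public IDisposable Subscribe(IObserver<string> observer)
    {
        observerList.Add(observer);
        return new Unsubscriber<string>(observerList, observer);
    }

    public void Publish(string value)
    {
        // iterate over a copy so an observer may unsubscribe while being notified
        foreach (var observer in observerList.ToArray())
            observer.OnNext(value);
    }
}

// Hypothetical observer that stores what it receives.
public sealed class RecordingObserver : IObserver<string>
{
    public readonly List<string> Received = new List<string>();
    public void OnNext(string value) { Received.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class SubscriptionDemo
{
    public static List<string> Run()
    {
        var source = new StringSource();
        var observer = new RecordingObserver();

        using (source.Subscribe(observer))
        {
            source.Publish("inside");   // delivered: the subscription is active
        }
        source.Publish("outside");      // not delivered: the token was disposed

        return observer.Received;
    }

    public static void Main()
    {
        foreach (var value in Run())
            Console.WriteLine(value);
    }
}
```

With a token like this, exiting the using scope ends the single subscription without disposing the observable itself. The sketch is not thread-safe.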
Our last example sources events from a method that produces an infinite collection.
In C#, it is possible to implement the enumerator pattern by yielding one object at a time, thanks to the yield keyword. Here's an example:
static void Main(string[] args)
{
    foreach (var value in EnumerateValuesFromSomewhere())
        Console.WriteLine(value);
}

static IEnumerable<string> EnumerateValuesFromSomewhere()
{
    var random = new Random(DateTime.Now.GetHashCode());
    while (true) //forever
    {
        //returns a random integer number as string
        yield return random.Next().ToString();
        //some throttling time
        Thread.Sleep(100);
    }
}
This implementation is powerful because it doesn't materialize all the values in memory. It simply signals that a new object is available to the enumerator that the foreach statement uses internally. The result is a program that writes numbers to the console forever.
This behavior is useful for reactive use, because it doesn't create useless state such as a temporary array, list, or generic collection. It simply signals that new items are available to the enumerator.
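To make the lazy behavior concrete, here is a small sketch under stated assumptions: the Naturals generator, the Produced counter, and the LazyEnumerationDemo class are hypothetical names introduced for this illustration. It pulls only three values from an infinite sequence and counts how many values the generator actually produced.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class LazyEnumerationDemo
{
    // Counts how many values the generator has produced so far.
    public static int Produced;

    // Hypothetical infinite generator: 0, 1, 2, ...
    private static IEnumerable<int> Naturals()
    {
        var n = 0;
        while (true) //forever
        {
            Produced++;
            yield return n++;
        }
    }

    // Pulls only the first three values; the rest are never computed.
    public static List<int> Run()
    {
        Produced = 0;
        return Naturals().Take(3).ToList();
    }

    public static void Main()
    {
        var values = Run();
        Console.WriteLine(string.Join(", ", values));
        Console.WriteLine("values produced: " + Produced);
    }
}
```

Calling Naturals() alone runs none of the generator body; values are produced only as the consumer asks for them, which is why an infinite loop inside the generator is safe here.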
Here's an example:
public sealed class EnumerableObservable : IObservable<string>, IDisposable
{
    private readonly IEnumerable<string> enumerable;

    public EnumerableObservable(IEnumerable<string> enumerable)
    {
        this.enumerable = enumerable;
        this.cancellationSource = new CancellationTokenSource();
        this.cancellationToken = cancellationSource.Token;
        this.workerTask = Task.Factory.StartNew(() =>
        {
            foreach (var value in this.enumerable)
            {
                //if task cancellation triggers, raise the proper exception
                //to stop task execution
                cancellationToken.ThrowIfCancellationRequested();

                foreach (var observer in observerList)
                    observer.OnNext(value);
            }
        }, this.cancellationToken);
    }

    //the cancellation token source for starting and stopping
    //the inner observable working thread
    private readonly CancellationTokenSource cancellationSource;
    //the cancellation flag
    private readonly CancellationToken cancellationToken;
    //the running task that runs the inner running thread
    private readonly Task workerTask;
    //the observer list
    private readonly List<IObserver<string>> observerList = new List<IObserver<string>>();

    public IDisposable Subscribe(IObserver<string> observer)
    {
        observerList.Add(observer);
        //subscription lifecycle missing
        //for readability purpose
        return null;
    }

    public void Dispose()
    {
        //trigger task cancellation
        //and wait for acknowledgement
        if (!cancellationSource.IsCancellationRequested)
        {
            cancellationSource.Cancel();
            //IsCompleted is also true for a canceled task, and it avoids
            //waiting forever if the enumerable had already finished
            while (!workerTask.IsCompleted)
                Thread.Sleep(100);
        }

        cancellationSource.Dispose();
        workerTask.Dispose();

        foreach (var observer in observerList)
            observer.OnCompleted();
    }
}
Here is the program startup code, together with the method that generates the infinite enumerable:
class Program
{
    static void Main(string[] args)
    {
        //we create a variable containing the enumerable
        //this does not trigger item retrieval
        //so the enumerator does not begin flowing data
        var enumerable = EnumerateValuesFromSomewhere();

        using (var observable = new EnumerableObservable(enumerable))
        using (var observer = observable.Subscribe(new ConsoleStringObserver()))
        {
            //wait for 2 seconds, then exit
            Thread.Sleep(2000);
        }

        Console.WriteLine("Press RETURN to EXIT");
        Console.ReadLine();
    }

    static IEnumerable<string> EnumerateValuesFromSomewhere()
    {
        var random = new Random(DateTime.Now.GetHashCode());
        while (true) //forever
        {
            //returns a random integer number as string
            yield return random.Next().ToString();
            //some throttling time
            Thread.Sleep(100);
        }
    }
}
Unlike the previous examples, this one uses the Task class. The observable enumerates the sequence inside an asynchronous Task so that the programmer can stop the whole operation simply by exiting the using scope or by invoking the Dispose method manually.
This example shows a tremendously powerful feature: the ability to yield values without having to source them from a concrete (finite) array or collection, simply by implementing the enumerator pattern. Although seldom used, the yield keyword makes it possible to create complex applications simply by pushing messages between methods. The more methods we create that exchange messages with one another, the more complex the business logic the application can handle.
Consider the ability to catch all such messages with observables, and you get some idea of how powerful reactive programming can be for a developer.