Sourcing from CLR enumerables

Sourcing from a finite collection is quite useless with regard to reactive programming. However, specific enumerable collections are perfect for reactive uses. These collections are the changeable collections that support collection change notifications by implementing the INotifyCollectionChanged(System.Collections.Specialized) interface like the ObservableCollection(System.Collections.ObjectModel) class and any infinite collection that supports the enumerator pattern with the use of the yield keyword.

Changeable collections

The ObservableCollection<T> class gives us the ability to understand, in an event-based way, any change that occurs against the collection content. Kindly consider that changes regarding collection child properties are outside of the collection scope. This means that we are notified only for collection changes like the one produced from the Add or Remove methods. Changes within a single item do not produce an alteration of the collection size, thus, they are not notified at all.

Here's a generic (non reactive) example:

static void Main(string[] args) 
{ 
    //the observable collection 
    var collection = new ObservableCollection<string>(); 
    //register a handler to catch collection changes 
    collection.CollectionChanged += OnCollectionChanged; 
 
    collection.Add("ciao"); 
    collection.Add("hahahah"); 
 
    collection.Insert(0, "new first line"); 
    collection.RemoveAt(0); 
 
    Console.WriteLine("Press RETURN to EXIT"); 
    Console.ReadLine(); 
} 
 
private static void OnCollectionChanged(object sender, NotifyCollectionChangedEventArgs e) 
{ 
    var collection = sender as ObservableCollection<string>; 
 
    if (e.NewStartingIndex >= 0) //adding new items 
        Console.WriteLine("-> {0} {1}", e.Action, collection[e.NewStartingIndex]); 
    else //removing items 
        Console.WriteLine("-> {0} at {1}", e.Action, e.OldStartingIndex); 
} 

As shown, collection notifies all the adding operations, giving the ability to catch the new message. The Insert method signals an Add operation; although with the Insert method, we can specify the index and the value will be available within collection. Obviously, the parameter containing the index value (e.NewStartingIndex) contains the new index according to the right operation. However, the Remove operation, although notifying the removed element index, cannot give us the ability to read the original message before the removal, because the event triggers after the remove operation has already occurred.

In a real-world reactive application, the most interesting operation against ObservableCollection is the Add operation. Here's an example (console observer omitted for better readability):

class Program 
{ 
    static void Main(string[] args) 
    { 
        //the observable collection 
        var collection = new ObservableCollection<string>(); 
 
        using (var observable = new NotifiableCollectionObservable(collection)) 
        using (var observer = observable.Subscribe(new ConsoleStringObserver())) 
        { 
            collection.Add("ciao"); 
            collection.Add("hahahah"); 
 
            collection.Insert(0, "new first line"); 
            collection.RemoveAt(0); 
 
            Console.WriteLine("Press RETURN to EXIT"); 
            Console.ReadLine(); 
        } 
    } 
 
public sealed class NotifiableCollectionObservable : IObservable<string>, IDisposable 
{ 
    private readonly ObservableCollection<string> collection; 
    public NotifiableCollectionObservable(ObservableCollection<string> collection) 
    { 
        this.collection = collection; 
        this.collection.CollectionChanged += collection_CollectionChanged; 
    } 
 
    private readonly List<IObserver<string>> observerList = new List<IObserver<string>>(); 
    public IDisposable Subscribe(IObserver<string> observer) 
    { 
        observerList.Add(observer); 
 
        //subscription lifecycle missing 
        //for readability purpose 
        return null; 
    } 
 
    public void Dispose() 
    { 
        this.collection.CollectionChanged -= collection_CollectionChanged; 
 
        foreach (var observer in observerList) 
            observer.OnCompleted(); 
    } 
} 
} 

The result is the same as the previous example of ObservableCollection without the reactive objects. The only difference is that observable only routes messages when the Action values add.

Changeable collections

The ObservableCollection signaling its content changes

Infinite collections

Our last example is regarding sourcing events from an infinite collection method.

In C#, it is possible to implement the enumerator pattern by signaling each object to enumerate per time, thanks to the yield keyword. Here's an example:

static void Main(string[] args) 
{ 
    foreach (var value in EnumerateValuesFromSomewhere()) 
        Console.WriteLine(value); 
} 
 
static IEnumerable<string> EnumerateValuesFromSomewhere() 
{ 
    var random = new Random(DateTime.Now.GetHashCode()); 
    while (true) //forever 
    { 
        //returns a random integer number as string 
        yield return random.Next().ToString(); 
        //some throttling time 
        Thread.Sleep(100); 
    } 
} 

This implementation is powerful because it doesn't materialize all the values into the memory. It simply signals that a new object is available to the enumerator that the foreach structure internally uses itself. The result is forever writing numbers onto the output console.

Somehow, this behavior is useful for reactive use, because it doesn't create a useless state like a temporary array, list, or generic collection. It simply signals new items available to the enumerable.

Here's an example:

    public sealed class EnumerableObservable : IObservable<string>, IDisposable 
    { 
        private readonly IEnumerable<string> enumerable; 
        public EnumerableObservable(IEnumerable<string> enumerable) 
        { 
            this.enumerable = enumerable; 
            this.cancellationSource = new CancellationTokenSource(); 
            this.cancellationToken = cancellationSource.Token; 
            this.workerTask = Task.Factory.StartNew(() => 
                { 
                    foreach (var value in this.enumerable) 
                    { 
                        //if task cancellation triggers, raise the proper exception 
                        //to stop task execution 
                        cancellationToken.ThrowIfCancellationRequested(); 
 
                        foreach (var observer in observerList) 
                            observer.OnNext(value); 
                    } 
                }, this.cancellationToken); 
        } 
 
        //the cancellation token source for starting stopping 
        //inner observable working thread 
        private readonly CancellationTokenSource cancellationSource; 
        //the cancellation flag 
        private readonly CancellationToken cancellationToken; 
        //the running task that runs the inner running thread 
        private readonly Task workerTask; 
        //the observer list 
        private readonly List<IObserver<string>> observerList = new List<IObserver<string>>(); 
        public IDisposable Subscribe(IObserver<string> observer) 
        { 
            observerList.Add(observer); 
 
            //subscription lifecycle missing 
            //for readability purpose 
            return null; 
        } 
 
        public void Dispose() 
        { 
            //trigger task cancellation 
            //and wait for acknoledge 
            if (!cancellationSource.IsCancellationRequested) 
            { 
                cancellationSource.Cancel(); 
                while (!workerTask.IsCanceled) 
                    Thread.Sleep(100); 
            } 
 
            cancellationSource.Dispose(); 
            workerTask.Dispose(); 
 
            foreach (var observer in observerList) 
                observer.OnCompleted(); 
        } 
    } 

This is the code of the program startup with the infinite enumerable generation:

    class Program 
    { 
        static void Main(string[] args) 
        { 
            //we create a variable containing the enumerable 
            //this does not trigger item retrieval 
            //so the enumerator does not begin flowing datas 
            var enumerable = EnumerateValuesFromSomewhere(); 
 
            using (var observable = new EnumerableObservable(enumerable)) 
            using (var observer = observable.Subscribe(new ConsoleStringObserver())) 
            { 
                //wait for 2 seconds than exit 
                Thread.Sleep(2000); 
            } 
 
            Console.WriteLine("Press RETURN to EXIT"); 
            Console.ReadLine(); 
        } 
 
        static IEnumerable<string> EnumerateValuesFromSomewhere() 
        { 
            var random = new Random(DateTime.Now.GetHashCode()); 
            while (true) //forever 
            { 
                //returns a random integer number as string 
                yield return random.Next().ToString(); 
                //some throttling time 
                Thread.Sleep(100); 
            } 
        } 
    } 

As against the last examples, here we have the use of the Task class. The observable uses the enumerable within the asynchronous Task method to give the programmer the ability to stop the execution of the whole operation by simply exiting the using scope or by manually invoking the Dispose method.

This example shows a tremendously powerful feature: the ability to yield values without having to source them from a concrete (finite) array or collection by simply implementing the enumerator pattern. Although few are used, the yield operator gives the ability to create complex applications simply by pushing messages between methods. The more methods we create that cross send messages to each other, the more complex business logics the application can handle.

Consider the ability to catch all such messages with observables, and you have a little idea about how powerful reactive programming can be for a developer.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.191.14.93