Subscription life cycle

What happens if we want to stop a single observer from receiving messages from the observable event source? If we change the Main method of the program from the preceding example to the following one, we expose a flaw in the observer life cycle design. Here's the code:

//this is the message observable responsible for producing messages 
using (var observer = new ConsoleIntegerProducer()) 
//these are the message observers that consume messages 
using (var consumer1 = observer.Subscribe(new IntegerConsumer(2))) 
using (var consumer2 = observer.Subscribe(new IntegerConsumer(3))) 
{ 
    using (var consumer3 = observer.Subscribe(new IntegerConsumer(5))) 
    { 
        //internal lifecycle 
    } 
 
    observer.Wait(); 
} 
 
Console.WriteLine("END"); 
Console.ReadLine(); 

Here is the result in the output console:

The third observer is unable to catch value messages

The using statement should end the life cycle of the consumer object as soon as its block exits. It does not, because in the previous example the Subscribe method of the observable simply returns null, so there is nothing to dispose.
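The effect of a null subscription token can be reproduced in isolation. The following is a minimal sketch, not the book's code: NullTokenSource, CountingObserver, and NullTokenDemo are hypothetical names introduced for illustration, and messages are pushed with a Publish method instead of console input. It relies on the fact that a C# using statement over a null resource calls no Dispose and throws no exception.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observable whose Subscribe returns null,
// mirroring the flaw described in the text.
public sealed class NullTokenSource : IObservable<int>
{
    private readonly List<IObserver<int>> observers = new List<IObserver<int>>();

    public IDisposable Subscribe(IObserver<int> observer)
    {
        observers.Add(observer);
        return null; // no subscription token: the caller cannot unsubscribe
    }

    // Pushes a value to every registered observer.
    public void Publish(int value)
    {
        foreach (var o in observers.ToArray())
            o.OnNext(value);
    }
}

// Hypothetical observer that only counts the values it receives.
public sealed class CountingObserver : IObserver<int>
{
    public int Received { get; private set; }
    public void OnNext(int value) { Received++; }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class NullTokenDemo
{
    public static int Run()
    {
        var source = new NullTokenSource();
        var consumer = new CountingObserver();

        using (source.Subscribe(consumer))
        {
            source.Publish(1);
        }

        // The using block had nothing to dispose, so the consumer
        // is still registered and receives this value as well.
        source.Publish(2);
        return consumer.Received;
    }
}
```

Under these assumptions, Run returns 2: the consumer keeps receiving values after its using block has ended, which is exactly the life cycle problem the section goes on to fix.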

To create a valid observer, we must design and handle its life cycle management. This means that we must handle the external disposal of the Subscribe method's result by signaling to the right observer that its life cycle has reached its end.

We have to create a Subscription class that handles the eventual disposal of the object in the proper reactive way, by sending the observer the OnCompleted message.

Here is a simple Subscription class implementation:

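using System; 
 
/// <summary> 
/// Handles the observer subscription life cycle 
/// </summary> 
public sealed class Subscription<T> : IDisposable 
{ 
    private readonly IObserver<T> observer; 
    private bool disposed; 
 
    public Subscription(IObserver<T> observer) 
    { 
        this.observer = observer; 
    } 
 
    //the event signalling that the observer has 
    //completed its life cycle 
    public event EventHandler<IObserver<T>> OnCompleted; 
 
    public void Dispose() 
    { 
        //Dispose may be called more than once; act only the first time 
        if (disposed) 
            return; 
        disposed = true; 
 
        //let the observable react (for example, by removing the observer) 
        OnCompleted?.Invoke(this, observer); 
 
        //notify the observer that its life cycle has ended 
        observer.OnCompleted(); 
    } 
} 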

The Subscription class is used within the observable's Subscribe method. Here's an example:

//add another observer to the subscriber list 
public IDisposable Subscribe(IObserver<int> observer) 
{ 
    if (observerList.Contains(observer)) 
        throw new ArgumentException("The observer is already subscribed to this observable"); 
 
    Console.WriteLine("Subscribing for {0}", observer.GetHashCode()); 
    observerList.Add(observer); 
 
    //creates a new subscription for the given observer 
    var subscription = new Subscription<int>(observer); 
    //handle the subscription life cycle end event 
    subscription.OnCompleted += OnObserverLifecycleEnd; 
    return subscription; 
} 
 
void OnObserverLifecycleEnd(object sender, IObserver<int> e) 
{ 
    //a direct cast fails fast if the sender is not a subscription 
    var subscription = (Subscription<int>)sender; 
    //remove the observer from the internal list within the observable 
    observerList.Remove(e); 
    //remove the handler from the subscription event 
    //now that it has been handled 
    subscription.OnCompleted -= OnObserverLifecycleEnd; 
} 

As shown, the Subscribe method now creates a new Subscription<T> object, which ends the observer's life cycle when its IDisposable.Dispose method is called.

Here is the result of applying these edits to the full example from the previous section:

The observers end their life as we dispose of their life cycle tokens

This time, one observer ends its life cycle early because its subscription object is disposed. This is visible in the first END message. Afterward, only two observers remain until the application ends; when the user asks for EXIT, these two end their life cycle by themselves rather than through the disposal of their Subscription.
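The same behavior can be checked without console input. The following is a minimal sketch under stated assumptions: ManualIntegerSource, RecordingObserver, and SubscriptionDemo are hypothetical names introduced for illustration, the Subscription<T> class is a trimmed-down copy of the one in the text, and values are pushed with a Publish method rather than read from the console.

```csharp
using System;
using System.Collections.Generic;

// Trimmed-down copy of the Subscription<T> class from the text.
public sealed class Subscription<T> : IDisposable
{
    private readonly IObserver<T> observer;
    public Subscription(IObserver<T> observer) { this.observer = observer; }

    public event EventHandler<IObserver<T>> OnCompleted;

    public void Dispose()
    {
        OnCompleted?.Invoke(this, observer); // lets the observable remove the observer
        observer.OnCompleted();              // tells the observer its life cycle is over
    }
}

// Hypothetical observable, driven by Publish instead of console input.
public sealed class ManualIntegerSource : IObservable<int>
{
    private readonly List<IObserver<int>> observerList = new List<IObserver<int>>();

    public IDisposable Subscribe(IObserver<int> observer)
    {
        observerList.Add(observer);
        var subscription = new Subscription<int>(observer);
        subscription.OnCompleted += OnObserverLifecycleEnd;
        return subscription;
    }

    private void OnObserverLifecycleEnd(object sender, IObserver<int> e)
    {
        observerList.Remove(e);
        ((Subscription<int>)sender).OnCompleted -= OnObserverLifecycleEnd;
    }

    public void Publish(int value)
    {
        // Iterate over a snapshot so the list may change during delivery.
        foreach (var o in observerList.ToArray())
            o.OnNext(value);
    }
}

// Hypothetical observer that records every notification it receives.
public sealed class RecordingObserver : IObserver<int>
{
    public readonly List<string> Log = new List<string>();
    public void OnNext(int value) { Log.Add("next " + value); }
    public void OnError(Exception error) { Log.Add("error"); }
    public void OnCompleted() { Log.Add("completed"); }
}

public static class SubscriptionDemo
{
    public static string Run()
    {
        var source = new ManualIntegerSource();
        var longLived = new RecordingObserver();
        var shortLived = new RecordingObserver();

        using (source.Subscribe(longLived))
        {
            using (source.Subscribe(shortLived))
            {
                source.Publish(1); // both observers receive this value
            }
            source.Publish(2);     // only longLived is still subscribed
        }

        return string.Join(",", shortLived.Log) + " | " + string.Join(",", longLived.Log);
    }
}
```

In this sketch, the short-lived observer records the first value and then its completion, while the long-lived observer also records the second value before completing when the outer using block ends.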

In real-world applications, observers often subscribe to observables and later unsubscribe by disposing the Subscription token, because we do not always want a reactive module to handle every message. This means that we either have to handle the observer life cycle ourselves, as we did in the previous examples, or apply filters to choose which messages flow to which subscriber, as shown in the later section Filtering events. Keep in mind that although filters make things easier, we will always have to handle the observer life cycle.
