What will happen if we want to stop a single observer from receiving messages from the observable
event source? If we change the program Main
from the preceding example to the following one, we could experience a wrong observer
life cycle design. Here's the code:
//this is the message observable responsible of producing messages using (var observer = new ConsoleIntegerProducer()) //those are the message observer that consume messages using (var consumer1 = observer.Subscribe(new IntegerConsumer(2))) using (var consumer2 = observer.Subscribe(new IntegerConsumer(3))) { using (var consumer3 = observer.Subscribe(new IntegerConsumer(5))) { //internal lifecycle } observer.Wait(); } Console.WriteLine("END"); Console.ReadLine();
Here is the result in the output console:
By using the using
construct
method, we should stop the life cycle of the consumer object. However, we do not, because in the previous example, the Subscribe
method of the observable
simply returns a NULL
object.
To create a valid observer, we must handle and design its life cycle management. This means that we must eventually handle the external disposing of the Subscribe
method's result by signaling the right observer
that his life cycle reached the end.
We have to create a Subscription
class to handle an eventual object disposing in the right reactive way by sending the message for the OnCompleted
event handler.
Here is a simple Subscription
class implementation:
/// <summary> /// Handle observer subscription lifecycle /// </summary> public sealed class Subscription<T> : IDisposable { private readonly IObserver<T> observer; public Subscription(IObserver<T> observer) { this.observer = observer; } //the event signalling that the observer has //completed its lifecycle public event EventHandler<IObserver<T>> OnCompleted; public void Dispose() { if (OnCompleted != null) OnCompleted(this, observer); observer.OnCompleted(); } }
The usage is within the observable
Subscribe
method. Here's an example:
//add another observer to the subscriber list public IDisposable Subscribe(IObserver<int> observer) { if (observerList.Contains(observer)) throw new ArgumentException("The observer is already subscribed to this observable"); Console.WriteLine("Subscribing for {0}", observer.GetHashCode()); observerList.Add(observer); //creates a new subscription for the given observer var subscription = new Subscription<int>(observer); //handle to the subscription lifecycle end event subscription.OnCompleted += OnObserverLifecycleEnd; return subscription; } void OnObserverLifecycleEnd(object sender, IObserver<int> e) { var subscription = sender as Subscription<int>; //remove the observer from the internal list within the observable observerList.Remove(e); //remove the handler from the subscription event //once already handled subscription.OnCompleted -= OnObserverLifecycleEnd; }
As shown, the preceding example creates a new Subscription<T>
object to handle this observer
life cycle with the IDisposable.Dispose
method.
Here is the result of such code edits against the full example available in the previous paragraph:
This time, an observer
ends its life cycle prematurely by disposing the subscription
object. This is visible by the first END
message. Later, only two observers remain available at the application ending; when the user asks for EXIT
, only two such observers end their life cycle by themselves rather than by the Subscription
disposing.
In real-world applications, observers often subscribe to observables and later unsubscribe by disposing the Subscription
token. This happens because we do not always want a reactive module to handle all the messages. In this case, this means that we have to handle the observer
life cycle by ourselves, as we already did in the previous examples, or we need to apply filters to choose which messages flow to which subscriber
, as shown in the later section Filtering events. Kindly consider that although filters make things easier, we will always have to handle the observer
life cycle.
3.15.231.194