As said in the previous section, it is time to alter message flow.
The observable
interface has the task of producing messages, while conversely observer
consumes such messages. To create a message filter, we need to create an object that is both a publisher and a subscriber together.
The implementation must take into consideration the filtering need and the message routing to underlying observers that subscribe to the filter observable
object instead of the main one.
Here's an implementation of the filter:
/// <summary> /// The filtering observable/observer /// </summary> public sealed class StringMessageFilter : IObservable<string>, IObserver<string>, IDisposable { private readonly string filter; public StringMessageFilter(string filter) { this.filter = filter; } //the observer collection private readonly List<IObserver<string>> observerList = new List<IObserver<string>>(); public IDisposable Subscribe(IObserver<string> observer) { this.observerList.Add(observer); return null; } //a simple implementation //that disables message routing once //the OnCompleted has been invoked private bool hasCompleted = false; public void OnCompleted() { hasCompleted = true; foreach (var observer in observerList) observer.OnCompleted(); } //routes error messages until not completed public void OnError(Exception error) { if (!hasCompleted) foreach (var observer in observerList) observer.OnError(error); } //routes valid messages until not completed public void OnNext(string value) { Console.WriteLine("Filtering {0}", value); if (!hasCompleted && value.ToLowerInvariant().Contains(filter.ToLowerInvariant())) foreach (var observer in observerList) observer.OnNext(value); } public void Dispose() { OnCompleted(); } }
This filter can be used together with the example from the previous section that routes the FileSystemWatcher
events of created files. This is the new program initialization:
static void Main(string[] args) { Console.WriteLine("Watching for new files"); using (var publisher = new NewFileSavedMessagePublisher(@"[WRITE A PATH HERE]")) using (var filter = new StringMessageFilter(".txt")) { //subscribe the filter to publisher messages publisher.Subscribe(filter); //subscribe the console subscriber to the filter //instead that directly to the publisher filter.Subscribe(new NewFileSavedMessageSubscriber()); Console.WriteLine("Press RETURN to exit"); Console.ReadLine(); } }
As we can see, this new implementation creates a new filter
object that takes a parameter to verify valid filenames to flow to the underlying observers.
The filter
subscribes to the main observable
object, while the observer
subscribes to the filter
itself. It is like a chain where each chain link refers to the next one.
This is the output console of the running application:
Although I made a copy of two files (a .png
and a .txt
file), we can see that only the text file reached the internal observer
object, while the image file reached the OnNext
of filter
because the invalid against the filter
argument never reached the internal observer
.
18.227.72.15