Filtering events

As said in the previous section, it is time to alter message flow.

The observable interface has the task of producing messages, while conversely observer consumes such messages. To create a message filter, we need to create an object that is both a publisher and a subscriber together.

The implementation must take into consideration the filtering need and the message routing to underlying observers that subscribe to the filter observable object instead of the main one.

Here's an implementation of the filter:

/// <summary> 
/// The filtering observable/observer 
/// </summary> 
public sealed class StringMessageFilter : IObservable<string>, IObserver<string>, IDisposable 
{ 
    private readonly string filter; 
    public StringMessageFilter(string filter) 
    { 
        this.filter = filter; 
    } 
 
    //the observer collection 
    private readonly List<IObserver<string>> observerList = new List<IObserver<string>>(); 
    public IDisposable Subscribe(IObserver<string> observer) 
    { 
        this.observerList.Add(observer); 
        return null; 
    } 
 
    //a simple implementation 
    //that disables message routing once 
    //the OnCompleted has been invoked 
    private bool hasCompleted = false; 
    public void OnCompleted() 
    { 
        hasCompleted = true; 
        foreach (var observer in observerList) 
            observer.OnCompleted(); 
    } 
 
    //routes error messages until not completed 
    public void OnError(Exception error) 
    { 
        if (!hasCompleted) 
            foreach (var observer in observerList) 
                observer.OnError(error); 
    } 
 
    //routes valid messages until not completed 
    public void OnNext(string value) 
    { 
        Console.WriteLine("Filtering {0}", value); 
 
        if (!hasCompleted && value.ToLowerInvariant().Contains(filter.ToLowerInvariant())) 
            foreach (var observer in observerList) 
                observer.OnNext(value); 
    } 
 
    public void Dispose() 
    { 
        OnCompleted(); 
    } 
} 

This filter can be used together with the example from the previous section that routes the FileSystemWatcher events of created files. This is the new program initialization:

static void Main(string[] args) 
{ 
    Console.WriteLine("Watching for new files"); 
    using (var publisher = new NewFileSavedMessagePublisher(@"[WRITE A PATH HERE]")) 
    using (var filter = new StringMessageFilter(".txt")) 
    { 
        //subscribe the filter to publisher messages 
        publisher.Subscribe(filter); 
        //subscribe the console subscriber to the filter 
        //instead that directly to the publisher 
        filter.Subscribe(new NewFileSavedMessageSubscriber()); 
 
        Console.WriteLine("Press RETURN to exit"); 
        Console.ReadLine(); 
    } 
} 

As we can see, this new implementation creates a new filter object that takes a parameter to verify valid filenames to flow to the underlying observers.

The filter subscribes to the main observable object, while the observer subscribes to the filter itself. It is like a chain where each chain link refers to the next one.

This is the output console of the running application:

Filtering events

The filtering observer in action

Although I made a copy of two files (a .png and a .txt file), we can see that only the text file reached the internal observer object, while the image file reached the OnNext of filter because the invalid against the filter argument never reached the internal observer.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.227.72.15