Sourcing events

Sourcing events is the ability to obtain from a particular source where few useful events are usable in reactive programming.

Tip

If you are searching for the EventSourcing pattern, take a look at Chapter 7, Advanced Techniques .

As already pointed out in the previous chapter, reactive programming is all about event message handling. Any event is a specific occurrence of some kind of handleable behavior of users or external systems. We can actually program event reactions in the most pleasant and productive way for reaching our software goals.

In the following example, we will see how to react to CLR events. In this specific case, we will handle filesystem events by using events from the System.IO.FileSystemWatcher class that gives us the ability to react to the filesystem's file changes without the need of making useless and resource-consuming polling queries against the file system status.

Here's the observer and observable implementation:

public sealed class NewFileSavedMessagePublisher : IObservable<string>, IDisposable 
{ 
    private readonly FileSystemWatcher watcher; 
    public NewFileSavedMessagePublisher(string path) 
    { 
        //creates a new file system event router 
        this.watcher = new FileSystemWatcher(path); 
        //register for handling File Created event 
        this.watcher.Created += OnFileCreated; 
        //enable event routing 
        this.watcher.EnableRaisingEvents = true; 
    } 
 
    //signal all observers a new file arrived 
    private void OnFileCreated(object sender, FileSystemEventArgs e) 
    { 
        foreach (var observer in subscriberList) 
            observer.OnNext(e.FullPath); 
    } 
 
    //the subscriber list 
    private readonly List<IObserver<string>> subscriberList = new List<IObserver<string>>(); 
 
    public IDisposable Subscribe(IObserver<string> observer) 
    { 
        //register the new observer 
        subscriberList.Add(observer); 
 
        return null; 
    } 
 
    public void Dispose() 
    { 
        //disable file system event routing 
        this.watcher.EnableRaisingEvents = false; 
        //deregister from watcher event handler 
        this.watcher.Created -= OnFileCreated; 
        //dispose the watcher 
        this.watcher.Dispose(); 
 
        //signal all observers that job is done 
        foreach (var observer in subscriberList) 
            observer.OnCompleted(); 
    } 
} 
 
/// <summary> 
/// A tremendously basic implementation 
/// </summary> 
public sealed class NewFileSavedMessageSubscriber : IObserver<string> 
{ 
    public void OnCompleted() 
    { 
        Console.WriteLine("-> END"); 
    } 
 
    public void OnError(Exception error) 
    { 
        Console.WriteLine("-> {0}", error.Message); 
    } 
 
    public void OnNext(string value) 
    { 
        Console.WriteLine("-> {0}", value); 
    } 
} 

The observer interface simply gives us the ability to write text to the console. I think there is nothing to say about it.

On the other hand, the observable interface makes the most of the job in this implementation.

The observable interface creates the watcher object and registers the right event handler to catch the wanted reactive events. It handles the life cycle of itself and the internal watcher object. Then, it correctly sends the OnComplete message to all the observers.

Here's the program's initialization:

static void Main(string[] args) 
{ 
    Console.WriteLine("Watching for new files"); 
    using (var publisher = new NewFileSavedMessagePublisher(@"[WRITE A PATH
    HERE]")) 
    using (var subscriber = publisher.Subscribe(new NewFileSavedMessageSubscriber())) 
    { 
        Console.WriteLine("Press RETURN to exit"); 
        //wait for user RETURN 
        Console.ReadLine(); 
    } 
} 

Any new file that arises in the folder will let route full FileName to observer. This is the result of a copy and paste of the same file three times:

-> [YOUR PATH]out - Copy.png 
-> [YOUR PATH]out - Copy (2).png 
-> [YOUR PATH]out - Copy (3).png 

By using a single observable interface and a single observer interface, the power of reactive programming is not so evident. Let's begin with writing some intermediate object to change the message flow within the pipeline of our message pump made in a reactive way with filters, message correlator, and dividers.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.144.90.182