Sourcing events is the ability to obtain, from a particular source, the events that are useful in reactive programming.
If you are searching for the EventSourcing pattern, take a look at Chapter 7, Advanced Techniques.
As already pointed out in the previous chapter, reactive programming is all about event message handling. Any event is a specific occurrence of some kind of handleable behavior of users or external systems. We can actually program event reactions in the most pleasant and productive way for reaching our software goals.
In the following example, we will see how to react to CLR events. In this specific case, we will handle filesystem events by using the System.IO.FileSystemWatcher class, which lets us react to file changes without resorting to wasteful, resource-consuming polling of the filesystem status.
Here's the observer and observable implementation:
using System;
using System.Collections.Generic;
using System.IO;

public sealed class NewFileSavedMessagePublisher : IObservable<string>, IDisposable
{
    private readonly FileSystemWatcher watcher;

    public NewFileSavedMessagePublisher(string path)
    {
        //creates a new file system event router
        this.watcher = new FileSystemWatcher(path);
        //register for handling File Created event
        this.watcher.Created += OnFileCreated;
        //enable event routing
        this.watcher.EnableRaisingEvents = true;
    }

    //signal all observers a new file arrived
    private void OnFileCreated(object sender, FileSystemEventArgs e)
    {
        foreach (var observer in subscriberList)
            observer.OnNext(e.FullPath);
    }

    //the subscriber list
    private readonly List<IObserver<string>> subscriberList = new List<IObserver<string>>();

    public IDisposable Subscribe(IObserver<string> observer)
    {
        //register the new observer
        subscriberList.Add(observer);
        //this basic implementation offers no way to unsubscribe
        return null;
    }

    public void Dispose()
    {
        //disable file system event routing
        this.watcher.EnableRaisingEvents = false;
        //deregister from watcher event handler
        this.watcher.Created -= OnFileCreated;
        //dispose the watcher
        this.watcher.Dispose();
        //signal all observers that job is done
        foreach (var observer in subscriberList)
            observer.OnCompleted();
    }
}

/// <summary>
/// A tremendously basic implementation
/// </summary>
public sealed class NewFileSavedMessageSubscriber : IObserver<string>
{
    public void OnCompleted()
    {
        Console.WriteLine("-> END");
    }

    public void OnError(Exception error)
    {
        Console.WriteLine("-> {0}", error.Message);
    }

    public void OnNext(string value)
    {
        Console.WriteLine("-> {0}", value);
    }
}
The observer implementation simply writes each message it receives to the console, so it needs no further explanation.
On the other hand, the observable implementation does most of the work in this example.
The observable implementation creates the watcher object and registers the event handler that catches the events we want to react to. It manages its own life cycle and that of the internal watcher object. When it is disposed, it sends the OnCompleted message to all the observers.
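Note that Subscribe in the listing above returns null, so an observer cannot detach before the publisher is disposed. A common alternative is to return a small IDisposable token that removes the observer from the list. The following is only a minimal sketch of that idea, not the implementation used in this chapter: SketchPublisher, its Publish method (a stand-in for OnFileCreated), the nested Unsubscriber class, and RecordingObserver are all hypothetical names introduced for illustration. The lock is an added assumption as well, included because FileSystemWatcher can raise its events on a thread other than the subscribing one.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for NewFileSavedMessagePublisher, reduced to the
// subscription bookkeeping. Publish plays the role of OnFileCreated.
public sealed class SketchPublisher : IObservable<string>
{
    private readonly List<IObserver<string>> subscriberList = new List<IObserver<string>>();

    public IDisposable Subscribe(IObserver<string> observer)
    {
        lock (subscriberList)
            subscriberList.Add(observer);
        // Return a token instead of null so the caller can unsubscribe.
        return new Unsubscriber(subscriberList, observer);
    }

    public void Publish(string value)
    {
        IObserver<string>[] snapshot;
        // Copy under the lock so an observer may unsubscribe while
        // notifications are being delivered.
        lock (subscriberList)
            snapshot = subscriberList.ToArray();
        foreach (var observer in snapshot)
            observer.OnNext(value);
    }

    // Hypothetical subscription token: removes its observer when disposed.
    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<string>> list;
        private IObserver<string> observer;

        public Unsubscriber(List<IObserver<string>> list, IObserver<string> observer)
        {
            this.list = list;
            this.observer = observer;
        }

        public void Dispose()
        {
            lock (list)
            {
                if (observer == null) return; // already disposed
                list.Remove(observer);
                observer = null;
            }
        }
    }
}

// Hypothetical observer that records what it receives, used only to
// make the effect of unsubscribing visible.
public sealed class RecordingObserver : IObserver<string>
{
    public readonly List<string> Values = new List<string>();
    public void OnCompleted() { }
    public void OnError(Exception error) { }
    public void OnNext(string value) { Values.Add(value); }
}
```

With a token like this, wrapping the result of Subscribe in a using block becomes meaningful: when the block ends, the observer is removed and receives no further OnNext calls, even if the publisher keeps running.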
Here's the program's initialization:
static void Main(string[] args)
{
    Console.WriteLine("Watching for new files");
    using (var publisher = new NewFileSavedMessagePublisher(@"[WRITE A PATH HERE]"))
    using (var subscriber = publisher.Subscribe(new NewFileSavedMessageSubscriber()))
    {
        Console.WriteLine("Press RETURN to exit");
        //wait for user RETURN
        Console.ReadLine();
    }
}
Any new file created in the folder causes its full path to be routed to the observer. This is the result of copying and pasting the same file three times:
-> [YOUR PATH]out - Copy.png
-> [YOUR PATH]out - Copy (2).png
-> [YOUR PATH]out - Copy (3).png
With a single observable and a single observer, the power of reactive programming is not so evident. Let's begin writing some intermediate objects that change the message flow within the pipeline of our reactive message pump: filters, message correlators, and dividers.