Sometimes, especially in integration scenarios, we need to correlate multiple events that do not always arrive together. A typical case is a header file that belongs with multiple body files.
In reactive programming, correlating events means combining multiple observable messages into a single message that is the result of two or more original messages. Such messages must be correlated through some value (an ID, a serial number, or other metadata) which establishes that the original messages belong to the same correlation set.
Two features are useful in real-world correlators: the ability to specify a timeout (possibly infinite) for the correlation waiting logic, and the ability to specify how many messages make up a correlation (possibly unbounded).
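The correlator in this section hard-codes a count of two and has no timeout. The following C# sketch suggests one way both features could be added. The class CountTimeoutCorrelator and its members are hypothetical, `null` stands for "infinite" in both the count and the timeout, and the current time is passed in explicitly (an assumption made so the logic stays deterministic instead of depending on a real timer).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch: groups messages by a correlation key, releases a group
// when it reaches 'expectedCount' items and discards groups older than 'timeout'.
// Time is passed in by the caller so the behavior is deterministic.
public sealed class CountTimeoutCorrelator
{
    private sealed class Group
    {
        public DateTime Started;
        public readonly List<string> Items = new List<string>();
    }

    private readonly Func<string, string> keyExtractor;
    private readonly int? expectedCount;   // null = unbounded, never released by count
    private readonly TimeSpan? timeout;    // null = wait forever
    private readonly Dictionary<string, Group> groups = new Dictionary<string, Group>();

    public CountTimeoutCorrelator(Func<string, string> keyExtractor, int? expectedCount, TimeSpan? timeout)
    {
        this.keyExtractor = keyExtractor;
        this.expectedCount = expectedCount;
        this.timeout = timeout;
    }

    // Returns the completed group, or null while the correlation is still waiting.
    public IReadOnlyList<string> Add(string message, DateTime now)
    {
        EvictExpired(now);

        var key = keyExtractor(message);
        if (key == null) return null;   // message does not belong to any correlation

        if (!groups.TryGetValue(key, out var group))
        {
            group = new Group { Started = now };
            groups.Add(key, group);
        }
        group.Items.Add(message);

        if (expectedCount.HasValue && group.Items.Count >= expectedCount.Value)
        {
            groups.Remove(key);
            return group.Items;
        }
        return null;
    }

    // Drops groups whose first message arrived more than 'timeout' ago
    // and returns the keys of the discarded groups.
    public List<string> EvictExpired(DateTime now)
    {
        var expired = new List<string>();
        if (!timeout.HasValue) return expired;

        foreach (var pair in groups)
            if (now - pair.Value.Started > timeout.Value)
                expired.Add(pair.Key);

        foreach (var key in expired)
            groups.Remove(key);

        return expired;
    }

    public int PendingCount => groups.Count;
}
```

In a real correlator, EvictExpired would more likely be driven by a timer, and expired groups might be logged or reported rather than silently dropped; which policy fits depends on the integration scenario.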
Here's a correlator implementation for the previous example, based on the FileSystemWatcher class:
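using System;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.IO;
using System.Linq;

public sealed class FileNameMessageCorrelator : IObservable<string>, IObserver<string>, IDisposable
{
    private readonly Func<string, string> correlationKeyExtractor;

    public FileNameMessageCorrelator(Func<string, string> correlationKeyExtractor)
    {
        this.correlationKeyExtractor = correlationKeyExtractor;
    }

    //the observer collection
    private readonly List<IObserver<string>> observerList = new List<IObserver<string>>();

    public IDisposable Subscribe(IObserver<string> observer)
    {
        this.observerList.Add(observer);
        //return a handle that detaches the observer when disposed
        return new Unsubscriber(() => this.observerList.Remove(observer));
    }

    //minimal IDisposable wrapper around an unsubscribe action
    private sealed class Unsubscriber : IDisposable
    {
        private Action unsubscribe;
        public Unsubscriber(Action unsubscribe) { this.unsubscribe = unsubscribe; }
        public void Dispose() { unsubscribe?.Invoke(); unsubscribe = null; }
    }

    private bool hasCompleted = false;

    public void OnCompleted()
    {
        //completion is delivered at most once
        if (hasCompleted) return;
        hasCompleted = true;
        foreach (var observer in observerList)
            observer.OnCompleted();
    }

    //routes error messages only while not completed
    public void OnError(Exception error)
    {
        if (!hasCompleted)
            foreach (var observer in observerList)
                observer.OnError(error);
    }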
Let's pause here. Up to this point, we have simply created the reactive structure of the FileNameMessageCorrelator class by implementing the two main interfaces, IObservable<string> and IObserver<string>. Here is the core implementation that correlates messages:
    //the container of correlations, able to hold
    //multiple strings per key
    private readonly NameValueCollection correlations = new NameValueCollection();

    //routes valid messages until the source completes
    public void OnNext(string value)
    {
        if (hasCompleted) return; //ignore messages after completion

        Console.WriteLine("Parsing message: {0}", value);

        //try extracting the correlation ID
        var correlationID = correlationKeyExtractor(value);

        //skip messages without a correlation ID
        if (correlationID == null) return;

        //append the new file name to the correlation state
        correlations.Add(correlationID, value);

        //in this example a correlation is always
        //made of two items
        if (correlations.GetValues(correlationID).Length == 2)
        {
            //once the correlation is complete,
            //read the two files and push the
            //two contents together to the observers
            var fileData = correlations.GetValues(correlationID)
                //route messages to the ReadAllText method
                .Select(File.ReadAllText)
                //materialize the query
                .ToArray();

            var newValue = string.Join("|", fileData);

            foreach (var observer in observerList)
                observer.OnNext(newValue);

            correlations.Remove(correlationID);
        }
    }

    //assumed behavior: disposing the correlator completes its observers
    public void Dispose()
    {
        OnCompleted();
    }
}
This correlator class accepts a correlation function as a constructor parameter. Whenever a new file name flows into the OnNext method, the correlator uses this function to compute the message's correlationID. When the function returns a valid (non-null) correlationID, that ID is used as a key in a NameValueCollection, a specialized string collection that can store multiple values per key. As soon as two values are stored under the same key, the correlation is complete: the correlator reads the data of both files, joins it into a single string message, and pushes that message to its observers.
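The correlator relies on three NameValueCollection behaviors: Add appends instead of overwriting, GetValues returns every value stored under a key, and Remove drops the key together with all its values. The hypothetical NameValueCollectionDemo class below exercises those calls in isolation; the file names are made up for illustration.

```csharp
using System.Collections.Specialized;

// Hypothetical demo of the NameValueCollection behavior the correlator relies on.
public static class NameValueCollectionDemo
{
    public static (string[] Values, string Joined, string[] AfterRemove) Run()
    {
        var correlations = new NameValueCollection();

        // Add appends a second value under the same key instead of overwriting it
        correlations.Add("FILEID0001", "FILEID0001_header.txt");
        correlations.Add("FILEID0001", "FILEID0001_body.txt");

        // GetValues returns every value for the key, in insertion order
        string[] values = correlations.GetValues("FILEID0001");

        // the string indexer (like Get) returns all values as one comma-separated string
        string joined = correlations["FILEID0001"];

        // Remove drops the key together with all of its values
        correlations.Remove("FILEID0001");
        string[] afterRemove = correlations.GetValues("FILEID0001"); // null: the key is gone

        return (values, joined, afterRemove);
    }
}
```

Because GetValues preserves insertion order, the correlator's joined message lists file contents in the order the files arrived.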
Here's the application's initialization:
using System;
using System.Text.RegularExpressions;

class Program
{
    static void Main(string[] args)
    {
        using (var publisher = new NewFileSavedMessagePublisher(@"[WRITE A PATH HERE]"))
        //creates a new correlator by specifying the correlation key
        //extraction function, made with a regular expression that
        //extracts a file ID similar to FILEID0001
        using (var correlator = new FileNameMessageCorrelator(ExtractCorrelationKey))
        {
            //subscribe the correlator to publisher messages
            publisher.Subscribe(correlator);

            //subscribe the console subscriber to the correlator
            //instead of directly to the publisher
            correlator.Subscribe(new NewFileSavedMessageSubscriber());

            //wait for user RETURN
            Console.ReadLine();
        }
    }

    private static string ExtractCorrelationKey(string arg)
    {
        //verbatim string: "\d" is not a valid escape sequence in a regular C# string literal
        var match = Regex.Match(arg, @"(FILEID\d{4})");
        if (match.Success)
            return match.Value;
        else
            return null;
    }
}
The initialization is almost the same as in the filtering example seen in the previous section. The biggest difference is that the correlator object accepts, instead of a filter string, a function that analyzes each incoming file name and returns its correlationID when one is available (or null otherwise).
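To see what the extraction function returns for different file names, the sketch below restates the pattern from ExtractCorrelationKey as a static helper. The class CorrelationKeys and the sample names used with it are hypothetical; only the regular expression comes from the example above.

```csharp
using System.Text.RegularExpressions;

// Hypothetical helper using the same pattern as ExtractCorrelationKey:
// "FILEID" followed by exactly four digits.
public static class CorrelationKeys
{
    private static readonly Regex FileIdPattern = new Regex(@"FILEID\d{4}");

    // Returns the first file ID found in the name, or null when there is none
    // (a null key tells the correlator to ignore the message).
    public static string Extract(string fileName)
    {
        var match = FileIdPattern.Match(fileName);
        return match.Success ? match.Value : null;
    }
}
```

For example, `@"C:\in\FILEID0001_header.txt"` yields `FILEID0001`, while `notes.txt` and `FILEID12.txt` yield null. The pattern has no trailing boundary, so `FILEID12345.txt` yields `FILEID1234`; appending a lookahead such as `(?!\d)` would reject such names if that matters.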
I prepared two files whose file names contain the same ID. Here's the console output of the running example:
As can be seen, the correlator did its job, joining the data of the two files into a single message regardless of the order in which the two files were saved to the file system.
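While the correlation completes in either order, the correlator joins contents in the order GetValues returns them, that is, in arrival order, so the layout of the joined message depends on which file was saved first. The hypothetical InMemoryPairCorrelator below is a sketch, not the author's code: it replaces File.ReadAllText with an injected readContent function so it runs without the file system, and it sorts file names ordinally before joining, which is one possible way to make the output independent of arrival order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory variant of the pairing step; 'readContent' stands in
// for File.ReadAllText so the logic can run without touching the file system.
public sealed class InMemoryPairCorrelator
{
    private readonly Func<string, string> keyExtractor;
    private readonly Func<string, string> readContent;
    private readonly Dictionary<string, List<string>> pending = new Dictionary<string, List<string>>();

    public InMemoryPairCorrelator(Func<string, string> keyExtractor, Func<string, string> readContent)
    {
        this.keyExtractor = keyExtractor;
        this.readContent = readContent;
    }

    // Returns the joined message once two names share a key, otherwise null.
    public string OnNext(string fileName)
    {
        var key = keyExtractor(fileName);
        if (key == null) return null;

        if (!pending.TryGetValue(key, out var names))
            pending[key] = names = new List<string>();
        names.Add(fileName);

        if (names.Count < 2) return null;
        pending.Remove(key);

        // sorting by name makes the joined output independent of arrival order
        return string.Join("|", names.OrderBy(n => n, StringComparer.Ordinal).Select(readContent));
    }
}
```

Sorting by name is only one possible policy; a real header/body scenario might instead order parts by a role encoded in the file name.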
These examples of filtering and correlating messages should show that we can do anything with the messages we receive: we can hold a message until a correlated message arrives, we can join multiple messages into one, we can emit the same message multiple times, and so on.
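As one illustration of emitting the same message multiple times, here is a hypothetical RepeatingRelay stage (not part of the original example) that observes string messages and forwards each one a configurable number of times to its own observers. It follows the same hand-written IObservable/IObserver style as FileNameMessageCorrelator; RecordingObserver is a small helper included only to make the behavior visible.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical pass-through stage that re-emits every incoming message 'copies' times.
public sealed class RepeatingRelay : IObserver<string>, IObservable<string>
{
    private readonly int copies;
    private readonly List<IObserver<string>> observers = new List<IObserver<string>>();

    public RepeatingRelay(int copies) { this.copies = copies; }

    public IDisposable Subscribe(IObserver<string> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(() => observers.Remove(observer));
    }

    public void OnNext(string value)
    {
        for (var i = 0; i < copies; i++)
            foreach (var observer in observers)
                observer.OnNext(value);
    }

    public void OnError(Exception error) { foreach (var o in observers) o.OnError(error); }
    public void OnCompleted() { foreach (var o in observers) o.OnCompleted(); }

    // detaches an observer when disposed
    private sealed class Unsubscriber : IDisposable
    {
        private Action unsubscribe;
        public Unsubscriber(Action unsubscribe) { this.unsubscribe = unsubscribe; }
        public void Dispose() { unsubscribe?.Invoke(); unsubscribe = null; }
    }
}

// Helper observer that records every message it receives.
public sealed class RecordingObserver : IObserver<string>
{
    public readonly List<string> Received = new List<string>();
    public void OnNext(string value) => Received.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

Such a stage could be subscribed to the correlator exactly like the console subscriber, since it is both an observer and an observable.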
This programming style opens up many new application designs and possibilities.