Correlating events

Sometimes, especially when dealing with integration scenarios, there is a need for correlating multiple events that don't always came together. This is the case of a header file that came together with multiple body files.

In reactive programming, correlating events means correlating multiple observable messages into a single message that is the result of two or more original messages. Such messages must be somehow correlated to a value (an ID, serial, or metadata) that defines that such initial messages belong to the same correlation set.

Useful features in real-world correlators are the ability to specify a timeout (that may be infinite too) in the correlation waiting logic and the ability to specify a correlation message count (infinite too).

Here's a correlator implementation made for the previous example based on the FileSystemWatcher class:

public sealed class FileNameMessageCorrelator : IObservable<string>, IObserver<string>, IDisposable 
{ 
    private readonly Func<string, string> correlationKeyExtractor; 
    public FileNameMessageCorrelator(Func<string, string> correlationKeyExtractor) 
    { 
        this.correlationKeyExtractor = correlationKeyExtractor; 
    } 
 
    //the observer collection 
    private readonly List<IObserver<string>> observerList = new List<IObserver<string>>(); 
    public IDisposable Subscribe(IObserver<string> observer) 
    { 
        this.observerList.Add(observer); 
        return null; 
    } 
 
    private bool hasCompleted = false; 
    public void OnCompleted() 
    { 
        hasCompleted = true; 
        foreach (var observer in observerList) 
            observer.OnCompleted(); 
    } 
 
    //routes error messages until not completed 
    public void OnError(Exception error) 
    { 
        if (!hasCompleted) 
            foreach (var observer in observerList) 
                observer.OnError(error); 
    } 

Let's pause. Up to this row, we simply created the reactive structure of the FileNameMessageCorrelator class by implementing the two main interfaces. Here is the core implementation that correlates messages:

//the container of correlations able to contain 
//multiple strings per each key 
private readonly NameValueCollection correlations = new NameValueCollection(); 
 
//routes valid messages until not completed 
public void OnNext(string value) 
{ 
    if (hasCompleted) return; 
 
    //check if subscriber has completed 
    Console.WriteLine("Parsing message: {0}", value); 
 
    //try extracting the correlation ID 
    var correlationID = correlationKeyExtractor(value); 
 
    //check if the correlation is available 
    if (correlationID == null) return; 
 
    //append the new file name to the correlation state 
    correlations.Add(correlationID, value); 
 
    //in this example we will consider always 
    //correlations of two items 
    if (correlations.GetValues(correlationID).Count() == 2) 
    { 
        //once the correlation is complete 
        //read the two files and push the 
        //two contents altogether to the 
        //observers 
 
        var fileData = correlations.GetValues(correlationID) 
            //route messages to the ReadAllText method 
            .Select(File.ReadAllText) 
            //materialize the query 
            .ToArray(); 
 
        var newValue = string.Join("|", fileData); 
 
        foreach (var observer in observerList) 
            observer.OnNext(newValue); 
 
        correlations.Remove(correlationID); 
    } 
} 

This correlator class accepts a correlation function as a constructor parameter. This function is later used to evaluate correlationID when a new filename variable flows within the OnNext method.

Once the function returns valid correlationID, such IDs will be used as key for NameValueCollection, a specialized string collection to store multiple values per key. When there are two values for the same key, correlation is ready to flow out to the underlying observers by reading file data and joining such data into a single string message.

Here's the application's initialization:

static void Main(string[] args) 
{ 
    using (var publisher = new NewFileSavedMessagePublisher(@"[WRITE A PATH HERE]")) 
    //creates a new correlator by specifying the correlation key 
    //extraction function made with a Regular expression that 
    //extract a file ID similar to FILEID0001 
    using (var correlator = new FileNameMessageCorrelator(ExtractCorrelationKey)) 
    { 
        //subscribe the correlator to publisher messages 
        publisher.Subscribe(correlator); 
 
        //subscribe the console subscriber to the correlator 
        //instead that directly to the publisher 
        correlator.Subscribe(new NewFileSavedMessageSubscriber()); 
 
        //wait for user RETURN 
        Console.ReadLine(); 
    } 
} 
 
private static string ExtractCorrelationKey(string arg) 
{ 
    var match = Regex.Match(arg, "(FILEID\d{4})"); 
    if (match.Success) 
        return match.Captures[0].Value; 
    else 
        return null; 
} 

The initialization is almost the same as the filtering example seen in the previous section. The biggest difference is that the correlator object, instead of a string filter variable, accepts a function that analyses the incoming filename and produces the eventually available correlationID variable.

I prepared two files with the same ID in the filename variable. Here's the console output of the running example:

Correlating events

Two files correlated by their name

As can be seen, correlator fulfilled its job by joining the two file's data into a single message regardless of the order in which the two files were stored in the filesystem.

These examples regarding the filtering and correlation of messages should show you that we can do anything with received messages: we can put a message in standby until a correlated message comes, we can join multiple messages into one, we can produce multiple times the same message, and so on.

This programming style opens the programmer's mind to a lot of new application designs and possibilities.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.14.252.56