Sequence partitioning

Partitioning is the ability to split data into multiple stripes.

When dealing with reactive sequences, partitioning means splitting a sourcing sequence into multiple subsequences. The usual goal is to keep the message flow consistent when a single sequence carries messages that logically belong to several parallel flows. Although we may prefer to handle all the data in a single sequence chain for simplicity and maintainability, at some point we will need to separate the messages from their original flow by grouping them by one (or more) properties.

In reactive programming, we never partition for performance reasons (such as parallelizing processing), because the reactive framework itself already provides this when the overall design is sound (refer to Chapter 6, CLR Integration and Scheduling, for a more in-depth discussion).

GroupBy

The king of the partitioning functions is GroupBy, and Rx is no exception: the GroupBy extension method produces a grouping observable sequence whose messages are themselves small sequences (IGroupedObservable<TKey, TElement>), each dealing with the items of a single group.

These subsequences flow out of the grouping sequence lazily. When a message with a key not seen before flows out of the sourcing sequence, the grouping sequence emits a new subsequence for that key. This is why we need a two-level subscription: at the first level we register each new subsequence, and at the second we register the message observer, as usual for nongrouped sequences.

Here's an example:

//a sourcing sequence 
var sourcingSequence = Observable.Interval(TimeSpan.FromSeconds(1)).Select(id => DateTime.UtcNow); 
 
//partition by ten-second window of the current minute (keys 0 to 5) 
var partitions = sourcingSequence.GroupBy(x => Math.Floor(x.Second / 10d)); 
 
//register the partition per group key 
partitions.Subscribe(partition => 
    { 
        Console.WriteLine("Registering observer for: {0}", partition.Key); 
 
        //register the observer per partition 
        partition.Subscribe(value => 
            { 
                Console.WriteLine("partition {0}: {1}", partition.Key, value); 
            }); 
    }); 
 
Console.ReadLine(); 

Tip

Nested subscriptions, although useful for demonstration purposes, are something we should avoid in the real world. They make code harder to maintain and debug and, most importantly, they reduce our control over the overall flow because of the nested delegates we need to write to handle nested sequences. Although we will see other nested sequences here, the suggestion for real-world code is to write something like the following variant of the GroupBy example. The only difference is that, to avoid nesting, we flatten the multiple stripes into a single sequence of new messages containing the original value and the group key. Although we're still logically grouping, we don't actually need multiple real subsequences:

//register the partition per group key 
//without nested sequences 
partitions 
    //transform inner groups into new objects 
    //containing the key and the value altogether 
    .SelectMany(group => group.Select(x => new { key = group.Key, value = x })) 
    .Subscribe(msg => Console.WriteLine("partition {0}: {1}", msg.key, msg.value)); 

Aggregate

The Aggregate operator creates a new sequence that applies an accumulator function to each source message and produces a single output message, the final accumulated value, once the source completes. The accumulator can perform any operation we want. In the following example, the accumulator function simply adds each message value to the running total:

//a sourcing sequence of random doubles 
var sourcingSequence = Observable.Create<double>(observer => 
    { 
        var r = new Random(DateTime.Now.GetHashCode()); 
 
        for (int i = 0; i < 5; i++) 
        { 
            observer.OnNext((r.NextDouble() - 0.5d) * 10d); 
            Thread.Sleep(1000); 
        } 
 
        observer.OnCompleted(); 
 
        return () => 
        { 
            Console.WriteLine("Completed"); 
        }; 
    }); 
 
//aggregate values to compute a single ending value 
var aggregationSequence = sourcingSequence.Aggregate(0d, (rolling, value) => 
{ 
    Console.WriteLine("Aggregating: {0} + {1}", rolling, value); 
    return rolling + value; 
}); 
aggregationSequence.Subscribe(value => Console.WriteLine("Aggregated value: {0}", value)); 
 
Console.ReadLine(); 

In the Aggregate operator, the first parameter is the seed, that is, the starting value of the accumulation. The second parameter is the accumulator function, which in the preceding example also writes its inputs to the console to make the operation easier to follow.
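To make the role of the seed concrete, here is a minimal deterministic sketch, assuming the System.Reactive package is referenced. The names AggregateSketch and Sum are illustrative and not part of Rx; the blocking Wait operator is used only so that the result can be printed from a console program.

```csharp
using System;
using System.Reactive.Linq;

public static class AggregateSketch
{
    // Illustrative helper: sums a sequence of doubles starting from the given seed.
    public static double Sum(IObservable<double> source, double seed) =>
        source
            .Aggregate(seed, (rolling, value) => rolling + value)
            .Wait(); // blocks until the source completes, then returns the single result

    public static void Main()
    {
        // A fixed, finite source replaces the random one to keep the result predictable
        var values = new[] { 1.5, 2.5, 3.0 }.ToObservable();

        Console.WriteLine(Sum(values, 0d));  // seed 0 gives 7
        Console.WriteLine(Sum(values, 10d)); // seed 10 gives 17

        // With a seed, an empty source still yields one message: the seed itself
        Console.WriteLine(Sum(Observable.Empty<double>(), 0d)); // gives 0
    }
}
```

The seed also fixes the type of the accumulated value, so it may differ from the type of the source messages.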

MaxBy/MinBy

When dealing with aggregations, there are premade operators. The MaxBy and MinBy operators evaluate a key for each sourcing message and return only the messages whose key equals the maximum or minimum key found in the sourcing sequence.

Each of these sequences produces a single message containing a list of sourcing messages, because several messages may share the same extreme key. The sequence produces no messages until the sourcing sequence fires its completed message. Here's an example:

//a sourcing sequence of 2 messages per second 
var sourcingSequence = Observable.Interval(TimeSpan.FromSeconds(0.5)) 
    //a transformation into DateTime 
    //truncated to whole seconds; the clock is read once 
    //so that all components come from the same instant 
    .Select(id => 
    { 
        var now = DateTime.UtcNow; 
        return new DateTime(now.Year, now.Month, now.Day, now.Hour, now.Minute, now.Second); 
    }) 
    //we take messages only for 5 seconds 
    .TakeUntil(DateTimeOffset.Now.AddSeconds(5)); 
 
//the maxby sequence 
var maxBySequence = sourcingSequence.MaxBy(d => d.Ticks); 
maxBySequence.Subscribe(ordered => 
{ 
    foreach (var value in ordered) 
        Console.WriteLine("MaxBy: {0}", value); 
}); 
 
//the minby sequence 
var minBySequence = sourcingSequence.MinBy(d => d.Ticks); 
minBySequence.Subscribe(ordered => 
{ 
    foreach (var value in ordered) 
        Console.WriteLine("MinBy: {0}", value); 
}); 
 
Console.ReadLine(); 

Tip

Usually, primitive types already implement the IComparable interface. To make our own types comparable, we can implement the same interface. To compare types that don't implement IComparable themselves, we can create an external comparer by implementing the IComparer<T> interface. Another overload of the MaxBy/MinBy operators accepts the key selector as the first parameter and the comparer as the second, letting us use the operator with our custom comparer.
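The comparer overload can be sketched as follows, assuming the System.Reactive package is referenced. The Priority, Job, PriorityComparer, and ComparerSketch types are hypothetical and exist only for illustration; the comparer also assumes non-null keys.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical key type that deliberately does not implement IComparable
public sealed class Priority
{
    public Priority(int level) { Level = level; }
    public int Level { get; }
}

// Hypothetical message type carrying the key
public sealed class Job
{
    public Job(string name, int level) { Name = name; Priority = new Priority(level); }
    public string Name { get; }
    public Priority Priority { get; }
}

// External comparer that tells the operator how to order Priority keys
public sealed class PriorityComparer : IComparer<Priority>
{
    public int Compare(Priority x, Priority y) => x.Level.CompareTo(y.Level);
}

public static class ComparerSketch
{
    // Wait blocks until the source completes and returns the single list message
    public static IList<Job> Highest(IObservable<Job> jobs) =>
        jobs.MaxBy(job => job.Priority, new PriorityComparer()).Wait();

    public static IList<Job> Lowest(IObservable<Job> jobs) =>
        jobs.MinBy(job => job.Priority, new PriorityComparer()).Wait();

    public static void Main()
    {
        var jobs = new[]
        {
            new Job("backup", 1), new Job("deploy", 3),
            new Job("report", 2), new Job("rollback", 3)
        }.ToObservable();

        // Two jobs share the highest priority, so the list holds both
        foreach (var job in Highest(jobs))
            Console.WriteLine("MaxBy: {0}", job.Name); // deploy, then rollback

        foreach (var job in Lowest(jobs))
            Console.WriteLine("MinBy: {0}", job.Name); // backup
    }
}
```

Keeping the comparison in a separate class leaves the key type untouched, which helps when the type comes from a library we cannot modify.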
