Partitioning is the ability to split data into multiple strips.
When dealing with reactive sequences, partitioning means splitting a sourcing sequence into multiple subsequences. The goal may be to maintain message flow consistency when dealing with multiple parallel sequences. In this case, although we may prefer working with data from all the sourcing sequences in a single sequence chain for simplicity and maintainability, at some point we will need to split the messages from their original flow by grouping them by one (or more) properties.
In reactive programming, we never partition for performance needs (such as parallelizing processing) because the reactive framework itself provides this by default if we use the proper overall design (refer to Chapter 6, CLR Integration and Scheduling, for a more in-depth discussion).
The king of the partitioning functions is GroupBy. The same holds within Rx, where the GroupBy extension method produces a grouping observable sequence whose messages are themselves small sequences (IGroupedObservable<TKey, TElement>), each dealing with the items of a single group.
These subsequences flow out of the grouping sequence lazily. Once a message that requires a new subsequence flows out of the sourcing sequence, the grouping sequence emits a new grouped subsequence. This is why we need a two-level subscription: at the first level we register each new subsequence, and at the second we register the message observer, as usual for nongrouped sequences.
Here's an example:
using System;
using System.Reactive.Linq;

//a sourcing sequence
var sourcingSequence = Observable.Interval(TimeSpan.FromSeconds(1))
    .Select(id => DateTime.UtcNow);

//sequence partitioning by ten-second blocks of the minute
var partitions = sourcingSequence.GroupBy(x => Math.Floor(x.Second / 10d));

//register the partition per group key
partitions.Subscribe(partition =>
{
    Console.WriteLine("Registering observer for: {0}", partition.Key);

    //register the observer per partition
    partition.Subscribe(value =>
    {
        Console.WriteLine("partition {0}: {1}", partition.Key, value);
    });
});

Console.ReadLine();
Nested subscriptions, although useful for demonstration purposes, are something we should avoid in the real world. They make maintenance and debugging harder and, most importantly, they reduce our control over the overall flow because of the nested delegates we need to write to handle nested sequences. Although we will see other nested sequences here, the suggestion for real-world coding is to write something like the following example of the GroupBy operator. The only difference is that, to avoid nesting, we flatten the multiple subsequences into a single sequence of new messages containing the original value and the group key. Although we're still logically grouping, we don't actually need multiple real subsequences:
//register the partition per group key
//without nested sequences
partitions
//transform inner groups into new objects
//containing the key and the value altogether
.SelectMany(group => group.Select(x => new { key = group.Key, value = x }))
.Subscribe(msg => Console.WriteLine("partition {0}: {1}", msg.key, msg.value));
The Aggregate operator creates a new sequence that applies an accumulator function to each source message and emits a single output message, the final accumulated value, once the sourcing sequence completes. The accumulator can perform any operation we want. In the following example, the accumulator function simply adds each message value to the running total:
using System;
using System.Reactive.Linq;
using System.Threading;

//a sourcing sequence of random doubles
var sourcingSequence = Observable.Create<double>(observer =>
{
    var r = new Random(DateTime.Now.GetHashCode());
    for (int i = 0; i < 5; i++)
    {
        observer.OnNext((r.NextDouble() - 0.5d) * 10d);
        Thread.Sleep(1000);
    }
    observer.OnCompleted();

    //invoked when the subscription is disposed
    return () => { Console.WriteLine("Completed"); };
});

//aggregate values to compute a single ending value
var aggregationSequence = sourcingSequence.Aggregate(0d, (rolling, value) =>
{
    Console.WriteLine("Aggregating: {0} + {1}", rolling, value);
    return rolling + value;
});

aggregationSequence.Subscribe(value => Console.WriteLine("Aggregated value: {0}", value));

Console.ReadLine();
In the Aggregate operator, the first parameter is the seed, that is, the starting value of the accumulation. The second parameter is the accumulator function, which receives the rolling value and the next message and returns the new rolling value. In the preceding example, it also writes both inputs to the console for a better understanding of the operation.
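The roles of the seed and the accumulator are easier to check on a finite, deterministic sequence. The following is a minimal sketch, assuming the System.Reactive package is referenced; the AggregateSketch class and its SumOf helper are hypothetical names introduced only for illustration:

```csharp
using System;
using System.Reactive.Linq; // assumes the System.Reactive NuGet package

public static class AggregateSketch
{
    // Hypothetical helper: sums 'count' consecutive integers starting at 'start'
    public static int SumOf(int start, int count)
    {
        return Observable.Range(start, count)
            // seed = 0; the accumulator gets the rolling value and the next message
            .Aggregate(0, (rolling, value) => rolling + value)
            // Wait blocks until the sequence completes and returns its last message
            .Wait();
    }

    public static void Main()
    {
        // 1 + 2 + 3 + 4 + 5 = 15
        Console.WriteLine("Aggregated value: {0}", SumOf(1, 5));
    }
}
```

Because Aggregate emits only when the sourcing sequence completes, the blocking Wait call returns the one and only message of the aggregation sequence.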
When dealing with aggregations, there are premade operators. The MaxBy and MinBy operators create sequences that compute a key for each sourcing message with the given key selector and then return only the messages whose key is equal to the max or min key found in the sourcing sequence.
Each of these sequences produces a single message: a list that may contain multiple sourcing messages, because several of them can share the same max or min key. The sequence produces no messages until the sourcing sequence fires its completed message. Here's an example:
using System;
using System.Reactive.Linq;

//a sourcing sequence of 2 messages per second
var sourcingSequence = Observable.Interval(TimeSpan.FromSeconds(0.5))
    //a transformation into DateTime
    //skipping milliseconds/nanoseconds
    .Select(id =>
    {
        //read the clock once so that all the fields belong to the same instant
        var now = DateTime.UtcNow;
        return new DateTime(now.Year, now.Month, now.Day, now.Hour, now.Minute, now.Second);
    })
    //we take messages only for 5 seconds
    .TakeUntil(DateTimeOffset.Now.AddSeconds(5));

//the maxby sequence
var maxBySequence = sourcingSequence.MaxBy(d => d.Ticks);
maxBySequence.Subscribe(ordered =>
{
    foreach (var value in ordered)
        Console.WriteLine("MaxBy: {0}", value);
});

//the minby sequence
var minBySequence = sourcingSequence.MinBy(d => d.Ticks);
minBySequence.Subscribe(ordered =>
{
    foreach (var value in ordered)
        Console.WriteLine("MinBy: {0}", value);
});

Console.ReadLine();
Usually, primitive types already implement the IComparable interface. To make our own types comparable, we may implement the same interface. To compare types that don't implement the IComparable interface by themselves, we can create an external comparer by implementing the IComparer<T> interface. Once we have the comparer, another overload of the MaxBy/MinBy operator accepts the key selector as the first parameter and the comparer as the second, letting us use the operator with our custom comparer.
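The comparer overload can be sketched as follows. This is only an illustration, assuming the System.Reactive package is referenced; the Priority type, the PriorityComparer class, and the job names are hypothetical and exist only to provide a key type that does not implement IComparable:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq; // assumes the System.Reactive NuGet package

// Hypothetical key type that does not implement IComparable
public sealed class Priority
{
    public Priority(int level) { Level = level; }
    public int Level { get; }
}

// External comparer for the key type
public sealed class PriorityComparer : IComparer<Priority>
{
    public int Compare(Priority x, Priority y)
    {
        return x.Level.CompareTo(y.Level);
    }
}

public static class MaxByComparerSketch
{
    // Returns the names of all the jobs that share the highest priority
    public static IList<string> HighestPriorityJobs()
    {
        var jobs = new[]
        {
            new { Name = "backup", Priority = new Priority(1) },
            new { Name = "deploy", Priority = new Priority(3) },
            new { Name = "report", Priority = new Priority(3) },
        }.ToObservable();

        return jobs
            // key selector first, custom comparer second
            .MaxBy(job => job.Priority, new PriorityComparer())
            // the single output message is a list of the matching jobs
            .Select(list => list.Select(job => job.Name).ToList())
            .Wait();
    }

    public static void Main()
    {
        foreach (var name in HighestPriorityJobs())
            Console.WriteLine("MaxBy: {0}", name);
    }
}
```

Note that the comparer works on the key returned by the selector, not on the sourcing message itself, so here it is an IComparer<Priority> rather than a comparer of jobs.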