Sequence manipulation and filtering

Manipulating and filtering sequences of messages helps when developing complex messaging designs. The most immediate and widely used operator is Where, which creates a new sequence containing only the messages that satisfy its filtering predicate. We have already seen some of the manipulating or filtering methods in the previous chapter, such as Take, Skip, Distinct, and DistinctUntilChanged.

Where

The Where method creates a new sequence that flows messages from another sequence only when the given predicate returns true for them. Here's an example:

//fixed-time interval sequence
var fixedTimeBasedSequence = Observable.Interval(TimeSpan.FromSeconds(1)); 
 
//convert the message  
//into time value 
var dateTimeSequence = fixedTimeBasedSequence 
    .Select(v=> DateTime.UtcNow ); 
 
//filtered sequence of times with even second value 
var filteredSequence = dateTimeSequence.Where(dt => dt.Second % 2 == 0); 
 
//outputs the value 
filteredSequence.Subscribe(dt => 
    { 
        Console.WriteLine("{0:d} {0:T}", dt); 
    }); 
 
Console.ReadLine(); 
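
Because the preceding example depends on the clock, its output changes from run to run. As a complementary sketch, the same operator can be tried on a finite sequence whose result is easy to predict. The snippet assumes that the System.Reactive NuGet package is referenced and that a C# version with top-level statements is in use; the names numbers, evenNumbers, and received are illustrative only.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Assumption: the System.Reactive NuGet package is referenced.
// A finite source: the integers from 1 to 10.
var numbers = Observable.Range(1, 10);

// Only the values for which the predicate returns true flow through.
var evenNumbers = numbers.Where(n => n % 2 == 0);

// Collect the filtered messages so that they can be inspected afterwards.
var received = new List<int>();
evenNumbers.Subscribe(n => received.Add(n));

Console.WriteLine(string.Join(", ", received)); // 2, 4, 6, 8, 10
```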

Join

The Observable.Join method (available as an extension method) creates a single sequence of messages by pairing messages from two sequences. Because messages from the sourcing sequences rarely arrive at exactly the same instant, we must give each message a time window in which it can match messages from the other sequence. This time window makes it evident how much reactive programming is time-based. This is different from state-driven paradigms, in which a Join clause matches one value against another value, as happens in SQL.

Before I try to explain further, let's take a look at an example:

//two sourcing sequences of time-based values
var sourceSequence1 = Observable.Interval(TimeSpan.FromSeconds(2)).Select(nr => DateTime.UtcNow); 
var sourceSequence2 = Observable.Interval(TimeSpan.FromSeconds(3)).Select(nr => DateTime.UtcNow); 
 
//a joined sequence of messages 
var joinedSequence = sourceSequence1.Join(sourceSequence2, 
    v => Observable.Return(v).Delay(TimeSpan.FromMilliseconds(100)), 
    v => Observable.Return(v).Delay(TimeSpan.FromMilliseconds(100)), 
    (v1, v2) => new { fromSequence1 = v1, fromSequence2 = v2 }); 
 
joinedSequence.Subscribe(x => 
    { 
        Console.WriteLine("{0} / {1}", x.fromSequence1, x.fromSequence2); 
    }); 
 
//keep the process alive while the timers run 
Console.ReadLine(); 

The preceding example shows the usage of the Join operator. There are two sourcing sequences of values, one emitting a message every 2 seconds and the other emitting a message every 3 seconds. To make things easier, both sequences produce DateTime values.

The Join method needs the two sourcing sequences, two time-window generator functions (one per source), and a function that creates the resulting object from each pair of matched values.

The most interesting things here are the two time-window generator functions. For each sourcing message, they return another sequence that emits after a short delay (100 milliseconds in the example). That delay is the time window the joined sequence uses to match messages from the two sourcing sequences: the window stays open until the returned sequence emits a value or completes.

In the preceding example, we take values from two sourcing sequences. To match values based on time (messages must arise from the sourcing sequences at nearly the same moment), we need something like a time buffer that lets messages from the two sequences meet each other. To make this happen, we create (with two identical lambda functions) a new delayed sequence for each sourcing message. Each of these sequences acts as a time to live for its sourcing message. In other words, it is the matching timeout of that message. When a message from one sourcing sequence arrives while the timeout of a message from the other source is still running, the two messages are matched by the Join operator and flow out in the resulting sequence.
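
The matching rule can also be observed without timers. The following is a minimal sketch, not part of the original example: it replaces the interval sources with Subject instances and closes the left-hand windows by hand. It assumes the System.Reactive package is referenced; the closeLeftWindows subject and all other names are hypothetical helpers introduced only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Assumption: the System.Reactive NuGet package is referenced.
var left = new Subject<string>();
var right = new Subject<string>();

// Hypothetical helper: pushing a value here closes the window
// of every left message that is currently open.
var closeLeftWindows = new Subject<Unit>();

var joined = left.Join(right,
    _ => closeLeftWindows.AsObservable(), // left window: open until closed by hand
    _ => Observable.Never<Unit>(),        // right window: never closes
    (l, r) => l + "/" + r);

var pairs = new List<string>();
joined.Subscribe(p => pairs.Add(p));

left.OnNext("L1");
right.OnNext("R1");                    // L1 is still open, so L1/R1 flows
closeLeftWindows.OnNext(Unit.Default); // the window of L1 expires
right.OnNext("R2");                    // no left window is open, nothing flows
left.OnNext("L2");                     // R1 and R2 are still open, so L2/R1 and L2/R2 flow

Console.WriteLine(string.Join(", ", pairs));
```

Note that L1/R2 never appears: by the time R2 arrives, the time to live of L1 has already ended.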

If

The If factory method of the Observable static class returns a sequence that chooses which sourcing sequence to hand to each subscriber based on a conditional function. This is similar to the usual if statement of C#, with the difference that the reactive version executes the conditional function again each time a new subscriber registers, so different subscribers may receive different sourcing sequences. Here's a complete example:

//two sourcing sequences of time values 
 
var sourcingSequence1 = Observable.Interval(TimeSpan.FromSeconds(2)) 
    .Select(id => DateTime.UtcNow); 
 
var sourcingSequence2 = Observable.Interval(TimeSpan.FromSeconds(3)) 
    .Select(id => DateTime.UtcNow); 
 
 
//a selection function to choose 
//which sourcing sequence to use 
var mustUseTheFirstSequenceSelector = new Func<bool>(() => 
    { 
        var isFirst = DateTime.UtcNow.Second % 2 == 0; 
        Console.WriteLine("IsFirst: {0}", isFirst); 
        return isFirst; 
    }); 
 
//a conditional sequence of values from   
// the first or the second sourcing sequence 
var conditionalSequence = Observable.If(mustUseTheFirstSequenceSelector, sourcingSequence1, sourcingSequence2); 
 
for (int i = 0; i < 3; i++) 
{ 
    Console.WriteLine("Subscribing new observer..."); 
    conditionalSequence.Subscribe(x => 
        { 
            Console.WriteLine("{0}", x); 
        }); 
    Thread.Sleep(1000); 
} 
 
Console.ReadLine(); 
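
The per-subscription evaluation can be isolated in a smaller sketch that does not depend on the clock. It assumes the System.Reactive package is referenced; the useFirst flag and the two single-value sequences are illustrative stand-ins for the time-based sources above.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Assumption: the System.Reactive NuGet package is referenced.
// A flag that the condition reads each time someone subscribes.
var useFirst = true;

var conditional = Observable.If(
    () => useFirst,
    Observable.Return("first"),
    Observable.Return("second"));

var received = new List<string>();

// The condition is true for this subscriber.
conditional.Subscribe(v => received.Add(v));

// The same sequence object, but the condition is evaluated again.
useFirst = false;
conditional.Subscribe(v => received.Add(v));

Console.WriteLine(string.Join(", ", received)); // first, second
```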

TakeUntil/TakeWhile/SkipUntil/SkipWhile

These operators are available as extension methods on any observable sequence and control which messages are allowed to flow, as follows:

  • The TakeUntil operator flows values until a given absolute time is reached (other overloads wait for another sequence to produce a value)
  • The TakeWhile operator flows values while a condition remains true and completes as soon as it becomes false
  • The SkipUntil operator holds back messages until a given absolute time is reached (or until another sequence produces a value)
  • The SkipWhile operator holds back messages while a condition remains true and flows everything after it first becomes false

Here's an example that uses all four operators:

//a sourcing sequence 
var sourcingSequence = Observable.Interval(TimeSpan.FromSeconds(1)).Select(id => DateTime.UtcNow); 
 
//will flow messages for next 5 seconds 
var takeUntil = sourcingSequence.TakeUntil(DateTimeOffset.Now.AddSeconds(5)); 
takeUntil.Subscribe(value => 
    { 
        Console.WriteLine("Until5Seconds: {0}", value); 
    }); 
 
var begin = DateTime.UtcNow; 
//will flow messages while in the 
//same minute of the begin 
var takeWhile = sourcingSequence.TakeWhile(x => begin.Minute == x.Minute); 
takeWhile.Subscribe(value => 
    { 
        Console.WriteLine("WhileSameMinute: {0}", value); 
    }); 
 
//skip messages for 5 seconds 
var skipUntil = sourcingSequence.SkipUntil(DateTimeOffset.Now.AddSeconds(5)); 
skipUntil.Subscribe(value => 
    { 
        Console.WriteLine("SkipFor5Seconds: {0}", value); 
    }); 
 
//skip messages of the same minute 
var skipWhile = sourcingSequence.SkipWhile(x => begin.Minute == x.Minute); 
skipWhile.Subscribe(value => 
    { 
        Console.WriteLine("SkipSameMinute: {0}", value); 
    }); 
             
Console.ReadLine(); 
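
The four operators can also be tried on sources whose output does not depend on the current time. The sketch below is an illustration under assumptions, not part of the original example: it assumes the System.Reactive package is referenced, uses a finite Range for the predicate-based operators, and uses the overloads of TakeUntil and SkipUntil that wait for another sequence (the hypothetical signal subject) instead of an absolute time.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Assumption: the System.Reactive NuGet package is referenced.
var source = Observable.Range(1, 6);

var taken = new List<int>();
var skipped = new List<int>();
source.TakeWhile(n => n < 4).Subscribe(n => taken.Add(n));   // 1, 2, 3
source.SkipWhile(n => n < 4).Subscribe(n => skipped.Add(n)); // 4, 5, 6

// TakeUntil/SkipUntil driven by another sequence instead of a time.
var values = new Subject<int>();
var signal = new Subject<Unit>(); // hypothetical trigger

var before = new List<int>();
var after = new List<int>();
values.TakeUntil(signal).Subscribe(n => before.Add(n));
values.SkipUntil(signal).Subscribe(n => after.Add(n));

values.OnNext(1);
values.OnNext(2);
signal.OnNext(Unit.Default); // TakeUntil completes, SkipUntil opens
values.OnNext(3);

Console.WriteLine("TakeWhile: " + string.Join(", ", taken));
Console.WriteLine("SkipWhile: " + string.Join(", ", skipped));
Console.WriteLine("TakeUntil: " + string.Join(", ", before)); // 1, 2
Console.WriteLine("SkipUntil: " + string.Join(", ", after));  // 3
```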

TakeLast/SkipLast

These two methods give us the ability to create sequences that deal with the last messages of a sourcing sequence. With TakeLast, we obtain a new sequence that produces only the final messages of the source (selected by a specific count or time window). Because the operator cannot know which messages are the last ones until the source ends, these messages flow only once the sourcing sequence completes. On the other hand, SkipLast flows all the messages except the final ones (again selected by a specific count or time window).

Here's an example that uses both operators:

Console.WriteLine("Starting: {0}", DateTime.UtcNow); 
//a sourcing sequence for a time-window of 5 seconds 
var sourcingSequence = Observable.Interval(TimeSpan.FromSeconds(1)).Select(id => DateTime.UtcNow) 
    .TakeUntil(DateTimeOffset.Now.AddSeconds(5)); 
 
//skip last messages within a time-window of 3 seconds 
var skipLast = sourcingSequence.SkipLast(TimeSpan.FromSeconds(3)); 
skipLast.Subscribe(value => 
    { 
        Console.WriteLine("SkipLast: {0}", value); 
    }); 
 
//take last messages within a time-window of 3 seconds 
var takeLast = sourcingSequence.TakeLast(TimeSpan.FromSeconds(3)); 
takeLast.Subscribe(value => 
{ 
    Console.WriteLine("TakeLast: {0}", value); 
}); 
 
Console.ReadLine(); 
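
Both operators also have count-based overloads, which are easier to reason about on a finite source. The following sketch assumes the System.Reactive package is referenced; the variable names are illustrative only.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Assumption: the System.Reactive NuGet package is referenced.
// A finite source that completes after five values.
var source = Observable.Range(1, 5);

var lastTwo = new List<int>();
var allButLastTwo = new List<int>();

// Only the final two values, released when the source completes.
source.TakeLast(2).Subscribe(n => lastTwo.Add(n));

// Everything except the final two values.
source.SkipLast(2).Subscribe(n => allButLastTwo.Add(n));

Console.WriteLine("TakeLast: " + string.Join(", ", lastTwo));       // 4, 5
Console.WriteLine("SkipLast: " + string.Join(", ", allButLastTwo)); // 1, 2, 3
```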