Manipulating and filtering sequence messages helps in developing complex messaging designs. The most immediate and widely used operator is Where, which creates a new sequence containing only the messages that satisfy its filtering predicate. We have already seen some of the manipulating or filtering factory methods in the previous chapter, such as the Take, Skip, Distinct, and DistinctUntilChanged methods.
The Where factory method creates a new sequence that flows messages from another sequence only when the given predicate succeeds. Here's an example:
    //fixed-time interval sequence
    var fixedTimeBasedSequence = Observable.Interval(TimeSpan.FromSeconds(1));

    //convert the message into a time value
    var dateTimeSequence = fixedTimeBasedSequence
        .Select(v => DateTime.UtcNow);

    //filtered sequence of times with an even second value
    var filteredSequence = dateTimeSequence.Where(dt => dt.Second % 2 == 0);

    //outputs the value
    filteredSequence.Subscribe(dt =>
    {
        Console.WriteLine("{0:d} {0:T}", dt);
    });

    Console.ReadLine();
The Observable.Join factory method (also available as an extension method) creates a single sequence of messages from multiple sequences. To synchronize messages from the sourcing sequences, we must give each message a time window in which it can match messages from the other sequence. This time window makes it evident how time-based reactive programming is. This differs from state-driven paradigms, in which a Join clause would use a specific value to match another value, as happens in SQL.
Before I try to explain further, let's take a look at an example:
    //two sourcing sequences of time-based values
    var sourceSequence1 = Observable.Interval(TimeSpan.FromSeconds(2))
        .Select(nr => DateTime.UtcNow);
    var sourceSequence2 = Observable.Interval(TimeSpan.FromSeconds(3))
        .Select(nr => DateTime.UtcNow);

    //a joined sequence of messages
    var joinedSequence = sourceSequence1.Join(sourceSequence2,
        //time window of each message from the first sequence
        v => Observable.Return(v).Delay(TimeSpan.FromMilliseconds(100)),
        //time window of each message from the second sequence
        v => Observable.Return(v).Delay(TimeSpan.FromMilliseconds(100)),
        //builds the result from a matching pair
        (v1, v2) => new { fromSequence1 = v1, fromSequence2 = v2 });

    joinedSequence.Subscribe(x =>
    {
        Console.WriteLine("{0} / {1}", x.fromSequence1, x.fromSequence2);
    });

    //keep the application alive while the sequences flow
    Console.ReadLine();
The preceding example shows the usage of the Join method. There are two sourcing sequences of values, one emitting a message every 2 seconds and another emitting a message every 3 seconds. To make things easier, the two sequences produce DateTime values.
The Join method needs the two sourcing sequences, two time window generator functions, and a function that creates the new object from each matching pair of sourcing messages.
The most interesting things here are the two time window generator functions. For each sourcing message, these functions create another sequence that emits after a short delay. This delay is the time window that the joined sequence uses to match messages from the two sourcing sequences.
In the preceding example, we take values from two sourcing sequences. To match values based on time (messages must arise from the two sourcing sequences at nearly the same moment), we need something like a time buffer that lets messages from one sequence match messages from the other. To make this happen, we create (with two identical lambda functions) two new sequences, each delayed with respect to the value from its sourcing sequence. Each of these sequences produces a message that acts as a time to live for the sourcing message itself. In other words, it is a matching timeout for the sourcing message. When a message from one sourcing sequence arrives while the timeout of a message from the other source is still open, the two messages match in the Join operator and flow out through the resulting sequence.
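To make the matching timeout easier to observe, the following minimal sketch replaces the timers with subjects that are fired by hand, so each window opens and closes at a known point. It is not the technique used in the example above: it assumes the System.Reactive package and C# top-level statements, and the names left, right, closeLeft, closeRight, and pairs are hypothetical, introduced only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

//two hand-driven sourcing sequences (illustrative stand-ins for the timers)
var left = new Subject<string>();
var right = new Subject<int>();

//hand-driven signals that close the windows currently open on each side
var closeLeft = new Subject<Unit>();
var closeRight = new Subject<Unit>();

var pairs = new List<string>();

//a message stays matchable until its window sequence emits
left.Join(right,
        l => closeLeft.AsObservable(),
        r => closeRight.AsObservable(),
        (l, r) => l + r)
    .Subscribe(p => pairs.Add(p));

left.OnNext("A");                  //opens the window of A
right.OnNext(1);                   //A is still open: A and 1 match
closeLeft.OnNext(Unit.Default);    //the window of A expires
right.OnNext(2);                   //no left message is open: no match
left.OnNext("B");                  //1 and 2 are still open: B matches both

Console.WriteLine(string.Join(", ", pairs));
```

Under these assumptions, the list should end up holding the pairs A1, B1, and B2, while the pair A2 never appears because the window of A had already closed when 2 arrived.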
The If factory method of the Observable helper class returns a sequence that chooses which sourcing sequence to return to each subscriber based on a conditional function. This is similar to the usual if statement of C#, with the difference that the reactive version executes the conditional function, to decide which sourcing sequence to return, each time a new subscriber registers. Here's a complete example:
    //two sourcing sequences of time values
    var sourcingSequence1 = Observable.Interval(TimeSpan.FromSeconds(2))
        .Select(id => DateTime.UtcNow);
    var sourcingSequence2 = Observable.Interval(TimeSpan.FromSeconds(3))
        .Select(id => DateTime.UtcNow);

    //a selection function to choose
    //which sourcing sequence to use
    var mustUseTheFirstSequenceSelector = new Func<bool>(() =>
    {
        var isFirst = DateTime.UtcNow.Second % 2 == 0;
        Console.WriteLine("IsFirst: {0}", isFirst);
        return isFirst;
    });

    //a conditional sequence of values from
    //the first or the second sourcing sequence
    var conditionalSequence = Observable.If(mustUseTheFirstSequenceSelector,
        sourcingSequence1, sourcingSequence2);

    for (int i = 0; i < 3; i++)
    {
        Console.WriteLine("Subscribing new observer...");
        conditionalSequence.Subscribe(x =>
        {
            Console.WriteLine("{0}", x);
        });
        Thread.Sleep(1000);
    }

    Console.ReadLine();
The TakeUntil, TakeWhile, SkipUntil, and SkipWhile factory methods are available as extension methods on any observable sequence and manipulate the availability of messages as follows:
The TakeUntil operator sources values until a timeout occurs
The TakeWhile operator sources values while a condition remains true
The SkipUntil operator avoids flowing messages until a timeout occurs
The SkipWhile operator avoids flowing messages while a condition remains true
Here's a group example:
    //a sourcing sequence
    var sourcingSequence = Observable.Interval(TimeSpan.FromSeconds(1))
        .Select(id => DateTime.UtcNow);

    //will flow messages for the next 5 seconds
    var takeUntil = sourcingSequence.TakeUntil(DateTimeOffset.Now.AddSeconds(5));
    takeUntil.Subscribe(value =>
    {
        Console.WriteLine("Until5Seconds: {0}", value);
    });

    var begin = DateTime.UtcNow;

    //will flow messages while in the
    //same minute as the begin
    var takeWhile = sourcingSequence.TakeWhile(x => begin.Minute == x.Minute);
    takeWhile.Subscribe(value =>
    {
        Console.WriteLine("WhileSameMinute: {0}", value);
    });

    //skip messages for 5 seconds
    var skipUntil = sourcingSequence.SkipUntil(DateTimeOffset.Now.AddSeconds(5));
    skipUntil.Subscribe(value =>
    {
        Console.WriteLine("SkipFor5Seconds: {0}", value);
    });

    //skip messages of the same minute
    var skipWhile = sourcingSequence.SkipWhile(x => begin.Minute == x.Minute);
    skipWhile.Subscribe(value =>
    {
        Console.WriteLine("SkipSameMinute: {0}", value);
    });

    Console.ReadLine();
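Because the minute-based predicates above can take up to a minute to change their result, a finite sequence may make the behavior of TakeWhile and SkipWhile quicker to observe. The following is only a minimal sketch: it assumes the System.Reactive package and C# top-level statements, and the names numbers, taken, and skipped are hypothetical, introduced for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

//a finite sourcing sequence: 1, 2, 3, 4, 5, 6 (illustrative)
var numbers = Observable.Range(1, 6);

//collect the values that each derived sequence lets through
var taken = new List<int>();
var skipped = new List<int>();

//flows messages only while the predicate remains true, then completes
numbers.TakeWhile(n => n < 4).Subscribe(n => taken.Add(n));

//ignores messages while the predicate remains true, then flows the rest
numbers.SkipWhile(n => n < 4).Subscribe(n => skipped.Add(n));

Console.WriteLine("TakeWhile: {0}", string.Join(", ", taken));
Console.WriteLine("SkipWhile: {0}", string.Join(", ", skipped));
```

With this source, TakeWhile should let through 1, 2, 3 and SkipWhile should let through 4, 5, 6: the two operators split the sequence at the first message for which the predicate fails.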
The TakeLast and SkipLast factory methods give us the ability to create sequences that deal with the last messages of a sourcing sequence. The difference between the two is that TakeLast gives a new sequence that produces only the last few messages (selected by a specific count or time window) that the sourcing sequence emitted just before completing. SkipLast, on the other hand, flows all the messages except those last ones (again selected by a specific count or time window).
Here's a group example:
    Console.WriteLine("Starting: {0}", DateTime.Now);

    //a sourcing sequence for a time-window of 5 seconds
    var sourcingSequence = Observable.Interval(TimeSpan.FromSeconds(1))
        .Select(id => DateTime.UtcNow)
        .TakeUntil(DateTimeOffset.Now.AddSeconds(5));

    //skip last messages within a time-window of 3 seconds
    var skipLast = sourcingSequence.SkipLast(TimeSpan.FromSeconds(3));
    skipLast.Subscribe(value =>
    {
        Console.WriteLine("SkipLast: {0}", value);
    });

    //take last messages within a time-window of 3 seconds
    var takeLast = sourcingSequence.TakeLast(TimeSpan.FromSeconds(3));
    takeLast.Subscribe(value =>
    {
        Console.WriteLine("TakeLast: {0}", value);
    });

    Console.ReadLine();
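The example above uses the time-window overloads. The count-based overloads can be sketched in the same way on a finite sequence. This is a minimal illustration only: it assumes the System.Reactive package and C# top-level statements, and the names numbers, lastThree, and allButLastThree are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

//a finite sourcing sequence: 1 to 10 (illustrative)
var numbers = Observable.Range(1, 10);

var lastThree = new List<int>();
var allButLastThree = new List<int>();

//flows only the last 3 messages, once the source has completed
numbers.TakeLast(3).Subscribe(n => lastThree.Add(n));

//flows every message except the last 3
numbers.SkipLast(3).Subscribe(n => allButLastThree.Add(n));

Console.WriteLine("TakeLast: {0}", string.Join(", ", lastThree));
Console.WriteLine("SkipLast: {0}", string.Join(", ", allButLastThree));
```

Here TakeLast should yield 8, 9, 10 and SkipLast should yield 1 through 7. Note that TakeLast cannot emit anything until the source completes, because only then does it know which messages were the last ones.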