After tracing what flows within a sequence, the second most useful diagnostic feature is to verify the sequence content against a predicted content or a static content (usually for testing purposes with mocking objects) simply because we need to check the message homogeneity. Luckily, there are different operators that help us achieve this.
This is maybe the easiest case to deal with when we need to check if/when a sequence contains a given value.
In reactive, we always deal with sequences; this is true also when we want to aspect a Boolean value, as in this case, if we were programming in a nonreactive way. The Contains
extension method produces a new sequence that will fire a single message with a value informing us if we found what we're searching for immediately after the sourcing sequence is complete. This is easily visible by instrumenting Rx materializing our Contains
sequence. Here's an example:
var r = new Random(DateTime.Now.GetHashCode()); //an infinite message source of integer numbers //running at 10hz var source = Observable.Interval(TimeSpan.FromMilliseconds(100)) .Select(x => r.Next(1, 20)); var contains = source.Contains(10) //we want message metadata .Materialize(); //some console output source.Subscribe(x => Console.WriteLine(x)); contains.Subscribe(x => Console.WriteLine("FOUND: {0}", x)); Console.ReadLine();
Almost identical to the Contains
extensions method, Any
produces a sequence that flows messages about its search results. The difference is the overload; it needs a Func<T,bool>
. Instead, the Contains
method needs a raw value.
Here is a short example:
var any = source.Any(x => x == 10) //we want message metadata .Materialize(); any.Subscribe(x => Console.WriteLine("FOUND ANY: {0}", x));
Similar to the Any
method, the All
extension method produces a sequence flowing the result of our search. The difference is that the All
method fires only when all the sourcing messages succeed in passing the predicate expression, waiting for the completion of the sourcing sequence before sourcing its result message. Instead, if any message fails the validating expression, the All
sequence immediately flows out the failure result.
Here's an example:
var r = new Random(DateTime.Now.GetHashCode()); var stopperSequence = new Subject<bool>(); //an infinite message source of integer numbers //running at 10hz var source = Observable.Interval(TimeSpan.FromMilliseconds(100)) .Select(x => r.Next(1, 20)) //take only until we press RETURN .TakeUntil(stopperSequence); source.Subscribe(x => Console.WriteLine(x)); var all = source.All(x => x < 18) //we want message metadata .Materialize(); all.Subscribe(x => Console.WriteLine("FOUND ALL: {0}", x)); //wait until user press RETURN Console.ReadLine(); //notify the stop message stopperSequence.OnNext(true); //wait again to see the result Console.ReadLine();
The preceding example is similar to the Any
method and the Contains
method. But this one needs a more detailed explanation.
First of all, we're using another sequence (stopperSequence
) to signal the other sequence (source
) when to stop flowing messages.
This is accomplished by using the TakeUntil
method on the source
sequence that returns a new sequence to flow messages until a message flows from the parameter sequence (stopperSequence
).
Take a look at the result; it takes only a few seconds. When we press the Enter key, the All
sequence intercepts the OnCompleted
message and executes its logic returning result as to whether all the messages are complying with the lambda expression.
In other words, to ensure the True
result, the All
sequence waits for the sourcing sequence's OnCompleted
message, while any failing message immediately produces a False
result.
The SequenceEqual
extension method produces a sequence almost identical to the one from the All
method. The difference is that the All
method checks whether all the messages of a single sourcing sequence comply with a specified predicate, while SequenceEqual
produces a sequence that returns if two sourcing sequences are identical in their content.
Similar to the All
sequence, SequenceEqual
immediately outputs a failing result while waiting for the completion of all sourcing sequences leads to a succeeding result.
Here's an example:
//two random generators //without the random initial seed var r1 = new Random(); var r2 = new Random(); //two infinite message source of integer numbers running at 1hz var source1 = Observable.Interval(TimeSpan.FromMilliseconds(1000)) .Select(x => r1.Next(1, 20)); var source2 = Observable.Interval(TimeSpan.FromMilliseconds(1000)) .Select(x => r2.Next(1, 20)); var identical = source1.SequenceEqual(source2) .Materialize(); source1.Subscribe(x => Console.WriteLine("1: {0}", x)); source2.Subscribe(x => Console.WriteLine("2: {0}", x)); identical.Subscribe(x => Console.WriteLine("Equals: {0}", x)); Console.ReadLine();
18.221.123.73