The following constructs can be converted to a sequence source. IObservable<T>
can be generated from the following:
IEnumerable<T>
We have now seen how to convert an IEnumerable<T>-based pull program to an IObservable<T>/IObserver<T>-based push program. In real life, the event source is rarely as simple as the number stream in the previous example. Let us see how we can convert a MouseMove event into a stream with a small WinForms program:
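using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Windows.Forms;

static void Main()
{
    var mylabel = new Label();
    var myform = new Form { Controls = { mylabel } };

    // Wrap the form's MouseMove event in an observable stream.
    IObservable<EventPattern<MouseEventArgs>> mousemove =
        Observable.FromEventPattern<MouseEventArgs>(myform, "MouseMove");

    // Show the current X coordinate; the empty lambda handles completion.
    mousemove.Subscribe(
        evt => { mylabel.Text = evt.EventArgs.X.ToString(); },
        () => { });

    Application.Run(myform);
}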
Please see the following form, which displays the mouse positions:
The whole point of converting data to streams is to apply functional programming operators such as Reduce, Aggregate, Fold, and so on. These operators let us pick out the data we need, and do so efficiently, from an enormous pile that keeps growing over time.
Some of the common sequence/stream operators supported by Rx/LINQ are as follows:
Where: As the name implies, and as those familiar with this operator from LINQ will know, it filters a sequence. If we were to rewrite our earlier example of extracting the even numbers, it would declaratively be as simple as this:
var evenNumbers = Observable.Range(0, 10)
    .Where(i => i % 2 == 0)
    .Subscribe(Console.WriteLine);
Skip: Skips the first n items in a sequence.
Take: Takes the first n items in a sequence and ignores the rest.
SkipWhile: Skips items while a certain condition is satisfied. Note that elements are skipped only until the predicate first evaluates to false; from that point on, all items are returned.
TakeWhile: The converse of SkipWhile; it takes items while a certain condition is satisfied.
SkipUntil: Requires two observable sequences, and continues to skip all the values in the first sequence until the second sequence produces a value.
TakeUntil: Again requires two observable sequences, and forces the first sequence to completion as soon as the second sequence produces a value.
SkipLast: Queues elements so that it can skip the last n elements and return the rest.
TakeLast: Returns only the last n elements.
Zip: Merges two observable sequences into one observable sequence by pairing their elements in order.

Rx provides a set of operators that help us inspect the contents of a stream. Some of them are:
Any: Returns an observable sequence (the result) that produces a single value (True or False) and completes. The result is True as soon as the source sequence produces a value (or, when a predicate is supplied, a value that satisfies it), and False if the source sequence completes without producing one.
All: Works like Any, except that the result sequence returns True only if every value in the source satisfies the predicate, and False as soon as one value fails it.
Contains: Behaves like Any, except that it looks for a specific value rather than a value that fits a predicate.
ElementAt: Returns an observable sequence (the result) that produces the value at the specified index in the source sequence and completes. It uses a 0-based index.

Rx provides a series of operators that help us aggregate the contents of a stream. Some of the most important ones are as follows:
Count
Min, Max, Average, Sum (descriptive statistics)
MinBy, MaxBy, GroupBy (partitioning)

The values produced by our event source are often not in the format that we want, so we need to transform each element in the sequence. The most important functional transformation is bind, where we apply a function (morphism) to each element of the sequence to produce a new sequence.
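Before moving on to transformations, the filtering, inspection, and aggregation operators listed above can be tried out on a small finite stream. The following is a minimal sketch, assuming the System.Reactive package is referenced; the class name and the Collect helper are made up for illustration, and blocking with Wait() is used only to keep a console demo simple.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class StreamOperatorsSketch
{
    // Hypothetical helper: drains a finite observable into a list,
    // blocking until the sequence completes.
    public static IList<T> Collect<T>(IObservable<T> source) =>
        source.ToList().Wait();

    public static void Main()
    {
        IObservable<int> numbers = Observable.Range(0, 10); // 0..9

        // Filtering
        Console.WriteLine(string.Join(",", Collect(numbers.Skip(3).Take(4))));       // 3,4,5,6
        Console.WriteLine(string.Join(",", Collect(numbers.SkipWhile(i => i < 7)))); // 7,8,9
        Console.WriteLine(string.Join(",", Collect(numbers.TakeWhile(i => i < 3)))); // 0,1,2
        Console.WriteLine(string.Join(",", Collect(numbers.TakeLast(2))));           // 8,9

        // Inspection: each result is a one-element sequence
        Console.WriteLine(numbers.Any(i => i > 8).Wait()); // True
        Console.WriteLine(numbers.All(i => i < 5).Wait()); // False
        Console.WriteLine(numbers.Contains(4).Wait());     // True
        Console.WriteLine(numbers.ElementAt(2).Wait());    // 2

        // Aggregation: also one-element sequences
        Console.WriteLine(numbers.Count().Wait()); // 10
        Console.WriteLine(numbers.Sum().Wait());   // 45
        Console.WriteLine(numbers.Max().Wait());   // 9
    }
}
```

Note that the inspection and aggregation operators return observables rather than plain values, so the result only becomes available once the source has produced enough data or completed.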
In functional programming parlance, the transformations available are the following:
T to IObservable<T>: Unfold, Generate
IObservable<T> to T: Fold, Reduce, Accumulate
IObservable<T> to IObservable<T>: Map, SelectMany, Projection, Transform
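The three kinds of transformation above can be sketched with Rx.NET operators. The mapping used here is one reasonable reading rather than a fixed correspondence: Observable.Generate plays the role of unfold, Aggregate the role of fold, Select the role of map, and SelectMany the role of bind. The class and method names are invented for the example, and the System.Reactive package is assumed.

```csharp
using System;
using System.Reactive.Linq;

public static class TransformSketch
{
    // Unfold (T to IObservable<T>): grow a sequence from a seed value.
    public static IObservable<int> PowersOfTwo() =>
        Observable.Generate(1, x => x <= 16, x => x * 2, x => x);

    // Map (IObservable<T> to IObservable<U>): project each element.
    public static IObservable<string> Labels(IObservable<int> source) =>
        source.Select(x => "#" + x);

    // Bind (SelectMany): each element becomes its own small sequence,
    // and all inner sequences are flattened into one stream.
    public static IObservable<int> Twice(IObservable<int> source) =>
        source.SelectMany(x => Observable.Repeat(x, 2));

    // Fold (IObservable<T> to T): collapse the stream to a single value.
    // Wait() blocks until the source completes.
    public static int Total(IObservable<int> source) =>
        source.Aggregate(0, (acc, x) => acc + x).Wait();

    public static void Main()
    {
        Console.WriteLine(string.Join(",", PowersOfTwo().ToList().Wait()));         // 1,2,4,8,16
        Console.WriteLine(string.Join(",", Labels(PowersOfTwo()).ToList().Wait())); // #1,#2,#4,#8,#16
        Console.WriteLine(Twice(PowersOfTwo()).Count().Wait());                     // 10
        Console.WriteLine(Total(PowersOfTwo()));                                    // 31
    }
}
```

The sketch prints only the element count for Twice, because the order in which flattened inner sequences interleave depends on scheduling and should not be relied on.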
We get data from different data sources, and it is necessary to combine streams to do processing. Rx provides a series of operators, which can be grouped into the following:
Concat: As the name indicates, the resulting sequence concatenates multiple input sequences without interleaving them.
Repeat: Creates a sequence that emits a particular item multiple times.
StartWith: Emits a specified sequence of items before beginning to emit the items from the source sequence.
Amb: Returns whichever of two or more source sequences is the first to emit an item or notification.
Merge: As the name indicates, this operator combines multiple sequences into one.
Switch: Takes a sequence of sequences and emits the items from the most recently emitted one of those inner sequences.
CombineLatest: Combines the most recently emitted items from each of the participating input sequences using a function that is provided, and emits the return value of that function.
Zip: Behaves like CombineLatest, except that the function is applied in strict sequence: the nth item of one input is combined only with the nth item of the other.
And/Then/When: Behaves much like Zip, but intermediary data structures (namely pattern and plan objects) are used to combine the input sequences before the resulting sequence is emitted.
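The difference between Merge, CombineLatest, and Zip is easiest to see when the same interleaved values are pushed through each of them. The sketch below is illustrative only: it assumes the System.Reactive package, uses Subject<T> purely to control the order of emissions by hand, and the CombineSketch class and its Run helper are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class CombineSketch
{
    // Pushes 1, "a", 2, "b" (in that order) through two subjects and
    // records what the combined stream emits.
    public static List<string> Run(
        Func<IObservable<int>, IObservable<string>, IObservable<string>> combine)
    {
        var numbers = new Subject<int>();
        var letters = new Subject<string>();
        var seen = new List<string>();

        using (combine(numbers, letters).Subscribe(seen.Add))
        {
            numbers.OnNext(1);
            letters.OnNext("a");
            numbers.OnNext(2);
            letters.OnNext("b");
        }
        return seen;
    }

    public static void Main()
    {
        // Concat: the second sequence starts only after the first completes.
        var concat = Observable.Range(1, 2).Concat(Observable.Range(10, 2)).ToList().Wait();
        Console.WriteLine(string.Join(",", concat)); // 1,2,10,11

        // StartWith: emit a given item before the source's own items.
        var started = Observable.Range(1, 3).StartWith(0).ToList().Wait();
        Console.WriteLine(string.Join(",", started)); // 0,1,2,3

        // Merge: items pass through in arrival order.
        Console.WriteLine(string.Join(",",
            Run((n, l) => n.Select(x => x.ToString()).Merge(l)))); // 1,a,2,b

        // CombineLatest: fires on every new item once both sides have a value.
        Console.WriteLine(string.Join(",",
            Run((n, l) => n.CombineLatest(l, (x, y) => x + y)))); // 1a,2a,2b

        // Zip: pairs the nth item with the nth item only.
        Console.WriteLine(string.Join(",",
            Run((n, l) => n.Zip(l, (x, y) => x + y)))); // 1a,2b
    }
}
```

With identical input, CombineLatest produces three results while Zip produces two, which is the "strict sequence" behavior described in the list above.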