Converting entities to streams (IObservable<T>)

The following constructs can be converted to a sequence source. IObservable<T> can be generated from the following:

  • Events
  • Delegates
  • Tasks
  • IEnumerable<T>
  • Asynchronous programming model

Converting events into stream

We have now understood how one can convert an IEnumerable<T>-based pull program to an IObservable<T>/IObserver<T>-based push program. In real life, the event source is not as simple as we found in the number stream example given previously. Let us see how we can convert a MouseMove event into a stream with a small WinForms program:

    static void Main()  
    { 
      var mylabel = new Label(); 
      var myform = new Form { Controls = { mylabel } }; 
 
      IObservable<EventPattern<MouseEventArgs>>  
      mousemove =  
      Observable. 
      FromEventPattern<MouseEventArgs>(myform, "MouseMove"); 
 
      mousemove.Subscribe( 
        (evt)=>{mylabel.Text = evt.EventArgs.X.ToString();}, 
        ()=>{}); 
 
      Application.Run(myform); 
    } 

Please see the following form, which displays the mouse positions:

Converting events into stream

Reduction of streams (sequences)

The whole idea of converting data to streams is to apply functional programming operators such as Reduce, Aggregate, Fold, and so on. This is quite relevant in terms of choosing the needed data (and also in an efficient way) from an enormous pile that is ever growing with respect to time:

  • Filter and partition operators: These operations help to reduce the source sequence into a sequence of elements that we are interested in
  • Aggregation operators: Reduce the source sequence to a sequence with a single element
  • Fold operators: Reduce the source sequence to a single element as a scalar value

Some of the common sequence/stream operators supported by Rx/LINQ are as follows:

  • Where: As the name implies, and for those familiar with this operator from LINQ days, it does the very purpose of filtering of sequences. If we were to rewrite our earlier example-that of extracting/filtering even numbers-it would declaratively be as simple as this:
            var evenNumbers = Observable.Range(0, 10) 
            .Where(i => i % 2 == 0) 
            .Subscribe(Console.WriteLine); 
    
  • In the preceding example, the input is 0, 1, 2, 3, 4, 5, 6, 7, 8, and 9, and the output will be 2, 4, 6, and 8.
  • Skip: This helps in skipping n items in a sequence.
  • Take: This helps in taking n items (skipping the rest) in a sequence.
  • SkipWhile: This helps in skipping items (while a certain condition is satisfied) in a sequence. Please note that the element would be skipped until the predicate evaluates to true. Beyond this, all items would be returned.
  • TakeWhile: This is the converse of SkipWhile, and helps in taking items (while a certain condition is satisfied) in a sequence.
  • SkipUntil: This requires two observable sequences, and continues to skip all the values in the first sequence until any value is produced by the second sequence.
  • TakeUntil: Again, this requires two observable sequences, and forces the first sequence to completion when the second sequence starts producing any value.
  • SkipLast: Intelligently queues elements, skips the last n elements, and returns the rest.
  • TakeLast: Returns the last n elements.
  • Zip: Merges two observable sequences into one observable sequence.

Inspection of streams (sequences)

Rx provides a set of operators, which can help us to inspect the contents of a stream. Some of them are:

  • Any: This returns an observable sequence (result), which returns one value (True or False) and completes. True indicates that the source sequence produced a value that caused the result sequence to produce True. On the other hand, the result sequence returns False if the source sequence completes without any values.
  • All: This works similar to Any except that the results sequence returns True if the predicate is evaluated to True and False vice versa.
  • Contains: This shows the same behavior as All except that it helps seek a specific value instead of a value that fits the predicate.
  • ElementAt: This returns an observable sequence (result), which returns the value in the source sequence (specified by the index) and completes. It uses a 0-based index.

Aggregation of streams (sequences)

Rx provides a series of operators, which help us to aggregate the content of a stream. Some of the most important ones are as follows:

  • Count
  • Min, Max, Average, Sum (descriptive statistics)
  • MinBy, MaxBy, GroupBy (partitioning)
  • Custom aggregators and scans

Transformation of streams (sequences)

The values produced by our event source are not in the formats that we might want, and we are required to make the transformation on each element in the sequence. The most important functional transformation is bind, where we apply a function (morphism) on each element of the sequence to produce a new sequence.

In functional programming parlance, the transformations available are the following:

  • Anamorphism, which transforms T to IObservable<T>:
    • Unfold
    • Generate
  • Catamorphism, which transforms an IObservable<T> to T:
    • Fold
    • Reduce
    • Accumulate
  • Bind, which transforms an IObservable<T> to IObservable<T>:
    • Map
    • SelectMany
    • Projection
    • Transform

Combining streams (sequences)

We get data from different data sources, and it is necessary to combine streams to do processing. Rx provides a series of operators, which can be grouped into the following:

  • Sequential concatenation:
    • Concat: As the name indicates, the resulting sequence concatenates multiple input sequences without interleaving them
    • Repeat: Creates a sequence that emits a particular item multiple times
    • StartWith: Emits a specified sequence of items before beginning to emit the items from the source sequence
  • Concurrent sequences:
    • Amb: This returns one of the sequences (from two or more source sequences), which first starts emitting an item or notification
    • Merge: As the name indicates, this operator combines multiple sequences into one
    • Switch: This returns a sequence (from two or more source sequences) that emits the items emitted by the most recently emitted one of those input/source sequences
  • Pairing sequences:
    • CombineLatest: Combines the most recently emitted items from each of the participating input sequences using a function that is provided, and emits the return value of that function
    • Zip: The behavior is similar to that of CombineLatest except that the function is applied in a strict sequence (in terms of combining items from the participating input sequences)
    • And/Then/When: The behavior is very similar to Zip, but certain intermediary data structures (namely pattern and plan objects) are used as part of combining the input sequences before emitting the resulting sequence
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.223.172.252