Combining operators

Combining operators combine multiple sequences into a new sequence, eventually with a specific design to reduce message flow.

Combine latest

The CombineLatest operator produces a new sequence that combines multiple sourcing sequences by joining such messages to produce a new composite message. Kindly consider that anytime each of the source enumerable flows a new message, regardless of being the first or the second sequence, a new composite message will flow throughout the combined latest sequence.

The new sequence will start flowing messages when all the sourcing sequences produce their first message.

Combine latest

A marble diagram showing a combine latest operation

Here's an example:

var s6 = new Subject<string>(); 
var s7 = new Subject<int>(); 
var clatest = s6.CombineLatest(s7, (x, y) => new { text = x, value = y, }); 
clatest.Subscribe(x => Console.WriteLine("{0}: {1}", x.text, x.value)); 
 
//some message 
s6.OnNext("Mr. Brown"); 
s7.OnNext(10); 
s7.OnNext(20); 
s6.OnNext("Mr. Green"); 
s6.OnNext("Mr. White"); 
s7.OnNext(30); 

Concat

The Concat operator creates a new sequence that contains the concatenation of multiple sourcing sequences, as they are registered as sourcing sequence.

In other words, as shown in the following screenshot, the Concat sequence will start flowing messages from the first sourcing sequence and will then start flowing messages from the second sourcing sequence and so on. An important aspect is that this operator will start flowing messages from a new sequence only after the previous sequence completes (by flowing the OnComplete message) correctly. In other words, this operator sequentially flows messages from multiple sequences. For the parallel version, take a look at the Merge operator in the following section.

Concat

A marble diagram showing a Concat operation

Here's an example:

var s8 = new Subject<string>(); 
var s9 = new Subject<string>(); 
var concat = s8.Concat(s9); 
concat.Subscribe(Console.WriteLine); 
 
//some message 
s8.OnNext("value1"); 
s8.OnNext("value2"); 
s9.OnNext("value3"); //missed 
s9.OnNext("value4"); //missed 
s8.OnNext("value5"); 
//close first sequence 
s8.OnCompleted(); 
//only now messages from second sequence will start flowing 
s9.OnNext("value6"); 

Merge

The Merge operation flattens messages of the same type between multiple sequences into a single output sequence. In other words, it combines values from a multiple sequence in a parallel way.

Merge

A marble diagram showing a merge operation

Here's an example:

var s1 = new Subject<string>(); 
var s2 = new Subject<string>(); 
var merge = s1.Merge(s2); 
merge.Subscribe(Console.WriteLine); 
 
s1.OnNext("value1"); //first subject 
s2.OnNext("value2"); //second subject 

Sample

A Sample sequence lets the values flow from another sequence only when a value flows from another sequence that works like a metronome when playing music. The second sequence simply defines the time when a message can flow. The first sequence, however is the source of all the messages that will flow within the sampling. This is something like a polling-based design.

Consider the case when we read data from a PLC analogic port. The PLC usually works in a reactive way by itself (PLC's SDK may support multiple data paradigm outputs); this means that it flows out new analogic port values only when these change the value changes. Rather, if we need sampling data at a fixed time, as happens in digital audio, we may use the Sample operator by reading raw data from the PLC sourcing sequence, and a clock message from another sequence. With this design, we will be able to sample at a fixed time from a source that flows values as they're available with few efforts in a full reactive design.

In Rx, the Sample extension method has two overloads. One overload accepts a  DateTime parameter useful to sample at a fixed time. Another overload accepts another sequence to sample when sampling messages flow. Although an overload accepting a fixed time parameter is available with the Scan operator, the canonical one is the overload with two sequence parameter overloads.

Sample

A marble diagram showing a sample operation

Here's an example:

var samplingValueSequence = new Subject<int>(); 
var samplingTimeSequence = new Subject<object>(); 
var samplingSequence = samplingValueSequence.Sample(samplingTimeSequence); 
//register an observer 
samplingSequence.Subscribe(new Action<int>(x => Console.WriteLine(x))); 
 
//some value 
samplingValueSequence.OnNext(10); //ignored 
samplingValueSequence.OnNext(20); 
//raise a message into the sampling time sequence 
samplingTimeSequence.OnNext(null); //last value will be outputted now 
samplingValueSequence.OnNext(30); //ignored 
samplingValueSequence.OnNext(40); 
//raise a message into the sampling time sequence 
samplingTimeSequence.OnNext(null); //last value will be outputted now 

StartWith

The StartWith operator is similar to the Concat operator because they concatenate values of sequences with the difference that StartWith inserts a specific value at the beginning of its sourcing sequence. The other difference is that StartWith does not work on multiple sequences. It simply uses a defined group of values, such as an array or any IEnumerable.

Startwith is very useful when combined with the Scan operator because it gives the Scan operator a value to start the running total (or any other running operation).

StartWith

A marble diagram showing a startwith operation

Here's an example:

var s10 = new Subject<string>(); 
var swith = s10.StartWith("value0"); 
swith.Subscribe(Console.WriteLine); 
s10.OnNext("value1"); 
s10.OnNext("value2"); 

Zip

The Zip operator is similar to the combine latest operator, producing values from two other sequences only when each source sequence has a new value. The difference is that the combine latest operator produces messages wherever any source sequence produces a new message, eventually reusing the same value from the other sequence. Instead, the Zip operator synchronizes the two sequences using the message index number as a correlation ID to flow messages always together, eventually waiting for the two sequences to have the new couple of messages to produce the new one.

As the combine latest operator, the Zip operator translates the two source messages into a new message by executing a transformation operation.

Zip

A marble diagram showing a zip operation

Here's an example:

var s11 = new Subject<string>(); 
var s12 = new Subject<double>(); 
var zip = s11.Zip(s12, (x, y) => new { text = x, value = y }); 
zip.Subscribe(x => Console.WriteLine("{0}: {1}", x.text, x.value)); 
//same example of combine latest 
s11.OnNext("Mr. Brown"); 
s12.OnNext(10); 
s12.OnNext(20); 
s11.OnNext("Mr. Green"); 
s11.OnNext("Mr. White"); 
s12.OnNext(30); 
//this time the output is synchronized 
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.221.254.61