Combining operators combine multiple sequences into a new sequence, eventually with a specific design to reduce message flow.
The CombineLatest
operator produces a new sequence that combines multiple sourcing sequences by joining such messages to produce a new composite message. Kindly consider that anytime each of the source enumerable flows a new message, regardless of being the first or the second sequence, a new composite message will flow throughout the combined latest sequence.
The new sequence will start flowing messages when all the sourcing sequences produce their first message.
Here's an example:
var s6 = new Subject<string>(); var s7 = new Subject<int>(); var clatest = s6.CombineLatest(s7, (x, y) => new { text = x, value = y, }); clatest.Subscribe(x => Console.WriteLine("{0}: {1}", x.text, x.value)); //some message s6.OnNext("Mr. Brown"); s7.OnNext(10); s7.OnNext(20); s6.OnNext("Mr. Green"); s6.OnNext("Mr. White"); s7.OnNext(30);
The Concat
operator creates a new sequence that contains the concatenation of multiple sourcing sequences, as they are registered as sourcing sequence.
In other words, as shown in the following screenshot, the Concat
sequence will start flowing messages from the first sourcing sequence and will then start flowing messages from the second sourcing sequence and so on. An important aspect is that this operator will start flowing messages from a new sequence only after the previous sequence completes (by flowing the OnComplete
message) correctly. In other words, this operator sequentially flows messages from multiple sequences. For the parallel version, take a look at the Merge
operator in the following section.
Here's an example:
var s8 = new Subject<string>(); var s9 = new Subject<string>(); var concat = s8.Concat(s9); concat.Subscribe(Console.WriteLine); //some message s8.OnNext("value1"); s8.OnNext("value2"); s9.OnNext("value3"); //missed s9.OnNext("value4"); //missed s8.OnNext("value5"); //close first sequence s8.OnCompleted(); //only now messages from second sequence will start flowing s9.OnNext("value6");
The Merge
operation flattens messages of the same type between multiple sequences into a single output sequence. In other words, it combines values from a multiple sequence in a parallel way.
Here's an example:
var s1 = new Subject<string>(); var s2 = new Subject<string>(); var merge = s1.Merge(s2); merge.Subscribe(Console.WriteLine); s1.OnNext("value1"); //first subject s2.OnNext("value2"); //second subject
A Sample
sequence lets the values flow from another sequence only when a value flows from another sequence that works like a metronome when playing music. The second sequence simply defines the time when a message can flow. The first sequence, however is the source of all the messages that will flow within the sampling. This is something like a polling-based design.
Consider the case when we read data from a PLC analogic port. The PLC usually works in a reactive way by itself (PLC's SDK may support multiple data paradigm outputs); this means that it flows out new analogic port values only when these change the value changes. Rather, if we need sampling data at a fixed time, as happens in digital audio, we may use the Sample operator by reading raw data from the PLC sourcing sequence, and a clock message from another sequence. With this design, we will be able to sample at a fixed time from a source that flows values as they're available with few efforts in a full reactive design.
In Rx, the Sample
extension method has two overloads. One overload accepts a DateTime
parameter useful to sample at a fixed time. Another overload accepts another sequence to sample when sampling messages flow. Although an overload accepting a fixed time parameter is available with the Scan
operator, the canonical one is the overload with two sequence parameter overloads.
Here's an example:
var samplingValueSequence = new Subject<int>(); var samplingTimeSequence = new Subject<object>(); var samplingSequence = samplingValueSequence.Sample(samplingTimeSequence); //register an observer samplingSequence.Subscribe(new Action<int>(x => Console.WriteLine(x))); //some value samplingValueSequence.OnNext(10); //ignored samplingValueSequence.OnNext(20); //raise a message into the sampling time sequence samplingTimeSequence.OnNext(null); //last value will be outputted now samplingValueSequence.OnNext(30); //ignored samplingValueSequence.OnNext(40); //raise a message into the sampling time sequence samplingTimeSequence.OnNext(null); //last value will be outputted now
The StartWith
operator is similar to the Concat
operator because they concatenate values of sequences with the difference that StartWith
inserts a specific value at the beginning of its sourcing sequence. The other difference is that StartWith
does not work on multiple sequences. It simply uses a defined group of values, such as an array or any IEnumerable
.
Startwith
is very useful when combined with the Scan
operator because it gives the Scan
operator a value to start the running total (or any other running operation).
Here's an example:
var s10 = new Subject<string>(); var swith = s10.StartWith("value0"); swith.Subscribe(Console.WriteLine); s10.OnNext("value1"); s10.OnNext("value2");
The Zip
operator is similar to the combine latest operator, producing values from two other sequences only when each source sequence has a new value. The difference is that the combine latest operator produces messages wherever any source sequence produces a new message, eventually reusing the same value from the other sequence. Instead, the Zip
operator synchronizes the two sequences using the message index number as a correlation ID to flow messages always together, eventually waiting for the two sequences to have the new couple of messages to produce the new one.
As the combine latest operator, the Zip
operator translates the two source messages into a new message by executing a transformation operation.
Here's an example:
var s11 = new Subject<string>(); var s12 = new Subject<double>(); var zip = s11.Zip(s12, (x, y) => new { text = x, value = y }); zip.Subscribe(x => Console.WriteLine("{0}: {1}", x.text, x.value)); //same example of combine latest s11.OnNext("Mr. Brown"); s12.OnNext(10); s12.OnNext(20); s11.OnNext("Mr. Green"); s11.OnNext("Mr. White"); s12.OnNext(30); //this time the output is synchronized
18.221.254.61