Sometimes, we need functions to create message repetitions or to manipulate or reuse a subscription with multiple observers. This is the time when advanced operators come into play.
The IgnoreElements factory method creates a new sequence that ignores every value message. Errors and completion messages, instead, flow out of the sourcing sequence as usual.
This method is particularly useful to create multiple acknowledgements or simply to append some completion code to a sourcing sequence. Here's an example:
//the sourcing sequence of errors or completed messages
var sourcingSequence = Observable.Throw<object>(new Exception("Test"));
//a sequence able to handle only errors or completed messages
var ignoredElements = sourcingSequence.IgnoreElements();
ignoredElements.Subscribe(new ConsoleObserver());
The ConsoleObserver class is the same as in the previous examples. See the Interval or Create sections in this chapter.
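As a complement, here is a minimal sketch of IgnoreElements applied to a sequence that completes normally instead of failing. It assumes the System.Reactive package is referenced; the IgnoreElementsSketch class and its Run method are hypothetical names used only for illustration, and inline lambdas replace ConsoleObserver so that the snippet stands alone.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical helper class, used only for this illustration.
public static class IgnoreElementsSketch
{
    public static (int ValueCount, bool Completed) Run()
    {
        var valueCount = 0;
        var completed = false;

        Observable.Range(1, 3)
            .IgnoreElements()
            .Subscribe(
                _ => valueCount++,        // not invoked: value messages are dropped
                () => completed = true);  // the completion message still flows out

        return (valueCount, completed);
    }
}
```

Calling IgnoreElementsSketch.Run() should report no values and a completed sequence, which is what makes the operator handy for attaching completion code to a source.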
The Repeat factory method creates a sequence that flows out the messages of the sourcing sequence multiple (or infinite) times. Here's an example:
//a sourcing sequence of 5 elements
var sourcingSequence = Observable.Range(1, 5);
//the repeating sequence
var repeatFor2Times = sourcingSequence.Repeat(2);
repeatFor2Times.Subscribe(value => Console.WriteLine("Value: {0}", value));
Although the Repeat factory method is pretty simple to use, it is perfect for showing a possibly unintended behavior. Any time we subscribe to a sequence, most of the premade factory methods restart their implementation. When dealing with simple integers, such behavior is difficult to notice, but when dealing with DateTime values, it is immediately visible. Here's an example:
//a sourcing sequence of 5 elements
var sourcingSequence = Observable.Range(1, 5)
    //slow down
    .Select(i => { Thread.Sleep(1000); return i; })
    //take the actual time
    .Select(i => DateTime.Now);
//the repeating sequence
var repeatFor2Times = sourcingSequence.Repeat(2);
repeatFor2Times.Subscribe(value => Console.WriteLine("Value: {0}", value));
In this example, we cannot use the Interval factory method, because it never sends the completion message that the Repeat method requires.
By using a DateTime value, you will immediately see that the Repeat sequence subscribes twice to the sourcing sequence that came from the Range factory method. This means that the integer values are evaluated twice in the Select transformation, providing new DateTime values instead of repeating the original ones. If this behavior is unintended, we can use the Publish factory method just before.
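The re-subscription described above can also be observed without timestamps. The following is a minimal sketch, assuming the System.Reactive package; RepeatSketch and Run are hypothetical names. It wraps the source in Observable.Defer, whose factory runs once per subscription, and counts how many times Repeat subscribes.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper class, used only for this illustration.
public static class RepeatSketch
{
    public static (int Subscriptions, List<int> Values) Run()
    {
        var subscriptions = 0;

        // Defer invokes its factory on every subscription, so the counter
        // records how many times Repeat subscribes to the sourcing sequence.
        var source = Observable.Defer(() =>
        {
            subscriptions++;
            return Observable.Range(1, 3);
        });

        var values = new List<int>();
        source.Repeat(2).Subscribe(values.Add);

        return (subscriptions, values);
    }
}
```

With the default scheduler of Range, the subscription is expected to run to completion on the calling thread, so Run should return two subscriptions and the values 1, 2, 3 twice.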
The Publish method produces a single subscription sequence available to any following subscriber. As already said, any new subscription may cause the sourcing sequence to produce different values. Here's an example based on a random value:
var r = new Random(DateTime.Now.GetHashCode());
//a random value sequence
var sourcingSequence = Observable.Range(1, 5)
    //slow down
    .Select(i => { Thread.Sleep(500); return i; })
    //take a random value
    .Select(i => r.Next());
//multiple subscriptions causing different
//values being printed onto the console
sourcingSequence.Subscribe(value => Console.WriteLine("Observer#1: {0}", value));
sourcingSequence.Subscribe(value => Console.WriteLine("Observer#2: {0}", value));
sourcingSequence.Subscribe(value => Console.WriteLine("Observer#3: {0}", value));
The preceding example will print to the console as follows:
Observer#1: 1387437772
Observer#1: 1673597686
Observer#1: 407780858
Observer#1: 630401573
Observer#1: 336086919
Observer#2: 1071679403
Observer#2: 302043112
Observer#2: 1359704606
Observer#2: 413086291
Observer#2: 1357199039
Observer#3: 1918565397
Observer#3: 1660389991
Observer#3: 1852413164
Observer#3: 1520275706
Observer#3: 1750871851
As we can see, each subscription starts a new ranged source sequence. The Publish method, instead, creates a single subscription sequence that is available to multiple subscribers, routing the messages to all the observers. To start publishing messages, we need to turn on (and eventually off) the flow by invoking the Connect method of the publishing sequence. Once we complete our task, we dispose of the connection. Here's another example:
//the sourcing sequence
var publishedSequence = Observable.Interval(TimeSpan.FromSeconds(0.5))
    .Select(i => DateTime.Now)
    .Publish();
//attach subscribers before connecting the publisher
publishedSequence.Subscribe(value => Console.WriteLine("Observer#1: {0}", value));
publishedSequence.Subscribe(value => Console.WriteLine("Observer#2: {0}", value));
publishedSequence.Subscribe(value => Console.WriteLine("Observer#3: {0}", value));
while (true)
{
    Console.WriteLine("Press RETURN to connect the published sequence");
    Console.ReadLine();
    using (var connected = publishedSequence.Connect())
    {
        Console.WriteLine("Press RETURN to quit the connection");
        Console.ReadLine();
    }
    //now we disconnected from the published sequence
}
Now the same value flows to all the subscribers. Each subscriber receives every value as it is produced, instead of waiting for the previous subscriber to read the whole sequence, as happened in the previous example.
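To tie this back to the random value example, here is a minimal sketch of the same kind of source shared through Publish. It assumes the System.Reactive package; PublishSketch and Run are hypothetical names, and the values are collected into lists rather than printed so that they can be compared.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper class, used only for this illustration.
public static class PublishSketch
{
    public static (List<int> First, List<int> Second) Run()
    {
        var r = new Random();

        // A single published subscription: r.Next() is evaluated once
        // per element, whatever the number of observers.
        var published = Observable.Range(1, 5)
            .Select(_ => r.Next())
            .Publish();

        var first = new List<int>();
        var second = new List<int>();
        published.Subscribe(first.Add);
        published.Subscribe(second.Add);

        // Nothing flows until Connect is invoked. The connection is disposed
        // right away because Range is expected to finish during the call.
        using (published.Connect()) { }

        return (first, second);
    }
}
```

Both lists should contain the same five random values, in contrast with the unpublished version, where each observer triggers its own evaluation of the source.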
Similar to the Publish/Connect operator pattern, RefCount returns a sequence of the published messages to its subscribers by subscribing only once to the sourcing sequence.
The difference is that RefCount automatically connects to the published sequence when the first subscriber arrives and stays connected while there is at least one subscriber. Once the last subscriber unsubscribes, the RefCount operator automatically disconnects from the sourcing sequence. This is a powerful operator because it spares us from implementing the Publish/Connect pattern ourselves, and because it avoids the race condition between connecting and subscribing that may happen in some cases.
Here's an example:
//the sourcing sequence
var publishedSequence = Observable.Interval(TimeSpan.FromSeconds(0.5))
    .Select(i => DateTime.Now)
    .Publish()
    .RefCount();
while (true)
{
    Console.WriteLine("Press return to subscribe");
    Console.ReadLine();
    using (var subscription = publishedSequence.Subscribe(value => Console.WriteLine("Observer: {0}", value)))
    {
        Console.WriteLine("Press return to unsubscribe");
        Console.ReadLine();
    }
    //now we disconnected from the published sequence
}
Another significant difference between the Publish/Connect pattern and the RefCount operator is that RefCount subscribers start receiving messages as soon as they subscribe, while Publish/Connect subscribers must wait for the Connect method to be invoked before receiving messages.
Moreover, because RefCount disconnects from the sourcing sequence when no subscribers remain, the sequence often consumes fewer resources than one that is kept connected.
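The connect and disconnect behavior of RefCount can be made visible by counting how often the sourcing sequence is actually subscribed. This is a minimal sketch, assuming the System.Reactive package; RefCountSketch and Run are hypothetical names, and Observable.Never stands in for a long-running source.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical helper class, used only for this illustration.
public static class RefCountSketch
{
    public static (int AfterTwoSubscribers, int AfterResubscribe) Run()
    {
        var connections = 0;

        // Defer invokes its factory each time the source is subscribed,
        // that is, each time RefCount connects.
        var source = Observable.Defer(() =>
        {
            connections++;
            return Observable.Never<int>();
        });

        var shared = source.Publish().RefCount();

        // Two subscribers share a single connection to the source.
        var first = shared.Subscribe(_ => { });
        var second = shared.Subscribe(_ => { });
        var afterTwoSubscribers = connections;

        // Disposing the last subscriber disconnects from the source.
        first.Dispose();
        second.Dispose();

        // A new subscriber makes RefCount connect again.
        using (shared.Subscribe(_ => { }))
        {
            return (afterTwoSubscribers, connections);
        }
    }
}
```

Run should return one connection while two subscribers are attached, and two connections after everyone has unsubscribed and a new subscriber arrives.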
If you want to simply publish the last value instead of the whole sequence, the choice is to use the PublishLast factory method:
//the sourcing sequence
var publishedSequence = Observable.Interval(TimeSpan.FromSeconds(0.5))
    .Select(i => DateTime.Now)
    .Take(5)
    .PublishLast();
publishedSequence.Subscribe(value => Console.WriteLine("Last: {0}", value));
publishedSequence.Connect();
Keep in mind that the difference between this implementation and a classical Last one is that, with PublishLast, we make a single subscription to the sourcing sequence, whatever the number of subscribers.
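The single subscription can be checked with a counter. This is a minimal sketch, assuming the System.Reactive package; PublishLastSketch and Run are hypothetical names, and a Range source replaces the timed one so that the result does not depend on timing.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper class, used only for this illustration.
public static class PublishLastSketch
{
    public static (int Subscriptions, List<int> First, List<int> Second) Run()
    {
        var subscriptions = 0;

        // Defer invokes its factory on every subscription to the source.
        var source = Observable.Defer(() =>
        {
            subscriptions++;
            return Observable.Range(1, 5);
        });

        var published = source.PublishLast();

        var first = new List<int>();
        var second = new List<int>();
        published.Subscribe(first.Add);
        published.Subscribe(second.Add);

        // One connection feeds both observers with the last value only.
        published.Connect();

        return (subscriptions, first, second);
    }
}
```

Both observers should receive only the value 5, while the source is subscribed a single time.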
The Replay factory method creates a single subscription sequence that buffers the messages of the sourcing sequence, up to the given buffer size. The replayed sequence carries the same message values as the sourcing sequence. A subscriber that is already attached when the sequence is connected receives the messages as the source produces them, with the original timings. A subscriber that attaches later receives the buffered messages immediately upon subscribing and then continues with the live ones. Here's an example:
//the sourcing sequence will fire
//for 5 seconds
var publishedSequence = Observable.Interval(TimeSpan.FromSeconds(1))
    .Select(i => DateTime.Now)
    .Take(5)
    .Replay(10);
//connect first, so that the sourcing sequence starts
//producing messages and the replay buffer starts filling
publishedSequence.Connect();
//we wait for 2 seconds
Thread.Sleep(2000);
//now we attach the subscriber, which will recover
//the messages it missed thanks to the replay behaviour
publishedSequence.Subscribe(value => Console.WriteLine("Value: {0}", value));
Console.ReadLine();
In the preceding example, we take only 5 messages from the sourcing sequence while setting a replay buffer of 10 items, which is obviously enough to contain the whole sourcing sequence. In real-world applications, this buffer acts as a cache, allowing late subscribers to catch up on messages they would otherwise lose.
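The effect of the buffer size on a late subscriber can be sketched without timers. The following assumes the System.Reactive package; ReplaySketch and Run are hypothetical names. It relies on the default scheduler of Range producing its values on the calling thread, so the source has already completed by the time the subscriber attaches.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper class, used only for this illustration.
public static class ReplaySketch
{
    public static List<int> Run(int bufferSize)
    {
        var replayed = Observable.Range(1, 5).Replay(bufferSize);

        // Assumption: with its default scheduler, Range runs to completion
        // during Connect, so every value is produced before anyone subscribes.
        replayed.Connect();

        // The late subscriber receives only what the buffer still holds.
        var late = new List<int>();
        replayed.Subscribe(late.Add);

        return late;
    }
}
```

With a buffer of 10, the late subscriber should recover all five values. With a buffer of 2, it should receive only the last two, 4 and 5.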
The Multicast factory method is the father of all the previously seen methods that produce a single subscription against the sourcing sequence. All the other methods are built on Multicast, passing it a Subject or another subject type to produce the right implementation.
This explains the internals of all the single subscription methods seen so far. They use a subject to read messages from the sourcing sequence and route them to the target subscribers.
We can use such factory methods to create our own implementations. Here's an example:
//the sourcing sequence
var sourcingSequence = Observable.Interval(TimeSpan.FromSeconds(1))
    .Select(i => DateTime.Now);
//the subject that will route messages
var multicastingSubject = new Subject<DateTime>();
//the publisher sequence
var multicastSequence = sourcingSequence.Multicast(multicastingSubject);
//subscribers
multicastSequence.Subscribe(value => Console.WriteLine("Observer#1: {0}", value));
multicastSequence.Subscribe(value => Console.WriteLine("Observer#2: {0}", value));
//connect the publisher sequence
multicastSequence.Connect();
Console.ReadLine();
This example shows an implementation similar to the classic Publish factory method. The difference is that this implementation gives the developer the chance to use a custom extension of the subject, creating new publishing sequence implementations.
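As an illustration of swapping the subject, here is a minimal sketch that passes one of the subjects shipped with the library, a BehaviorSubject, to Multicast instead of a plain Subject. It assumes the System.Reactive package; MulticastSketch and Run are hypothetical names, and the initial value 0 is an arbitrary choice for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper class, used only for this illustration.
public static class MulticastSketch
{
    public static List<int> Run()
    {
        // A BehaviorSubject hands its current value to every new subscriber.
        // The initial value 0 is an arbitrary choice for this sketch.
        var subject = new BehaviorSubject<int>(0);

        var multicast = Observable.Range(1, 3).Multicast(subject);

        var received = new List<int>();
        // The subscriber gets the initial value before the source is connected.
        multicast.Subscribe(received.Add);

        // Connecting routes the source values through the subject.
        multicast.Connect();

        return received;
    }
}
```

The observer should receive 0 as soon as it subscribes, followed by 1, 2, and 3 once the sequence is connected. A custom subject would plug in the same way.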