Advanced operators

Sometimes we need methods that repeat messages, or that share and reuse a single subscription among multiple observers. This is where the advanced operators come into play.

IgnoreElements

The IgnoreElements method creates a new sequence that ignores every value message of the sourcing sequence. Error and completion messages still flow through unchanged.

This method is particularly useful to produce acknowledgements (signals that a sequence has finished or failed) or simply to append some completion code to a sourcing sequence. Here's an example:

//the sourcing sequence of errors or completed messages 
var sourcingSequence = Observable.Throw<object>(new Exception("Test")); 
 
//a sequence able to handle only errors or completed messages 
var ignoredElements = sourcingSequence.IgnoreElements(); 
ignoredElements.Subscribe(new ConsoleObserver()); 

The ConsoleObserver class is the same as that of the previous examples. See the Interval or Create sections in this chapter.
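The example above covers the error case. As a complementary sketch of the completion case, the following snippet assumes the System.Reactive package and replaces ConsoleObserver with inline callbacks; the names log and source are illustrative only:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

var log = new List<string>();

// a sourcing sequence that produces 3 values and then completes
var source = Observable.Range(1, 3);

// the values are dropped; only the completion message goes through
source.IgnoreElements().Subscribe(
    value => log.Add("Value: " + value),
    error => log.Add("Error: " + error.Message),
    () => log.Add("Completed"));

// prints only "Completed"
foreach (var line in log) Console.WriteLine(line);
```

The value callback is never invoked, which makes IgnoreElements a convenient way to react only to the end of a sequence.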

Repeat

The Repeat method creates a sequence that replays the messages of the sourcing sequence a given number of times, or indefinitely when no count is specified. Here's an example:

//a sourcing sequence of 5 elements 
var sourcingSequence = Observable.Range(1, 5); 
 
//the repeating sequence 
var repeatFor2Times = sourcingSequence.Repeat(2); 
repeatFor2Times.Subscribe(value => Console.WriteLine("Value: {0}", value)); 

Although the Repeat method is pretty simple to use, it is a good way to show a possibly unintended behavior. Every time we subscribe to a sequence, most of the premade factory methods restart their implementation from scratch. When dealing with simple integers, this behavior is difficult to notice, but with DateTime values it becomes immediately visible. Here's an example:

//a sourcing sequence of 5 elements 
var sourcingSequence = Observable.Range(1, 5) 
    //slow down 
    .Select(i => { Thread.Sleep(1000); return i; }) 
    //take the actual time 
    .Select(i => DateTime.Now); 
 
//the repeating sequence 
var repeatFor2Times = sourcingSequence.Repeat(2); 
repeatFor2Times.Subscribe(value => Console.WriteLine("Value: {0}", value));  

In this example, we cannot use the Interval factory method, because its sequence never sends the completion message that Repeat needs before it can subscribe again.

By using a DateTime value, you will immediately see that the Repeat sequence subscribes twice to the sourcing sequence that came from the Range factory method. This means that the integer values are evaluated twice by the Select transformation, which produces new DateTime values instead of repeating the original ones. If this is unintended, we can share a single subscription to the sourcing sequence with the Publish method, or cache its values with the Replay method; both are described in the following sections.
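To make the double subscription visible without relying on timestamps, here is a minimal sketch that counts how many times the sourcing sequence is subscribed. It assumes the System.Reactive package; the subscriptions counter and the use of Observable.Defer are illustrative choices, not part of the original example:

```csharp
using System;
using System.Reactive.Linq;

var subscriptions = 0;

// Defer invokes its factory for every new subscription,
// so the counter tracks how often the source is restarted
var source = Observable.Defer(() =>
{
    subscriptions++;
    return Observable.Range(1, 3);
});

// Repeat(2) subscribes, waits for completion, then subscribes again
source.Repeat(2).Subscribe(value => Console.WriteLine("Value: {0}", value));

// prints "Subscriptions: 2"
Console.WriteLine("Subscriptions: {0}", subscriptions);
```

Because Range emits synchronously here, the counter is already final when the last line runs.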

Publish/Connect

The Publish method produces a sequence that holds a single subscription to the sourcing sequence and shares it with all of its subscribers. As we have already seen, each new subscription may otherwise cause the sourcing sequence to produce different values. Here's an example based on a random value:

var r = new Random(DateTime.Now.GetHashCode()); 
//a random value sequence 
var sourcingSequence = Observable.Range(1, 5) 
    //slow down 
    .Select(i => { Thread.Sleep(500); return i; }) 
    //take a random value 
    .Select(i => r.Next()); 
 
//multiple subscriptions causing different 
//values being printed onto the console 
sourcingSequence.Subscribe(value => Console.WriteLine("Observer#1: {0}", value)); 
sourcingSequence.Subscribe(value => Console.WriteLine("Observer#2: {0}", value)); 
sourcingSequence.Subscribe(value => Console.WriteLine("Observer#3: {0}", value)); 

The preceding example prints output similar to the following (the actual numbers differ on every run):

Observer#1: 1387437772 
Observer#1: 1673597686 
Observer#1: 407780858 
Observer#1: 630401573 
Observer#1: 336086919 
Observer#2: 1071679403 
Observer#2: 302043112 
Observer#2: 1359704606 
Observer#2: 413086291 
Observer#2: 1357199039 
Observer#3: 1918565397 
Observer#3: 1660389991 
Observer#3: 1852413164 
Observer#3: 1520275706 
Observer#3: 1750871851 

As we can see, each subscription starts a new ranged source sequence. The Publish method, instead, creates a single subscription sequence that is available to multiple subscribers and routes each message to all the observers. To start publishing messages, we need to turn on (and later off) the flow by invoking the Connect method of the published sequence. Connect returns a disposable connection; once we complete our task, we dispose of it to disconnect. Here's another example:

//the sourcing sequence 
var publishedSequence = Observable.Interval(TimeSpan.FromSeconds(0.5)) 
    .Select(i => DateTime.Now) 
    .Publish(); 
 
//attach subscribers before connecting the publisher 
publishedSequence.Subscribe(value => Console.WriteLine("Observer#1: {0}", value)); 
publishedSequence.Subscribe(value => Console.WriteLine("Observer#2: {0}", value)); 
publishedSequence.Subscribe(value => Console.WriteLine("Observer#3: {0}", value)); 
 
while (true) 
{ 
    Console.WriteLine("Press RETURN to connect the published sequence"); 
    Console.ReadLine(); 
    using (var connected = publishedSequence.Connect()) 
    { 
        Console.WriteLine("Press RETURN to quit the connection"); 
        Console.ReadLine(); 
    } 
    //now we disconnected from the published sequence  
}  

Now the same value flows to all the subscribers. Each subscriber receives every value as soon as it is produced, instead of waiting for the previous subscriber to read the whole sequence, as happened in the previous example.
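The shared subscription can also be checked on the random value sequence used earlier. The following is a minimal sketch, assuming the System.Reactive package; the two lists exist only to compare what each observer receives:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

var random = new Random();
var first = new List<int>();
var second = new List<int>();

// a random value sequence, published as a single shared subscription
var published = Observable.Range(1, 5)
    .Select(i => random.Next())
    .Publish();

// attach both observers before connecting
published.Subscribe(value => first.Add(value));
published.Subscribe(value => second.Add(value));

// nothing flows until Connect; then the source is subscribed only once
published.Connect();

// prints "True": both observers received the same 5 values
Console.WriteLine(first.SequenceEqual(second));
```

Without Publish, each observer would trigger its own run of the Select transformation and the two lists would differ.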

RefCount

Similar to the Publish/Connect operator pattern, RefCount returns a sequence of the published messages to its subscribers by subscribing only once to the sourcing sequence.

The difference is that RefCount connects to the published sequence automatically when the first subscriber subscribes and stays connected while there is at least one subscriber. Once the last subscriber unsubscribes, RefCount automatically disconnects from the sourcing sequence. This is a powerful operator because it saves us from implementing the Publish/Connect pattern ourselves, and because it avoids the race condition that may sometimes occur between the Connect and Subscribe calls.

Here's an example:

//the sourcing sequence 
var publishedSequence = Observable.Interval(TimeSpan.FromSeconds(0.5)) 
    .Select(i => DateTime.Now) 
    .Publish() 
    .RefCount(); 
 
while (true) 
{ 
    Console.WriteLine("Press return to subscribe"); 
    Console.ReadLine(); 
    using (var subscription = publishedSequence.Subscribe(value =>
        Console.WriteLine("Observer: {0}", value))) 
    { 
        Console.WriteLine("Press return to unsubscribe"); 
        Console.ReadLine(); 
    } 
    //now we disconnected from the published sequence 
}  

Another significant difference between the Publish/Connect pattern and the RefCount operator is that RefCount subscribers will start receiving messages as soon as they subscribe, while the Publish/Connect subscribers must wait for the Connect method to start receiving messages.

However, disconnecting from a sequence that has no subscribers is often a performance improvement, because the idle sequence stops consuming resources.
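The automatic connection and disconnection can be observed by counting the active subscriptions to the sourcing sequence. This is a minimal sketch, assuming the System.Reactive package; the tracked wrapper and the connections counter are hypothetical helpers introduced only for this illustration:

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var connections = 0;
var source = new Subject<int>();

// the Create delegate runs once per subscription to the tracked sequence
var tracked = Observable.Create<int>(observer =>
{
    connections++;
    var inner = source.Subscribe(observer);
    return Disposable.Create(() => { connections--; inner.Dispose(); });
});

var shared = tracked.Publish().RefCount();

// two subscribers share one connection to the source
var a = shared.Subscribe(v => Console.WriteLine("A: {0}", v));
var b = shared.Subscribe(v => Console.WriteLine("B: {0}", v));
var whileSubscribed = connections;
Console.WriteLine("Connections: {0}", whileSubscribed); // 1

source.OnNext(1);

// when the last subscriber leaves, RefCount disconnects
a.Dispose();
b.Dispose();
var afterDispose = connections;
Console.WriteLine("Connections: {0}", afterDispose); // 0
```

A new subscriber arriving after this point would cause RefCount to connect to the source again.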

PublishLast

If we want to publish only the last value instead of the whole sequence, we can use the PublishLast method:

//the sourcing sequence 
var publishedSequence = Observable.Interval(TimeSpan.FromSeconds(0.5)) 
    .Select(i => DateTime.Now) 
    .Take(5) 
    .PublishLast(); 
 
publishedSequence.Subscribe(value => Console.WriteLine("Last: {0}", value)); 
publishedSequence.Connect(); 

Keep in mind that the difference between this implementation and the classic Last operator is that PublishLast makes a single subscription to the sourcing sequence, no matter how many subscribers are attached.
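As a small illustration of this point, the following sketch uses a synchronous Range source instead of Interval so that the result is deterministic. It assumes the System.Reactive package; the received list is only there to collect the output:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

var received = new List<string>();

// only the final value of the source will be published
var last = Observable.Range(1, 5).PublishLast();

last.Subscribe(v => received.Add("Early: " + v));
last.Connect();

// a subscriber that arrives after completion still gets the cached last value
last.Subscribe(v => received.Add("Late: " + v));

// prints "Early: 5" and then "Late: 5"
foreach (var line in received) Console.WriteLine(line);
```

Both subscribers get the value 5 even though the sourcing sequence was subscribed only once.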

Replay

The Replay method creates a single subscription sequence that buffers the messages of the sourcing sequence, up to the given buffer size. A subscriber that arrives late first receives the buffered messages with their original values (a buffered DateTime keeps the time at which it was produced, not the time of the subscription) and then continues with the live messages. Here's an example:

//the sourcing sequence will fire 
//for 5 seconds 
var publishedSequence = Observable.Interval(TimeSpan.FromSeconds(1)) 
    .Select(i => DateTime.Now) 
    .Take(5) 
    .Replay(10); 
 
//connect first, so that the replay buffer starts 
//recording messages while nobody is subscribed 
publishedSequence.Connect(); 
 
//we wait for 2 seconds 
Thread.Sleep(2000); 
 
//now we attach a late subscriber that will 
//recover the missed messages thanks to the replay behaviour 
publishedSequence.Subscribe(value => Console.WriteLine("Value: {0}", value)); 
 
Console.ReadLine(); 

In the preceding example, we take only 5 messages from the sourcing sequence and set a replay buffer of 10 items, which is large enough to contain the whole sourcing sequence. In real-world applications, this buffer acts as a cache that lets late subscribers catch up on messages they would otherwise have lost.
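When the buffer is smaller than the number of messages already produced, a late subscriber recovers only the most recent ones. Here is a minimal sketch of that case, assuming the System.Reactive package and using a Subject as a manually driven source (an illustrative choice that avoids timers):

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var source = new Subject<int>();
var received = new List<int>();

// keep only the 2 most recent messages
var replayed = source.Replay(2);
replayed.Connect();

// these messages are produced while nobody is subscribed
source.OnNext(1);
source.OnNext(2);
source.OnNext(3);

// the late subscriber first gets the buffered 2 and 3...
replayed.Subscribe(v => received.Add(v));

// ...and then the live messages
source.OnNext(4);

// prints "2, 3, 4"
Console.WriteLine(string.Join(", ", received));
```

The first message is lost because it had already been pushed out of the two-item buffer.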

Multicast

The Multicast method is the parent of all the previously seen methods that produce a single subscription against the sourcing sequence. They are all built on Multicast, each passing a different subject to get the right behavior: Publish uses Subject, PublishLast uses AsyncSubject, and Replay uses ReplaySubject.

This explains the internals of all the single subscription methods seen so far: the subject subscribes once to the sourcing sequence and routes its messages to the target subscribers.

We can use the Multicast method to create our own implementations. Here's an example:

//the sourcing sequence 
var sourcingSequence = Observable.Interval(TimeSpan.FromSeconds(1)) 
    .Select(i => DateTime.Now); 
 
//the subject that will route messages 
var multicastingSubject = new Subject<DateTime>(); 
             
//the publisher sequence 
var multicastSequence = sourcingSequence.Multicast(multicastingSubject); 
 
//subscribers 
multicastSequence.Subscribe(value => Console.WriteLine("Observer#1: {0}", value)); 
multicastSequence.Subscribe(value => Console.WriteLine("Observer#2: {0}", value)); 
 
//connect the publisher sequence 
multicastSequence.Connect(); 
 
Console.ReadLine(); 

This example shows an implementation similar to the classic Publish method. The difference is that Multicast lets the developer supply any subject, including a custom one, and so create new publishing sequence implementations.
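As one possible variation, the subject passed to Multicast can be swapped for another built-in one. The sketch below assumes the System.Reactive package and uses BehaviorSubject, which hands its current value to every new subscriber; the initial value 0 is an arbitrary choice for the illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var received = new List<int>();

// the sourcing sequence
var source = Observable.Range(1, 3);

// a BehaviorSubject starts with an initial value (0 here)
var multicast = source.Multicast(new BehaviorSubject<int>(0));

// the subscriber immediately receives the initial value
multicast.Subscribe(v => received.Add(v));

// after connecting, the values of the source follow
multicast.Connect();

// prints "0, 1, 2, 3"
Console.WriteLine(string.Join(", ", received));
```

Changing only the subject changes the publishing behavior, while the sourcing sequence and the subscribers stay the same.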
