Subjects

A subject class is an observable sequence that is an observer too. A subject may produce and consume values; in other words, it is a value publisher and a value subscriber.

In real-world reactive applications, there are various (some times hundreds) subjects interacting with each other.

In the previous section, when we saw the merge operation in the marble diagram, we pressed two sequences into another one to give us the ability to subscribe an Observer to the new merged sequence. This merged sequence is a subject because it receives values from the nonmerged sequences and then produces values to the related observer.

This is only an example of the infinite subjects available in any application.

A subject class gives us the ability to create sequences or observers without having to always create a specific class and implement the relative interface, as seen in the previous examples with pure C# coding. Here is an example:

//a new sequence 
var s = new Subject<string>(); 
//subscribe such new observer OnNext implementation 
s.Subscribe(Console.WriteLine); 
             
//some push value 
s.OnNext("value1"); 
s.OnNext("value2"); 
Console.ReadLine(); 

The ability to create sequences with minimal effort is one of the killing features of a subject.

As shown in the previous example, we do not need to implement any interface or create any custom class to achieve the desired result. The only required task is providing the OnNext method implementation by executing the Subscribe method. This method asks for Action<T>. We can pass such a parameter by writing a lambda or by passing the method itself (as seen in the example).

The Subject class itself implements the OnCompleted method and the OnError method by routing the proper message to all the subscribers and then disposing all the subscriber tokens.

When using Rx, we have lot of extension methods available to access the huge amount of operations available against subjects. Later, in this chapter, we will give an overview of the most widely used ones.

Subject is only the head of the dragon. A lot of other classes extends Subject by adding other features.

ReplaySubject

ReplaySubject<T> gives a time buffer feature.

Normally, when we use Subject by pushing values into the subject, only already subscribed observers will receive the new value from the first subject, as seen in the example:

var simpleSubject = new Subject<string>(); 
simpleSubject.OnNext("value1"); 
simpleSubject.OnNext("value2"); 
simpleSubject.Subscribe(Console.WriteLine); 
simpleSubject.OnNext("value3"); 
simpleSubject.OnNext("value4"); 

Instead, by using ReplaySubject, the subject will route all the pushed messages to its subscribers like a normal subject does, adding the ability to store all such messages, making them available to later subscribers.

In this case, words are a bit more confusing than code:

var replaySubject = new ReplaySubject<string>(); 
replaySubject.OnNext("value1"); 
replaySubject.OnNext("value2"); 
replaySubject.Subscribe(Console.WriteLine); 
replaySubject.OnNext("value3"); 
replaySubject.OnNext("value4"); 

The difference between those two identical examples is that, in the simpleSubject example at the beginning of the Subjects section, only value3 and value4 will be outputted to the console. In the example of the ReplaySubject section, all the values will reach the console output.

ReplaySubject will buffer all the received messages in its memory and will later produce all such messages to the new subscriber.

Obviously, because the memory is finite, we have the ability to specify in the constructor a numeric buffer amount or a time-based amount with TimeSpan:

var a = new ReplaySubject<string>(10); 
var b = new ReplaySubject<string>(TimeSpan.FromSeconds(10)); 

BehaviorSubject

This alternative extension of the Subject class is similar to ReplaySubject with the difference that BehaviorSubject always stores the latest available value (or the initial value if we're at the beginning of its life). This means that a default value must be provided through its constructor. Here is an example:

var behaviorSubject = new BehaviorSubject<DateTime>(new DateTime(2001, 1, 1)); 
Thread.Sleep(1000); 
//the default value will flow to the new subscriber 
behaviorSubject.Subscribe(x => Console.WriteLine(x)); 
Thread.Sleep(1000); 
//a new value will flow to the subscriber 
behaviorSubject.OnNext(DateTime.Now); 
Thread.Sleep(1000); 
//this new subscriber will receive the last available message 
//regardless is was not subscribing at the time the message arise 
behaviorSubject.Subscribe(x => Console.WriteLine(x)); 
Thread.Sleep(1000); 

In any case, if observer subscribes to subject, it receives the last available value. However, when a subscriber already subscribes subject when a new value is available, this value will normally flow to the subscriber.

This behavior is very specific in reactive programming because it allows us to ensure that a value will always flow to any subscriber regardless of the availability of new values.

In the real world, this subject is used in cases where we want to have the opportunity to subscribe, take some updated value, unsubscribe, and then be able to subscribe again in the future, thus repeating the behavior. This is a bit unreactive, but there are cases when this is useful.

Consider the case when you interface to a Programmable Logic Controller (PLC) to read analogic data. You may want the ability to stop collecting data and the ability to connect our subscribers again to the PLC reactive interface by receiving immediately the last value of all the wanted variables/analogic ports without having to wait until each PLC variable produces a new value to flow to our subscribers.

AsyncSubject

AsyncSubject is another single message subject. Although similar to BehaviorSubject, it routes only the last message it receives, waiting for the OnComplete message before routing its single OnNext message:

var asyncSubject = new AsyncSubject<string>(); 
asyncSubject.OnNext("value1"); //this will be missed 
asyncSubject.Subscribe(Console.WriteLine); 
asyncSubject.OnNext("value2"); //this will be missed 
asyncSubject.OnNext("value3"); //this will be routed once OnCompleted raised 
Console.ReadLine(); 
asyncSubject.OnCompleted(); 

AsyncSubject is very useful for cases when we need to work in a message-based way with the need of a message flow logic. The most used case is in combination to other sequences that produce a rolling average or a rolling total from a sequence of values. This subject gives us the ability to catch only the end result from the rolling one.

Custom subjects

If out-of-the box subjects cannot satisfy our needs, we can create our custom subject class by implementing the ISubject interface. This interface offers the ability to create subjects with the same type argument in receiving and sending messages, like in all the subject classes seen in the previous examples. An interesting feature is the ability to create subjects with different in/out type arguments. Here's an example:

public sealed class MapperSubject<Tin, Tout> : ISubject<Tin, Tout> 
{ 
    readonly Func<Tin, Tout> mapper; 
    public MapperSubject(Func<Tin, Tout> mapper) 
    { 
        this.mapper = mapper; 
    } 
 
    public void OnCompleted() 
    { 
        foreach (var o in observers.ToArray()) 
        { 
            o.OnCompleted(); 
            observers.Remove(o); 
        } 
    } 
 
    public void OnError(Exception error) 
    { 
        foreach (var o in observers.ToArray()) 
        { 
            o.OnError(error); 
            observers.Remove(o); 
        } 
    } 
 
    public void OnNext(Tin value) 
    { 
        Tout newValue = default(Tout); 
        try 
        { 
            //mapping statement 
            newValue = mapper(value); 
        } 
        catch (Exception ex) 
        { 
            //if mapping crashed 
            OnError(ex); 
            return; 
        } 
 
        //if mapping succeded 
        foreach (var o in observers) 
            o.OnNext(newValue); 
    } 
 
    //all registered observers 
    private readonly List<IObserver<Tout>> observers = new
    List<IObserver<Tout>>(); 
    public IDisposable Subscribe(IObserver<Tout> observer) 
    { 
        observers.Add(observer); 
        return new ObserverHandler<Tout>(observer, OnObserverLifecycleEnd); 
    } 
 
    private void OnObserverLifecycleEnd(IObserver<Tout> o) 
    { 
        o.OnCompleted(); 
        observers.Remove(o); 
    } 
 
    //this class simply informs the subject that a dispose 
    //has been invoked against the observer causing its removal 
    //from the observer collection of the subject 
    private class ObserverHandler<T> : IDisposable 
    { 
        private IObserver<T> observer; 
        Action<IObserver<T>> onObserverLifecycleEnd; 
        public ObserverHandler(IObserver<T> observer, Action<IObserver<T>>
        onObserverLifecycleEnd) 
        { 
            this.observer = observer; 
            this.onObserverLifecycleEnd = onObserverLifecycleEnd; 
        } 
 
        public void Dispose() 
        { 
            onObserverLifecycleEnd(observer); 
        } 
    } 
} 

Here's the usage:

var mapper = new MapperSubject<string, double>(x => double.Parse(x)); 
mapper.Subscribe(x => Console.WriteLine("{0:N4}", x)); 
mapper.OnNext("4.123"); 
mapper.OnNext("5.456"); 
mapper.OnNext("7.90'?"); 
mapper.OnNext("9.432"); 

This example shows how to create a mapping subject that translates messages into another type before routing all the data to the waiting observers.

The subject class uses an ObserverHandler class that handles the observer life cycle by signaling the subject an eventual observerDispose invoke by removing it from the observer list.

Subject from IObservable/IObserver

An interesting feature available as the factory method from the Subject class is the ability to create a new Subject class using the given IObservable and IObserver couple. This gives us the ability to create mixed subjects from the external observable/observer objects or by reusing the IObservable or the IObserver part of other subject objects into a new subject class.

Note

Kindly bear in mind that the IObserver implementation of subject is simply the message routing to its underlying observers. Here is an example:

var receiverSubject = new Subject<string>(); 
//the final observer implementation 
receiverSubject.Subscribe(x => Console.WriteLine("s1=>{0}", x)); 
 
//the source of all messages 
var senderSubject = new Subject<string>(); 
//no observers here 
 
//the router made with the Observer part of 
//the receiverSubject and the Observable part 
//of the senderSubject 
var routerSubject = Subject.Create(receiverSubject, senderSubject); 
//another observer for testing purposes 
routerSubject.Subscribe(x => Console.WriteLine("s3=>{0}", x)); 
 
senderSubject.OnNext("value1"); 
senderSubject.OnNext("value2"); 
 

This example shows us how to use the Subject.Create factory method.

When creating a new subject by using Subject.Create, the first parameter is IObserver that will receive messages, and the second parameter is the IObservable that will produce messages.

Although this example uses two subjects for creating routerSubject (receiverSubject and senderSubject), any IObservable and any IObserver are valid in their place.

Tip

All the operators are usually available as extension methods by including the System.Reactive.Linq namespace in all the IObservables classes.

Transforming operators

Transforming operators transform a message into another type or transform the sequence message order or flow.

Delay

A delay operation adds some time delay to each message flowing within a sequence.

Delay

A marble diagram showing a delay operation

Here's an example:

var s3 = new Subject<string>(); 
var delay = s3.Delay(TimeSpan.FromSeconds(10)); 
delay.Subscribe(Console.WriteLine); 
 
s3.OnNext("value1"); 
s3.OnNext("value2"); 

Map

The map operator creates a new sequence that will flow messages from the sourcing sequence translated into another type. The translation may change the intrinsic value (as a mathematic operation similar to the example shown in the following screenshot) or may be a type transformation:

Map

A marble diagram showing a map operation

Here's an example:

var s4 = new Subject<string>(); 
//a numeric sequence 
var map = s4.Select(x => double.Parse(x)); 
map.Subscribe(x => Console.WriteLine("{0:N4}", x)); 
s4.OnNext("10.40"); 
s4.OnNext("12.55"); 

Scan

A scan operation works by applying a transformation to each message within a sequence with the ability to interact with the last transformed value. The following example shows how to create a running total, as the one available within Microsoft Excel, to have a real-time invoice total amount.

During the first execution, the transformation Func is not executed.

Scan

A marble diagram showing a scan operation

Here's an example:

var invoiceSummarySubject = new Subject<double>(); 
var invoiceSummaryScanSubject = invoiceSummarySubject.Scan((last, x) => x + last); 
//register an observer for printing total amount 
invoiceSummaryScanSubject.Subscribe(new Action<double>(x => Console.WriteLine("Total amount: {0:C}", x))); 
//register some invoice item total 
invoiceSummarySubject.OnNext(1250.50); //add a notebook 
invoiceSummarySubject.OnNext(-50.0); //discount 
invoiceSummarySubject.OnNext(44.98); //a notebook bag 

Tip

Scan produces a sequence containing a running total; alternative if the ending total is your only interest, you can use the Aggregate extension method.

Debounce

A Debounce operator avoids messages from flowing in at a higher rate by setting a time-based throttling between messages. This means that messages cannot flow at a rate higher than the set one. In Rx, the operator name is Throttle, because it slows down the message flow when too many messages flow altogether in a short period.

Real world uses are the need to slow down useless high rate notifications that have to reach a UX, where anything other than 30 fps is only a resource waste, or when we make an analog-to-digital parsing that we don't want to exceed a parsing rate per second.

When we use the Throttle extension method, we must specify the minimum time from the last message so that the new sequence will wait to ensure the last message has flown out.

The Debounce operator adds a time delay and regulates the flow of messages when the source observable is still producing a lot of messages:

Debounce

A marble diagram showing a debounce operation

Here's an example:

var s5 = new Subject<DateTime>(); 
var throttle = s5.Throttle(TimeSpan.FromMilliseconds(500)); 
throttle.Subscribe(x => Console.WriteLine("{0:T}", x)); 
 
//produce 100 messages 
for (int i = 0; i < 100; i++) 
    s5.OnNext(DateTime.Now); 

Amb

The Amb operator will produce a new sequence that will flow messages from the fastest to produce the first message between the sourcing sequences. This means that the first sequence that flows a message into the Amb operator's sequence will became the only sourcing sequence of the Amb operator regardless of whether this sequence later completes prematurely while other initial sourcing sequences are still alive. This kind of operator is great when we need the Speculative Execution logic with reactive sequences. With this logic design, we can start or simply wait until multiple functions doing the same task with different parameters (think of a web search with the user's raw search criteria, or similar words from an anagram dictionary) execute, usually taking the result only from the fastest to results.

It is a routing sequence that accepts multiple source sequences (of the same message type), as other routing sequences, the Amb simply routes messages without applying any transformation. It simply chooses a single sourcing sequence between all those available by selecting the fastest one to produce any message:

Amb

A marble diagram showing an amb operation

Here's an example:

var s20 = new Subject<string>(); 
var s21 = new Subject<string>(); 
var amb = s20.Amb(s21); 
amb.Subscribe(Console.WriteLine); 
 
//the first message will let amb operator 
//choose the definite source sequence 
 
s21.OnNext("value1"); 
//messages from the other sequences are ignored 
s20.OnNext("value2"); 
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.143.247.125