A Subject class is an observable sequence that is also an observer. A subject can both produce and consume values; in other words, it is a value publisher and a value subscriber at the same time.
In real-world reactive applications, there are various (sometimes hundreds of) subjects interacting with each other.
In the previous section, when we saw the merge operation in the marble diagram, we combined two sequences into a new one so that we could subscribe an Observer to the merged sequence. This merged sequence behaves like a subject because it receives values from the source sequences and then produces values for its observer.
This is only one example of the countless subjects available in any application.
A Subject class gives us the ability to create sequences or observers without always having to create a specific class and implement the related interface, as seen in the previous examples written in plain C#. Here is an example:
    using System;
    using System.Reactive.Subjects;

    //a new sequence
    var s = new Subject<string>();
    //subscribe a new observer by providing only its OnNext implementation
    s.Subscribe(Console.WriteLine);
    //push some values
    s.OnNext("value1");
    s.OnNext("value2");
    Console.ReadLine();
The ability to create sequences with minimal effort is one of the killer features of a subject.
As shown in the previous example, we do not need to implement any interface or create any custom class to achieve the desired result. The only required task is to provide the OnNext implementation by calling the Subscribe method. This overload expects an Action<T>, which we can supply by writing a lambda or by passing the method itself (as seen in the example).
The Subject class itself implements the OnCompleted and OnError methods by routing the message to all the subscribers and then releasing all of their subscriptions. After either call, the subject is terminated and ignores any further value.
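The termination behavior described above can be sketched as follows. This is a minimal illustration, assuming the System.Reactive NuGet package is referenced; the log list and its labels are hypothetical names used only for this sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// Hypothetical log used only to record what each observer receives.
var log = new List<string>();
var subject = new Subject<string>();

// Subscribe with all three handlers: OnNext, OnError, and OnCompleted.
subject.Subscribe(
    x => log.Add("next:" + x),
    ex => log.Add("error:" + ex.Message),
    () => log.Add("completed"));

subject.OnNext("value1");
subject.OnCompleted();     // routed to every current subscriber
subject.OnNext("value2");  // ignored: the subject has already terminated

// A late subscriber only receives the completion signal.
subject.Subscribe(x => log.Add("late:" + x), () => log.Add("late completed"));

Console.WriteLine(string.Join(", ", log));
// prints: next:value1, completed, late completed
```

The second OnNext call does not throw; it is silently dropped, which is why a terminated subject cannot be reused as a source.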
When using Rx, we have a lot of extension methods that give access to the large number of operations available on subjects. Later in this chapter, we will give an overview of the most widely used ones.
Subject is only the head of the dragon. Several other classes implement the same subject contract (ISubject<T>) and add other features.
ReplaySubject<T> adds a buffer that replays past values to new subscribers.
Normally, when we push values into a Subject, only the observers that are already subscribed will receive the new value, as seen in this example:
    var simpleSubject = new Subject<string>();
    simpleSubject.OnNext("value1");
    simpleSubject.OnNext("value2");
    simpleSubject.Subscribe(Console.WriteLine);
    simpleSubject.OnNext("value3");
    simpleSubject.OnNext("value4");
With ReplaySubject, instead, the subject routes all the pushed messages to its subscribers as a normal subject does, but it also stores those messages and makes them available to later subscribers.
In this case, words are a bit more confusing than code:
    var replaySubject = new ReplaySubject<string>();
    replaySubject.OnNext("value1");
    replaySubject.OnNext("value2");
    replaySubject.Subscribe(Console.WriteLine);
    replaySubject.OnNext("value3");
    replaySubject.OnNext("value4");
The difference between these two nearly identical examples is that, in the simpleSubject example, only value3 and value4 are written to the console. In the replaySubject example, all four values reach the console output.
ReplaySubject buffers all the received messages in memory and later replays them to each new subscriber.
Because memory is finite, the constructor lets us limit the buffer either by a number of messages or by a time window expressed as a TimeSpan:
    //keep at most the last 10 messages
    var a = new ReplaySubject<string>(10);
    //keep only the messages received within the last 10 seconds
    var b = new ReplaySubject<string>(TimeSpan.FromSeconds(10));
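The effect of a count-limited buffer can be sketched as follows. This is a small illustration rather than a recommended sizing, assuming the System.Reactive package; the buffer size of 2 and the received list are choices made only for this sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// Keep only the two most recent values for late subscribers.
var replay = new ReplaySubject<string>(2);
replay.OnNext("value1");
replay.OnNext("value2");
replay.OnNext("value3");

// Hypothetical list that collects what the late subscriber receives.
var received = new List<string>();
replay.Subscribe(x => received.Add(x));

replay.OnNext("value4"); // live values still flow as usual

Console.WriteLine(string.Join(", ", received));
// prints: value2, value3, value4
```

Here value1 has already been evicted from the buffer by the time the observer subscribes, so it is never replayed.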
BehaviorSubject<T> is similar to ReplaySubject, with the difference that it always stores only the latest available value (or the initial value if nothing has been pushed yet). This means that a default value must be provided through its constructor. Here is an example:
    var behaviorSubject = new BehaviorSubject<DateTime>(new DateTime(2001, 1, 1));
    Thread.Sleep(1000);
    //the default value will flow to the new subscriber
    behaviorSubject.Subscribe(x => Console.WriteLine(x));
    Thread.Sleep(1000);
    //a new value will flow to the subscriber
    behaviorSubject.OnNext(DateTime.Now);
    Thread.Sleep(1000);
    //this new subscriber will receive the last available message
    //even though it was not subscribed when the message arrived
    behaviorSubject.Subscribe(x => Console.WriteLine(x));
    Thread.Sleep(1000);
In any case, when an observer subscribes to the subject, it immediately receives the last available value. When an observer is already subscribed and a new value becomes available, this value flows to the subscriber as usual.
This behavior is very specific in reactive programming because it allows us to ensure that a value will always flow to any subscriber regardless of the availability of new values.
In the real world, this subject is used in cases where we want to have the opportunity to subscribe, take some updated value, unsubscribe, and then be able to subscribe again in the future, thus repeating the behavior. This is a bit unreactive, but there are cases when this is useful.
Consider the case when we interface with a Programmable Logic Controller (PLC) to read analog data. We may want the ability to stop collecting data and later reconnect our subscribers to the PLC reactive interface, immediately receiving the last value of all the wanted variables/analog ports without having to wait until each PLC variable produces a new value.
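The subscribe, unsubscribe, and subscribe-again pattern can be sketched as follows. This is only an illustration, assuming the System.Reactive package; the analogPort subject and its integer readings are hypothetical stand-ins for a real PLC interface.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// Hypothetical PLC port exposing raw integer readings; 0 is the initial value.
var analogPort = new BehaviorSubject<int>(0);
var readings = new List<int>();

// The first subscriber receives the initial value immediately.
var subscription = analogPort.Subscribe(x => readings.Add(x));
analogPort.OnNext(215);
subscription.Dispose();   // stop collecting data

analogPort.OnNext(220);   // produced while nobody is subscribed

// Reconnecting yields the latest value at once, without waiting for a new one.
analogPort.Subscribe(x => readings.Add(x));

Console.WriteLine(string.Join(", ", readings)); // prints: 0, 215, 220
Console.WriteLine(analogPort.Value);            // prints: 220
```

The Value property also exposes the current value directly when no subscription is needed at all.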
AsyncSubject is another single-message subject. Although similar to BehaviorSubject, it routes only the last message it receives, and it waits for the OnCompleted message before routing its single OnNext message:
    var asyncSubject = new AsyncSubject<string>();
    asyncSubject.OnNext("value1"); //this will be missed
    asyncSubject.Subscribe(Console.WriteLine);
    asyncSubject.OnNext("value2"); //this will be missed
    asyncSubject.OnNext("value3"); //this will be routed once OnCompleted is raised
    Console.ReadLine();
    asyncSubject.OnCompleted();
AsyncSubject is very useful when we work in a message-based way but care only about the final result of a message flow. The most common use is in combination with other sequences that produce a rolling average or a rolling total from a sequence of values. This subject gives us the ability to catch only the end result of the rolling computation.
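The rolling-total case can be sketched as follows. This is a minimal illustration, assuming the System.Reactive package; the amounts and finalTotal names, and the -1 sentinel, are hypothetical and exist only for this sketch.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var amounts = new Subject<int>();
var finalTotal = new AsyncSubject<int>();

// Scan produces a rolling total; the AsyncSubject observes it
// and keeps only the last value it has seen.
amounts.Scan((total, x) => total + x).Subscribe(finalTotal);

var result = -1; // sentinel meaning "nothing received yet"
finalTotal.Subscribe(x => result = x);

amounts.OnNext(10);
amounts.OnNext(20);
amounts.OnNext(12);
Console.WriteLine(result); // prints: -1 (nothing is routed before completion)

amounts.OnCompleted();
Console.WriteLine(result); // prints: 42
```

The intermediate totals (10 and 30) never reach the final observer; only the value held at completion time does.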
If the out-of-the-box subjects cannot satisfy our needs, we can create a custom subject class by implementing the ISubject interface. This interface lets us create subjects that use the same type argument for receiving and sending messages, like all the subject classes seen in the previous examples. An interesting feature is the ability to create subjects with different in/out type arguments. Here's an example:
    using System;
    using System.Collections.Generic;
    using System.Reactive.Subjects;

    public sealed class MapperSubject<Tin, Tout> : ISubject<Tin, Tout>
    {
        readonly Func<Tin, Tout> mapper;

        public MapperSubject(Func<Tin, Tout> mapper)
        {
            this.mapper = mapper;
        }

        public void OnCompleted()
        {
            foreach (var o in observers.ToArray())
            {
                o.OnCompleted();
                observers.Remove(o);
            }
        }

        public void OnError(Exception error)
        {
            foreach (var o in observers.ToArray())
            {
                o.OnError(error);
                observers.Remove(o);
            }
        }

        public void OnNext(Tin value)
        {
            Tout newValue = default(Tout);
            try
            {
                //mapping statement
                newValue = mapper(value);
            }
            catch (Exception ex)
            {
                //if mapping crashed
                OnError(ex);
                return;
            }

            //if mapping succeeded
            //(iterate over a copy so that an observer may unsubscribe while being notified)
            foreach (var o in observers.ToArray())
                o.OnNext(newValue);
        }

        //all registered observers
        private readonly List<IObserver<Tout>> observers = new List<IObserver<Tout>>();

        public IDisposable Subscribe(IObserver<Tout> observer)
        {
            observers.Add(observer);
            return new ObserverHandler<Tout>(observer, OnObserverLifecycleEnd);
        }

        private void OnObserverLifecycleEnd(IObserver<Tout> o)
        {
            o.OnCompleted();
            observers.Remove(o);
        }

        //this class simply informs the subject that a dispose
        //has been invoked against the observer causing its removal
        //from the observer collection of the subject
        private class ObserverHandler<T> : IDisposable
        {
            private IObserver<T> observer;
            Action<IObserver<T>> onObserverLifecycleEnd;

            public ObserverHandler(IObserver<T> observer, Action<IObserver<T>> onObserverLifecycleEnd)
            {
                this.observer = observer;
                this.onObserverLifecycleEnd = onObserverLifecycleEnd;
            }

            public void Dispose()
            {
                onObserverLifecycleEnd(observer);
            }
        }
    }
Here's the usage:
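    var mapper = new MapperSubject<string, double>(x => double.Parse(x));
    //an OnError handler is supplied too, because the third value cannot be parsed
    //(with only an OnNext handler, the routed error would be rethrown)
    mapper.Subscribe(x => Console.WriteLine("{0:N4}", x),
        ex => Console.WriteLine("Error: {0}", ex.Message));
    mapper.OnNext("4.123");
    mapper.OnNext("5.456");
    mapper.OnNext("7.90'?"); //mapping fails: OnError is routed and the observers are removed
    mapper.OnNext("9.432"); //no observer is left to receive this value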
This example shows how to create a mapping subject that translates messages into another type before routing all the data to the waiting observers.
The subject class uses an ObserverHandler class that handles the observer life cycle: when the handler is disposed, it signals the subject, which then removes the observer from its observer list.
An interesting feature, available as a factory method on the Subject class, is the ability to create a new subject from a given IObserver and IObservable pair. This lets us create mixed subjects from external observable/observer objects, or reuse the IObservable or the IObserver part of other subject objects in a new subject.
    var receiverSubject = new Subject<string>();
    //the final observer implementation
    receiverSubject.Subscribe(x => Console.WriteLine("s1=>{0}", x));
    //the source of all messages
    var senderSubject = new Subject<string>(); //no observers here
    //the router made with the Observer part of
    //the receiverSubject and the Observable part
    //of the senderSubject
    var routerSubject = Subject.Create(receiverSubject, senderSubject);
    //another observer for testing purposes
    routerSubject.Subscribe(x => Console.WriteLine("s3=>{0}", x));
    senderSubject.OnNext("value1");
    senderSubject.OnNext("value2");
This example shows us how to use the Subject.Create factory method.
When creating a new subject by using Subject.Create, the first parameter is the IObserver that will receive messages, and the second parameter is the IObservable that will produce messages.
Although this example uses two subjects to create routerSubject (receiverSubject and senderSubject), any IObservable and any IObserver are valid in their place.
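The two directions of such a composed subject can be sketched as follows. This is a small illustration, assuming the System.Reactive package; the receiver, sender, and log names are hypothetical, and the type arguments are written out only to select the two-type overload explicitly.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

var log = new List<string>();
var receiver = new Subject<string>();
var sender = new Subject<string>();
receiver.Subscribe(x => log.Add("receiver:" + x));

// First argument: the observer part; second argument: the observable part.
var router = Subject.Create<string, string>(receiver, sender);
router.Subscribe(x => log.Add("router subscriber:" + x));

router.OnNext("in");   // forwarded to the observer part (receiver)
sender.OnNext("out");  // produced by the observable part (sender)

Console.WriteLine(string.Join(", ", log));
// prints: receiver:in, router subscriber:out
```

Values pushed into the router never reach the router's own subscribers unless the observer part is itself connected to the observable part.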
Transforming operators transform a message into another type, or change the order or the flow of the messages within a sequence.
A delay operation adds a time delay to each message flowing within a sequence.
Here's an example:
    var s3 = new Subject<string>();
    var delay = s3.Delay(TimeSpan.FromSeconds(10));
    delay.Subscribe(Console.WriteLine);
    s3.OnNext("value1");
    s3.OnNext("value2");
The map operator, exposed in Rx as the Select extension method, creates a new sequence that flows the messages of the source sequence translated into another form. The translation may change the intrinsic value (as a mathematical operation similar to the example shown in the following screenshot) or may be a type transformation:
Here's an example:
    var s4 = new Subject<string>();
    //a numeric sequence
    var map = s4.Select(x => double.Parse(x));
    map.Subscribe(x => Console.WriteLine("{0:N4}", x));
    s4.OnNext("10.40");
    s4.OnNext("12.55");
A scan operation works by applying a transformation to each message within a sequence, with the ability to interact with the last transformed value. The following example shows how to create a running total, like the one available within Microsoft Excel, to obtain a real-time invoice total amount.
During the first execution, the transformation Func is not executed: with no seed value, the first message flows through unchanged and becomes the initial accumulated value.
Here's an example:
    var invoiceSummarySubject = new Subject<double>();
    var invoiceSummaryScanSubject = invoiceSummarySubject.Scan((last, x) => x + last);
    //register an observer for printing total amount
    invoiceSummaryScanSubject.Subscribe(new Action<double>(x => Console.WriteLine("Total amount: {0:C}", x)));
    //register some invoice item total
    invoiceSummarySubject.OnNext(1250.50); //add a notebook
    invoiceSummarySubject.OnNext(-50.0); //discount
    invoiceSummarySubject.OnNext(44.98); //a notebook bag
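When the transformation must also run for the first message, Scan offers an overload that takes a seed value. The following sketch counts invoice items instead of summing them; it assumes the System.Reactive package, and the items and counts names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var items = new Subject<string>();
var counts = new List<int>();

// With a seed (0 here), the accumulator also runs for the first message,
// and the accumulated type (int) may differ from the message type (string).
items.Scan(0, (count, item) => count + 1)
     .Subscribe(x => counts.Add(x));

items.OnNext("notebook");
items.OnNext("discount");
items.OnNext("notebook bag");

Console.WriteLine(string.Join(", ", counts)); // prints: 1, 2, 3
```

The seed itself is not emitted; the first output is the result of applying the accumulator to the seed and the first message.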
A Debounce operator prevents bursts of messages from flowing through by requiring a quiet period between messages: a message is forwarded only when no newer message arrives within the specified time span, so rapidly repeated messages collapse into the last one. This means that output messages can never be closer together than the set time. In Rx, the operator name is Throttle.
Real-world uses include taming useless high-rate notifications that have to reach a UI, where anything above 30 fps is only a waste of resources, or an analog-to-digital conversion where we don't want to exceed a given parsing rate per second. Note that while the source keeps producing messages faster than the set time, Throttle emits nothing until the source pauses; when a steady periodic update is needed, the Sample operator may be a better fit.
When we use the Throttle extension method, we must specify the minimum time that has to elapse after the last message; the new sequence waits for this time before letting that message flow out.
The Debounce operator therefore adds a time delay and holds messages back while the source observable is still producing a lot of messages:
Here's an example:
    var s5 = new Subject<DateTime>();
    var throttle = s5.Throttle(TimeSpan.FromMilliseconds(500));
    throttle.Subscribe(x => Console.WriteLine("{0:T}", x));
    //produce 100 messages
    for (int i = 0; i < 100; i++)
        s5.OnNext(DateTime.Now);
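To make the outcome of such a burst visible, the following sketch pushes numbered messages instead of timestamps. It assumes the System.Reactive package and the default scheduler; the source and received names and the 1.5-second wait are choices made only for this illustration, and the result depends on the burst finishing well within the 500 ms window.

```csharp
using System;
using System.Collections.Concurrent;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

var source = new Subject<int>();
// Thread-safe collection: Throttle delivers its output on a timer thread.
var received = new ConcurrentQueue<int>();

source.Throttle(TimeSpan.FromMilliseconds(500))
      .Subscribe(x => received.Enqueue(x));

// A burst of 100 messages, each one following the previous almost instantly.
for (int i = 0; i < 100; i++)
    source.OnNext(i);

// Wait for the quiet period to elapse so the pending message can flow out.
Thread.Sleep(1500);

Console.WriteLine(string.Join(", ", received));
// expected to print only the last message of the burst: 99
```

Every message in the burst except the last is followed by another one within 500 ms, so only the last survives.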
The Amb operator produces a new sequence that flows messages only from the source sequence that is the first to produce a message. This means that the first sequence that flows a message into the Amb operator's sequence will become the only source of the Amb operator, even if this sequence later completes prematurely while the other initial source sequences are still alive. This kind of operator is great when we need Speculative Execution logic with reactive sequences. With this design, we can start multiple functions that do the same task with different parameters (think of a web search with the user's raw search criteria, or with similar words from an anagram dictionary), or simply wait for them, and take the result only from the fastest one.
It is a routing operator that accepts multiple source sequences (of the same message type). Like other routing operators, Amb simply routes messages without applying any transformation; it chooses a single source sequence among all those available by selecting the fastest one to produce any message:
Here's an example:
    var s20 = new Subject<string>();
    var s21 = new Subject<string>();
    var amb = s20.Amb(s21);
    amb.Subscribe(Console.WriteLine);
    //the first message will let amb operator
    //choose the definite source sequence
    s21.OnNext("value1");
    //messages from the other sequences are ignored
    s20.OnNext("value2");
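The premature-completion case mentioned earlier can be sketched as follows. This is a minimal illustration, assuming the System.Reactive package; the fast, slow, and log names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var fast = new Subject<string>();
var slow = new Subject<string>();
var log = new List<string>();

fast.Amb(slow).Subscribe(x => log.Add(x), () => log.Add("completed"));

fast.OnNext("fast1");  // the first message: fast becomes the only source
slow.OnNext("slow1");  // ignored, the choice has already been made
fast.OnCompleted();    // the Amb sequence completes with its chosen source
slow.OnNext("slow2");  // still ignored, even though slow is alive

Console.WriteLine(string.Join(", ", log));
// prints: fast1, completed
```

Once the winner completes, the resulting sequence is over; Amb does not fall back to the remaining sources.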