This chapter gives the reader a concrete idea of sequence creation and manipulation. We will see how the Reactive Extensions framework interacts with sequences through the large set of operators exposed as extension methods on any observable sequence.
During this chapter, we will focus on the following topics:
In the previous chapter, we saw two ways to create sequences: implementing the IObservable<T> interface to build our own custom observables, and using the Subject<T> class, which gives us a ready-made implementation that is useful in many cases and reduces the bootstrap time when programming in a reactive way.
We can create a new subject simply with the new keyword of the C# programming language. The same applies to custom observable sequences (classes that implement the IObservable<T> interface). Beyond this, we can create generic observable sequences with the factory methods of the Observable helper class. These let us create sequences from scratch without writing custom classes, or from existing values and other CLR objects, such as events. The resulting sequences are only observable sequences; they are not subjects and they are not observers.
Many of the following factory methods simply translate state-based data (variables) into flowing data by returning the same value wrapped in a sequence. Although this may seem like a change of minimal importance, changing the layout of the data also changes the design of the application.
The simplest sequence is the empty sequence. This kind of sequence may be useful to start or end some kind of operation, or to comply with the signature of some operator method.
Two factory methods of the Observable helper, Empty and Never, each produce a virtually empty sequence. The difference is that Empty produces a sequence without values that signals its emptiness by sending a completed message. Never, on the other hand, produces a sequence that never sends any message and never ends (an infinite sequence):
    //an empty sequence ended with a completed message
    IObservable<string> s2ended = Observable.Empty<string>();
    //an infinite sequence
    IObservable<string> s2infinite = Observable.Never<string>();
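The difference between the two factories becomes visible once an observer subscribes. The following is a minimal sketch, assuming the System.Reactive package and C# top-level statements; the log list is a hypothetical helper introduced only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper: records every notification we receive.
var log = new List<string>();

// Empty: no values, but a completed message arrives on subscription.
Observable.Empty<string>().Subscribe(
    value => log.Add("empty -> " + value),
    () => log.Add("empty completed"));

// Never: neither values nor a completed message are ever sent.
var subscription = Observable.Never<string>().Subscribe(
    value => log.Add("never -> " + value),
    () => log.Add("never completed"));

// Only the completed message of the Empty sequence was recorded.
log.ForEach(Console.WriteLine); // prints "empty completed"

// The subscription to Never stays open until we dispose it ourselves.
subscription.Dispose();
```

Because the sequence created by Never does not end on its own, keeping the object returned by Subscribe is the only way to release the subscription.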
When we need a sequence from an already available value, to bring that value into the reactive world, we can simply use the Return factory method. This factory lets us move from a state-based application design to a reactive one:
    //a sequence from a value
    IObservable<double> s3 = Observable.Return(40d);
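To see what an observer actually receives from such a sequence, here is a small sketch, assuming the System.Reactive package and C# top-level statements. The log list is a hypothetical helper, not part of the original example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper: records every notification we receive.
var log = new List<string>();

// The wrapped value flows once, followed by a completed message.
Observable.Return(40d).Subscribe(
    value => log.Add("OnNext: " + value),
    () => log.Add("OnCompleted"));

log.ForEach(Console.WriteLine);
// prints:
// OnNext: 40
// OnCompleted
```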
Similar to the Return factory, Throw produces a new sequence with a single message. Instead of a value, it sends an error message carrying the given Exception to the subscribed observers:
    //a faulty sequence
    IObservable<DateTime> s4 = Observable.Throw<DateTime>(new Exception("Now"));
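An observer handles this error message through its error handler. The sketch below assumes the System.Reactive package and C# top-level statements; the log list is a hypothetical helper used only to show which handler runs.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper: records every notification we receive.
var log = new List<string>();

Observable.Throw<DateTime>(new Exception("Now")).Subscribe(
    value => log.Add("OnNext: " + value),           // never invoked
    error => log.Add("OnError: " + error.Message),  // receives the exception
    () => log.Add("OnCompleted"));                  // never invoked

log.ForEach(Console.WriteLine); // prints "OnError: Now"
```

An error message ends the sequence, so no completed message follows it.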
The Create factory method is the most complex of the group. It does not simply wrap an existing value in a sequence. Instead, it accepts a delegate that acts as a factory of messages, so we can create arbitrary sequences with custom logic for interacting with observers.
We are not creating a self-messaging sequence. Instead, we write the code that will produce messages for each observer that ever subscribes to the sequence. This is very different from the other sequences we have already seen (and from others in the next chapters), which send the same messages to every observer, whereas a sequence created with the Create factory method can produce different messages per observer, based on any logic.
The Create operator lets us specify a Func<IObserver<T>, IDisposable> delegate that contains the message sourcing logic. This delegate produces the messages (for example, by materializing a database query) that each subscriber will consume, and it does so lazily, only when a subscription occurs. Bear in mind that each subscription causes a new execution of the delegate, which starts flowing messages to that subscriber. The ability to run different logic per subscriber makes this operator unique: with Create, we can interact with the observer itself from within the sequence.
Before trying to understand such sequences in detail, let's take a look at an example:
    var s5 = Observable.Create<DateTime>(observer =>
    {
        Console.WriteLine("Registering subscriber {0} of type {1}", observer.GetHashCode(), observer);
        //here you can handle by hand your observer interaction logic
        Task.Factory.StartNew(() =>
        {
            //some (time based) message
            for (int i = 0; i < 10; i++)
            {
                observer.OnNext(DateTime.Now);
                Thread.Sleep(1000);
            }
            //end of observer life
            observer.OnCompleted();
        });
        return () => Console.WriteLine("OnCompleted {0} of type {1}", observer.GetHashCode(), observer);
    });
    //subscribe an anonymous observer
    s5.Subscribe(d =>
    {
        Console.WriteLine("OnNext : {0}", d);
    });
The preceding example passes to Create a delegate that receives the subscribing observer, contains the code that represents the observable sequence logic for that observer, and returns an Action to run when the subscription ends. Based on this logic, different observers may even receive different messages.
In the example, we started a Task that produces messages containing the current DateTime. At the end, a completed message reaches the observer, ending its subscription.
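The claim that different observers may receive different messages can be illustrated with a synchronous variation. This is only a sketch, assuming the System.Reactive package and C# top-level statements; the subscriberCount counter, the log list, and the message texts are hypothetical names introduced for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helpers, only for illustration.
var log = new List<string>();
var subscriberCount = 0;

var greetings = Observable.Create<string>(observer =>
{
    // This delegate runs once for every subscription.
    var id = ++subscriberCount;
    observer.OnNext("hello subscriber #" + id);
    observer.OnCompleted();
    // The returned Action runs when this subscription ends.
    return () => log.Add("subscription #" + id + " ended");
});

// Two subscriptions, two executions of the delegate, two different messages.
greetings.Subscribe(message => log.Add("first got: " + message));
greetings.Subscribe(message => log.Add("second got: " + message));

log.ForEach(Console.WriteLine);
```

Each observer receives a message built specifically for it, which a sequence made with Return or Range cannot do.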
Instead of returning a simple Action to run when the subscription ends, we can return an IDisposable object that acts as an end-of-life token. It is disposed when the subscription ends, whether because the sequence completed or because the subscriber disposed the subscription prematurely. The result is identical to the solution with the Action already seen in the preceding code.
It is interesting to see the Disposable helper class, which can create empty disposable objects or, given an Action, disposable objects that execute specific code when they are disposed. The previous example routes the written Action into the Disposable.Create helper method, so it produces the same result as the following code. The difference is that the following example keeps the object returned by Subscribe, so we can end the observer's life cycle prematurely:
    var s5 = Observable.Create<DateTime>(observer =>
    {
        Console.WriteLine("Registering subscriber {0} of type {1}", observer.GetHashCode(), observer);
        //here you can handle by hand your observer interaction logic
        Task.Factory.StartNew(() =>
        {
            //some (time based) message
            for (int i = 0; i < 10; i++)
            {
                observer.OnNext(DateTime.Now);
                Thread.Sleep(1000);
            }
            //end of observer life
            observer.OnCompleted();
        });
        return Disposable.Create(() => Console.WriteLine("Disposing..."));
    });
    //subscribe an anonymous observer
    var disposableObserver = s5.Subscribe(d =>
    {
        Console.WriteLine("OnNext : {0}", d);
    });
    //wait some time and press RETURN
    //to dispose the observer
    Console.ReadLine();
    disposableObserver.Dispose();
    Console.ReadLine();
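In the preceding code, disposing the subscription stops the delivery of messages, but the loop inside the task keeps running until it finishes. One possible way to stop the work itself is to return a CancellationDisposable, whose Token is cancelled when the object is disposed. The following is a sketch under that assumption, using the System.Reactive package and C# top-level statements; the ticks and received names and the timings are hypothetical.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

var ticks = Observable.Create<DateTime>(observer =>
{
    // Disposing this object cancels its Token.
    var cancellation = new CancellationDisposable();
    var token = cancellation.Token;

    Task.Run(() =>
    {
        // The loop also ends as soon as the subscription is disposed.
        for (int i = 0; i < 10 && !token.IsCancellationRequested; i++)
        {
            observer.OnNext(DateTime.Now);
            // Waits up to one second, waking early on cancellation.
            token.WaitHandle.WaitOne(1000);
        }
        if (!token.IsCancellationRequested)
        {
            observer.OnCompleted();
        }
    });

    return cancellation;
});

// Hypothetical counter of received messages.
var received = 0;
var subscription = ticks.Subscribe(d =>
{
    Interlocked.Increment(ref received);
    Console.WriteLine("OnNext : {0}", d);
});

// Let a few messages flow, then end the subscription prematurely.
Thread.Sleep(2500);
subscription.Dispose();
```

The design choice here is to tie the lifetime of the background work to the lifetime of the subscription, so that no work continues for an observer that is no longer listening.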
The Range factory method creates a sequence that produces a range of consecutive values using the Int32 message type. This is similar to creating a range of values with the Enumerable.Range() method and then flowing it through an observable sequence. Here's an example:
    //a ranged sequence
    var range = Observable.Range(0, 1000);
    //an observer will get values
    //anytime it will subscribe
    range.Subscribe(value =>
    {
        Console.WriteLine("range -> {0}", value);
    });
The Range operator acts as a message source for each subscriber. This means that every subscribing observer receives the same result. In other words, if we specify a range of 1000 items, each subscriber will receive 1000 messages. This also means that if there are no subscribers, nothing happens and no message flows, so no resources are consumed.
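The per-subscriber behavior can be checked with two observers on the same sequence. This is a minimal sketch, assuming the System.Reactive package and C# top-level statements; the smaller range of three items and the first and second lists are hypothetical choices made to keep the output short.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// A short range: start at 0, produce 3 values.
var range = Observable.Range(0, 3);

// Hypothetical helpers: one list per subscriber.
var first = new List<int>();
var second = new List<int>();

// Each subscription restarts the range from its first value.
range.Subscribe(value => first.Add(value));
range.Subscribe(value => second.Add(value));

Console.WriteLine("first: " + string.Join(",", first));   // prints "first: 0,1,2"
Console.WriteLine("second: " + string.Join(",", second)); // prints "second: 0,1,2"
```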
The Generate factory method is something of an oddity within the reactive world. In a reactive design we would expect to deal with functions; Generate is instead a for statement made reactive.
In other words, we have an initial index value (i=0), a condition (i<10), an iteration function that advances the index (i+1), and a body function that produces each message, exactly as in any for statement. Here's an example:
    //a reactive for statement
    //similar to for(int i=0;i<10;i++)
    var generated = Observable.Generate<int, DateTime>(0,
        i => i < 10,
        i => i + 1,
        i => new DateTime(2016, 1, 1).AddDays(i));
    generated.Subscribe(value =>
    {
        Console.WriteLine("generated -> {0}", value);
    });
Although it is strange to see an iterative statement in the reactive world, sometimes, it may be useful.
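The correspondence with a for statement can be made explicit by producing the same values both ways and comparing them. The following sketch assumes the System.Reactive package and C# top-level statements; the fromLoop and fromGenerate lists are hypothetical helpers.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

// The imperative version: a plain for statement.
var fromLoop = new List<DateTime>();
for (int i = 0; i < 10; i++)
{
    fromLoop.Add(new DateTime(2016, 1, 1).AddDays(i));
}

// The reactive version: the same four parts passed as arguments.
var fromGenerate = new List<DateTime>();
Observable.Generate<int, DateTime>(
        0,                                        // initial index value
        i => i < 10,                              // condition
        i => i + 1,                               // iteration function
        i => new DateTime(2016, 1, 1).AddDays(i)) // body function
    .Subscribe(value => fromGenerate.Add(value));

// Both approaches produce the same ten dates in the same order.
Console.WriteLine(fromLoop.SequenceEqual(fromGenerate)); // prints "True"
```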