Chapter 4. Observable Sequence Programming

This chapter gives the reader a concrete idea of how sequences are created and manipulated. We will see how the Reactive Extensions (Rx) framework works with sequences through its large set of operators, exposed as extension methods available on any observable sequence.

In this chapter, we will focus on the following topics:

  • Sequence creation basics
  • Time-based sequence creation
  • Sequence manipulation and filtering
  • Sequence partitioning
  • Advanced operators

Sequence creation basics

In the previous chapter, we saw how to create sequences without implementing the IObservable interface ourselves, by building our custom observables on the Subject class. Subject gives us a ready-made implementation that is useful in many cases, thus reducing the bootstrap time when programming in a reactive way.

We can create a new observable subject simply with the new keyword of the C# programming language. The same applies to customized observable sequences (classes that implement the IObservable interface). Beyond this, we can create generic observable sequences with the factory methods of the Observable helper class. These methods let us create sequences from scratch without writing custom classes, or build them from existing values or other CLR objects, such as events. These sequences are only observable sequences; they are not subjects and they are not observers.

Many of the following factory methods simply translate state-based data (variables) into flowing data by returning the same value wrapped in a sequence. Although this may seem like a change of minimal importance, by changing the shape of the data, we also change the design of the application.

Empty/Never

The simplest sequence is the empty sequence. This kind of sequence may be useful to start or end some kind of operation, or to comply with the signature of some operator method.

Both operators (Empty and Never) are available as factory methods from the Observable helper, and each produces a sequence without values. The difference is that the Empty factory method produces a sequence that signals its emptiness by immediately flowing a completed message. The Never method, on the other hand, produces a sequence that never ends (an infinite sequence) and never sends any message:

//an empty sequence ended with a completed message 
IObservable<string> s2ended = Observable.Empty<string>(); 
//an infinite sequence 
IObservable<string> s2infinite = Observable.Never<string>(); 
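
The difference between the two becomes visible once an observer subscribes. The following is a minimal sketch, assuming the System.Reactive NuGet package and C# top-level statements; the log list and the variable names are illustrative only:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

var log = new List<string>();

//Empty raises no OnNext and completes as soon as we subscribe
Observable.Empty<string>().Subscribe(
    value => log.Add("empty value: " + value),
    () => log.Add("empty completed"));

//Never stays silent: neither OnNext nor OnCompleted is raised
IDisposable neverSubscription = Observable.Never<string>().Subscribe(
    value => log.Add("never value: " + value),
    () => log.Add("never completed"));

//a subscription to Never ends only when we dispose it
neverSubscription.Dispose();

log.ForEach(Console.WriteLine);
```

Only the completion of the Empty sequence should reach the log; the Never sequence leaves no trace.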

Return

When we need to bring an already available value into the reactive world, we can simply use the Return factory method. It produces a sequence that flows that single value and then completes. This factory gives us the ability to go from the state-based application design to the reactive one:

//a sequence from a value 
IObservable<double> s3 = Observable.Return(40d); 
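
As a minimal sketch of what an observer of such a sequence sees (assuming the System.Reactive NuGet package and C# top-level statements; the events list is illustrative only):

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

var events = new List<string>();

//Return flows the single value, then a completed message
Observable.Return(40d).Subscribe(
    value => events.Add("OnNext: " + value),
    () => events.Add("OnCompleted"));

events.ForEach(Console.WriteLine);
```

The observer receives one OnNext carrying the wrapped value, followed by OnCompleted.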

Throw

Similar to the Return factory, Throw lets an error message flow to the subscribed observers. It produces a new sequence containing a single error message that carries the given Exception:

//a faulty sequence 
IObservable<DateTime> s4 = Observable.Throw<DateTime>(new Exception("Now")); 
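
A minimal sketch of how an observer can handle such a faulty sequence follows, assuming the System.Reactive NuGet package and C# top-level statements. The exception type and the events list are illustrative choices, not part of the original example:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

var events = new List<string>();

//the sequence carries no value and no completion, only the error
Observable.Throw<DateTime>(new InvalidOperationException("Now")).Subscribe(
    value => events.Add("OnNext: " + value),
    error => events.Add("OnError: " + error.Message),
    () => events.Add("OnCompleted"));

events.ForEach(Console.WriteLine);
```

Supplying an error handler matters here: when only an OnNext handler is given, the exception is rethrown to the caller instead of being delivered to the observer.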

Create

The Create factory method is the most complex of the group. It does not simply wrap an existing value into a sequence. It is a factory method that returns a factory for creating arbitrary sequences, acting as message sources, that interact with their observers.

We are not creating a self-messaging sequence. Instead, we write code that produces messages for each observer that attaches to the sequence. This is very different from any other sequence we have already seen (and from others in the next chapters), because all the other sequences produce the same messages and route them to all the observers, while a sequence born from the Create factory method can produce different messages for each observer based on any logic.

The Create operator lets us specify a Func<IObserver<T>, IDisposable> delegate that contains the message sourcing logic. This delegate produces the messages (for example, by materializing a database query) that each subscriber will consume, and it does so lazily, only when the subscription occurs. Bear in mind that each subscription causes a new execution of the delegate, which starts flowing messages again. The ability to have different behavior per subscriber makes this operator unique: only with the Create operator can we interact with the observer itself from within the sequence operator chain.

Before trying to understand such sequences in detail, let's take a look at an example:

var s5 = Observable.Create<DateTime>(observer => 
    { 
        Console.WriteLine("Registering subscriber {0} of type {1}",
        observer.GetHashCode(), observer); 
        //here you can handle by hand your observer interaction logic 
        Task.Factory.StartNew(() => 
            { 
                //some (time based) message 
                for(int i=0;i<10;i++) 
                { 
                    observer.OnNext(DateTime.Now); 
                    Thread.Sleep(1000); 
                } 
                //end of observer life 
                observer.OnCompleted(); 
            }); 
        //the returned Action runs when the subscription ends 
        return () => Console.WriteLine("OnCompleted {0} of type {1}",
            observer.GetHashCode(), observer); 
    }); 
 
//subscribe an anonymous observer 
s5.Subscribe(d => 
    { 
        Console.WriteLine("OnNext : {0}", d); 
    }); 

The preceding example passes the Create method a delegate containing the code that represents the observable sequence logic for a given observer. The delegate returns an Action that runs when the subscription ends. Based on such logic, different observers may even receive different messages.

In the example, we created a Task that produces ten messages, one per second, each containing the current DateTime. At the end, a completed message reaches the observer, ending its subscription.
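
The per-subscriber nature of Create can be seen in isolation with a smaller sketch. It assumes the System.Reactive NuGet package and C# top-level statements; the subscriptions counter and the received list are hypothetical helpers introduced for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

int subscriptions = 0;

var perSubscriber = Observable.Create<string>(observer =>
{
    //this delegate runs once for every Subscribe call
    int id = ++subscriptions;
    observer.OnNext("hello subscriber #" + id);
    observer.OnCompleted();
    //nothing to clean up in this sketch
    return Disposable.Empty;
});

var received = new List<string>();

//two subscriptions cause two executions of the delegate
perSubscriber.Subscribe(value => received.Add(value));
perSubscriber.Subscribe(value => received.Add(value));

received.ForEach(Console.WriteLine);
```

Each subscriber receives a message built specifically for it, because the delegate is executed again for every subscription.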

Instead of returning a simple Action that runs when the subscription ends, the delegate can return an IDisposable object that acts as an end-of-life token. It is disposed when the subscription ends, either because the sequence terminated or because the subscriber disposed its own subscription, even prematurely. The result is identical to the Action-based solution of the first Create example.

It is interesting to see the Disposable helper class at work here. It gives us empty disposable objects as well as disposable objects that execute a specific Action once disposed. Internally, the first Create example routes its Action into the Disposable.Create helper method, so it produces the same result as the following code. The difference is that the following example keeps the token returned by Subscribe, so we can end the observer's life cycle prematurely:

var s5 = Observable.Create<DateTime>(observer => 
    { 
        Console.WriteLine("Registering subscriber {0} of type {1}",
        observer.GetHashCode(), observer); 
        //here you can handle by hand your observer interaction logic 
        Task.Factory.StartNew(() => 
            { 
                //some (time based) message 
                for (int i = 0; i < 10; i++) 
                { 
                    observer.OnNext(DateTime.Now); 
                    Thread.Sleep(1000); 
                } 
                //end of observer life 
                observer.OnCompleted(); 
            }); 
        return Disposable.Create(() => Console.WriteLine("Disposing...")); 
    }); 
 
//subscribe an anonymous observer 
var disposableObserver = s5.Subscribe(d => 
    { 
        Console.WriteLine("OnNext : {0}", d); 
    }); 
 
//wait some time and press RETURN 
//to dispose the observer 
Console.ReadLine(); 
 
disposableObserver.Dispose(); 
 
Console.ReadLine(); 
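
Note that disposing the subscription stops messages from reaching the observer, but the Task in the listing keeps looping until its ten iterations are done. One possible way to stop the producer as well is sketched below, assuming the System.Reactive NuGet package and C# top-level statements. It swaps Disposable.Create for the CancellationDisposable class of System.Reactive.Disposables; the cancellable and loopStopped names are illustrative only:

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

//signals that the producer loop has really ended
var loopStopped = new ManualResetEventSlim(false);

var cancellable = Observable.Create<DateTime>(observer =>
{
    //disposing this token cancels the loop below
    var cancellation = new CancellationDisposable();
    CancellationToken token = cancellation.Token;

    Task.Run(() =>
    {
        for (int i = 0; i < 10 && !token.IsCancellationRequested; i++)
        {
            observer.OnNext(DateTime.Now);
            //wait up to one second, waking early on cancellation
            token.WaitHandle.WaitOne(1000);
        }
        if (!token.IsCancellationRequested)
        {
            observer.OnCompleted();
        }
        loopStopped.Set();
    });

    return cancellation;
});

IDisposable subscription = cancellable.Subscribe(
    d => Console.WriteLine("OnNext : {0}", d));

//let a couple of messages flow, then unsubscribe
Thread.Sleep(1500);
subscription.Dispose();

bool stoppedInTime = loopStopped.Wait(TimeSpan.FromSeconds(2));
Console.WriteLine("Producer loop stopped: {0}", stoppedInTime);
```

With this variation, disposing the subscription cancels the token, so the loop exits shortly afterwards instead of running all ten iterations.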

Range

The Range factory method creates a sequence that produces a range of Int32 values, given a start value and a count. This is similar to calling the Enumerable.Range() method and flowing the result into an observable sequence. Here's an example:

//a ranged sequence 
var range = Observable.Range(0, 1000); 
//an observer will get values 
//anytime it will subscribe 
range.Subscribe(value => 
    { 
        Console.WriteLine("range -> {0}", value); 
    }); 

The Range operator acts as a per-subscriber message source. This means that each subscribing observer receives the same result. In other words, if we specify a range of 1000 items, each subscriber will receive 1000 messages. This also means that if there are no subscribers, nothing happens and no message flows, so no resources are consumed.
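
A minimal sketch of this behavior with two subscribers follows, assuming the System.Reactive NuGet package and C# top-level statements. The smaller range and the two lists are illustrative choices:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

//three values starting from 0: the second argument is a count
var smallRange = Observable.Range(0, 3);

var first = new List<int>();
var second = new List<int>();

//nothing is produced until an observer subscribes
smallRange.Subscribe(value => first.Add(value));
smallRange.Subscribe(value => second.Add(value));

Console.WriteLine("first  -> {0}", string.Join(",", first));
Console.WriteLine("second -> {0}", string.Join(",", second));
```

Both lists end up with the same three values, because the range is produced again for each subscription.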

Generate

The Generate factory method is something of an oddity within the reactive world. We would expect to deal with functions; instead, it is a for statement made reactive.

In other words, we have an initial index value (i=0), a condition (i<10), an iteration function that advances the index (i+1), and a body function that produces each message, exactly as in any for statement. Here's an example:

//a reactive For statement 
//similar to for(int i=0;i<10;i++) 
var generated = Observable.Generate<int, DateTime>( 
    0,                                         //initial index value 
    i => i < 10,                               //condition 
    i => i + 1,                                //iteration 
    i => new DateTime(2016, 1, 1).AddDays(i)); //body: the message to produce 
 
generated.Subscribe(value => 
    { 
        Console.WriteLine("generated -> {0}", value); 
    }); 

Although it is strange to see an iterative statement in the reactive world, sometimes, it may be useful.
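
To make the parallel with the for statement explicit, the following sketch runs the same loop in both forms and collects the results. It assumes the System.Reactive NuGet package and C# top-level statements; the step of 2 and the squared result are arbitrary choices for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

//classic for statement
var fromLoop = new List<int>();
for (int i = 0; i < 5; i += 2)
{
    fromLoop.Add(i * i);
}

//the same iteration expressed as an observable sequence
var fromGenerate = new List<int>();
Observable.Generate(
    0,            //initial index value
    i => i < 5,   //condition
    i => i + 2,   //iteration
    i => i * i)   //body: the message to produce
    .Subscribe(value => fromGenerate.Add(value));

Console.WriteLine("for      -> {0}", string.Join(",", fromLoop));
Console.WriteLine("Generate -> {0}", string.Join(",", fromGenerate));
```

Both forms visit the index values 0, 2, and 4 and produce their squares.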
