Time-based sequence creation

In the previous sections, we had the opportunity to create simple sequences from known values or by executing some specific code.

Although these opportunities give us the chance to create useful message sequences, in the real world, reactive programming deals with some kind of time-based messages.

Interval

The easiest form of time-based interaction is the polling design. This design, typical of nonreactive programming, happens anytime we ask for a value or a state at a fixed time interval. Similarly, within the reactive programming, we may produce messages at a fixed time interval running in a push design to use the value itself or to trigger other logics available in the following sequence chain.

The Interval operator produces a similar design by specifying the wanted time interval, receiving a counter of the current tick as the Int64 value.

Here's an example:

static void Main(string[] args) 
{ 
    //this sequence produces a message per second 
    var sequence = Observable.Interval(TimeSpan.FromSeconds(1)); 
    sequence.Subscribe(ObserverOnNext); 
 
    Console.ReadLine(); 
} 
 
private static void ObserverOnNext(long obj) 
{ 
    Console.WriteLine("{0} -> {1}", obj, DateTime.Now); 
} 

The greatest benefit here is the asynchronous implementation of the sequence that came from the Observable.Interval helper method at actually no cost for the developer.

Instead of creating threads or tasks, we simply asked for a sequence to produce messages at a timely basis and that is what we have.

The sequence always produces the Int64 values starting from zero. This is a message counter. Useful for some hardcoded, antireactive solution/design, the suggestion is to ignore such values.

Timer

The Timer factory method adds another feature to the Interval one. Like Interval, Timer may give us the ability to produce messages at a fixed time interval, but the real use of the Timer method is the ability to defer the execution at a given time or mix two features together. Here's an example:

//a timer used for defer the message sending 
var defer = Observable.Timer(TimeSpan.FromSeconds(5)); 
defer.Subscribe(value => 
    { 
        Console.WriteLine("defer -> {0}", value); 
    }); 
 
//a polling timer that will produce 
//messages at fixed time interval 
var loop = Observable.Timer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(0.5)); 
loop.Subscribe(value => 
{ 
    Console.WriteLine("loop -> {0}", value); 
}); 
 
Console.ReadLine(); 

Similar to the Interval factory method, the Timer produces a sequence to work in an asynchronous way without having the developer doing the hard job of creating threads or tasks.

Timeout

The Timeout factory method creates a new sequence to throw TimeoutException if the system time exceeds a specific time (absolute timeout) or if a message flows by a time that exceeds a given time by the previous message (relative timeout).

This powerful sequence can help in addressing a lot of network-related needs or multiple message correlations based on specific synchronization times.

Here's a complete example of the two working modes:

class Program 
{ 
    static void Main(string[] args) 
    { 
        //this sequence must complete by 5 seconds from now 
        var absoluteTimeoutSequence = Observable.Interval(TimeSpan.FromSeconds(1)) 
            .Select(id => DateTime.UtcNow) 
            .Timeout(DateTimeOffset.Now.AddSeconds(5)); 
 
        absoluteTimeoutSequence.Subscribe(new ConsoleObserver()); 
 
        Console.WriteLine("Press RETURN to start the following example"); 
        Console.ReadLine(); 
 
        //this sequence's messages must flow 
        //by 2 seconds 
        var relativeTimeoutSequence = Observable.Create<DateTime>(newObserver => 
            { 
                Console.WriteLine("Registering observer..."); 
                Console.WriteLine("Starting message flow..."); 
 
                //handle the new subscriber message flow 
                Task.Factory.StartNew(() => 
                { 
                    //the message flow will slow down until timeout 
                    int i = 100; 
                    while (true) 
                    { 
                        newObserver.OnNext(DateTime.UtcNow); 
                        //the delay will increase each iteration 
                        Thread.Sleep(i += 100); 
                    } 
                }, TaskCreationOptions.PreferFairness); 
 
                return new Action(() => 
                { 
                    Console.WriteLine("Completed"); 
                }); 
            }) 
            .Timeout(TimeSpan.FromSeconds(2)); 
 
        relativeTimeoutSequence.Subscribe(new ConsoleObserver()); 
 
        Console.ReadLine(); 
    } 
} 
 
public class ConsoleObserver : IObserver<DateTime> 
{ 
    public void OnCompleted() 
    { 
        Console.WriteLine("Observer completed!"); 
    } 
 
    public void OnError(Exception error) 
    { 
        Console.WriteLine("Observer error: {0}", error); 
    } 
 
    public void OnNext(DateTime value) 
    { 
        Console.WriteLine("{0}", value); 
    } 
} 

TimeInterval/Timestamp

The TimeInterval factory method is a very useful factory method that creates a sequence to record the time interval that exists between messages flowing from a sourcing sequence. It is absolutely useful for diagnostic purposes.

Similarly useful is the Timestamp factory method that creates a sequence that flows out messages with a timestamp (DateTimeOffset) which is useful for tracing/logging message flows.

Here's a complete example of a fixed time interval sourcing sequence:

class Program 
{ 
    static void Main(string[] args) 
    { 
        //a sourcing sequence 
        var sourcingSequence = 
         Observable.Interval(TimeSpan.FromSeconds(1)).Select(id => 
         DateTime.UtcNow); 
         sourcingSequence.Subscribe(value => 
            { 
                Console.WriteLine("{0}", value); 
            }); 
 
        //a sequence recording the time interval of the sourcing sequence 
        var diagnosticSequence = sourcingSequence.TimeInterval(); 
        diagnosticSequence.Subscribe(interval => 
            { 
                Debug.WriteLine(string.Format("Message flowing in {0:N0}ms",
                interval.Interval.TotalMilliseconds)); 
            }); 
 
        var diagnosticSequence2 = sourcingSequence.Timestamp(); 
        diagnosticSequence2.Subscribe(new MessageTimeStampLogger()); 
 
        Console.ReadLine(); 
    } 
} 
 
public class MessageTimeStampLogger : IObserver<Timestamped<DateTime>> 
{ 
    public void OnCompleted() 
    { 
        Console.WriteLine("Observer completed!"); 
    } 
 
    public void OnError(Exception error) 
    { 
        Console.WriteLine("Observer error: {0}", error); 
    } 
 
    public void OnNext(Timestamped<DateTime> value) 
    { 
        Debug.WriteLine(string.Format("{0} -> Now flowing: {1}", value.Timestamp, value.Value)); 
    } 
} 
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.191.237.201