Scheduling

Programming with observable sequences is a powerful way to achieve highly modular code. Although this offers great flexibility, an observable sequence is single-threaded by default, like most other CLR objects. This means that although a sequence can push messages to multiple subscribers, the push happens on the thread where the messages originate, and the messages reach the subscribers sequentially, in subscription order.

In other words, using an observable sequence instead of any other .NET object does not automatically make our code multithreaded.

Luckily, to address this need for multithreading in the Rx world, there are schedulers. These are objects that choose when a message can flow and which thread must handle it.

Tip

It is important to understand the huge difference between using a CLR delegate/event and Rx scheduling. When we use scheduling in Rx, messages always pass asynchronously through an in-memory queue that acts as a message pump and supports Quality of Service (QoS) by routing messages at different speeds. These messages flow and are observed on one thread or another based on the scheduler's choice. This is actually the most critical task of a scheduler.

In short, a scheduler is in charge of choosing the message/thread association. Let's take a look at this familiar code:

var loopBasedSequence = Observable.Create<DateTime>(x => 
{ 
    while (true) 
    { 
        Console.WriteLine("{0} -> Yielding new value...", 
            Thread.CurrentThread.ManagedThreadId); 
        x.OnNext(DateTime.Now); 
        Thread.Sleep(1000); 
    } 
    return Disposable.Empty; 
}); 
 
loopBasedSequence.Subscribe(x => Console.WriteLine("-> {0}", x)); 

The preceding example shows us a very simple implementation of an infinite sequence. Based on a simple infinite loop, this example produces messages that always originate from the same thread. This is obvious because we are looping within a while loop; the same behavior occurs in nonreactive code.

Let's take a look at another example that shows a timer-based sequence:

var timerBasedSequence = Observable.Interval(TimeSpan.FromSeconds(1)) 
.Select(x => 
{ 
    Console.WriteLine("{0} -> Yielding new value...", 
        Thread.CurrentThread.ManagedThreadId); 
    return DateTime.Now; 
}); 
 
timerBasedSequence.Subscribe(x => Console.WriteLine("-> {0}", x)); 

This example shows another infinite sequence that is slightly different from the previous one. This sequence is timer-based, as when we use System.Threading.Timer or any similar object in classic CLR programming. A timer produces a signal at a fixed time interval. This signal originates from a thread in ThreadPool; we cannot choose the thread ourselves. The same happens in the reactive version of the timer, available through the Interval method.

Other than these built-in behaviors, we have the ability to set up a specific thread for our sequence by specifying the scheduler to be used. Here's an example:

var loopBasedSequence = Observable.Create<DateTime>(x => 
{ 
    while (true) 
    { 
        Console.WriteLine("{0} -> Yielding new value...", Thread.CurrentThread.ManagedThreadId); 
        x.OnNext(DateTime.Now); 
        Thread.Sleep(1000); 
    } 
    return Disposable.Empty; 
}); 
 
loopBasedSequence.SubscribeOn(Scheduler.Default).Subscribe(x => Console.WriteLine("{0} -> {1}", Thread.CurrentThread.ManagedThreadId, x)); 
loopBasedSequence.SubscribeOn(Scheduler.Default).Subscribe(x => Console.WriteLine("{0} -> {1}", Thread.CurrentThread.ManagedThreadId, x)); 
loopBasedSequence.SubscribeOn(Scheduler.Default).Subscribe(x => Console.WriteLine("{0} -> {1}", Thread.CurrentThread.ManagedThreadId, x)); 

The preceding example shows another implementation of the loop-based infinite sequence. This time, before subscribing, we asked for a specific scheduler with the SubscribeOn extension method (discussed in detail later in this chapter). In the previous example, the only subscriber created its subscription on the calling thread (the main thread), which then never left the infinite loop. In this new example, in contrast, the scheduler assigns a thread to each observer subscription. This means that we have three different threads altogether, each running its own infinite loop.

Tip

To use the Scheduler helper class, we need to import the System.Reactive.Concurrency namespace.

Default schedulers

We can select the desired scheduler from those available as static properties of the Scheduler helper class. Each prebuilt scheduler is optimized for a specific kind of usage, such as background tasks, UI programming, and so on. Here's a short explanation of the available schedulers.

The Scheduler.Default (formerly Scheduler.ThreadPool) scheduler is the default concurrent scheduler that lets us interact with multithreaded execution in a simple way. It uses threads from ThreadPool. Bear in mind that ThreadPool has a finite number of threads and that its threads also serve other CLR objects, such as tasks from TaskFactory. If needed, we can set the minimum and maximum number of threads with the ThreadPool.SetMinThreads and ThreadPool.SetMaxThreads static methods.

On the other hand, Scheduler.Immediate is the scheduler that the simplest operators use when we don't ask for one. This is the blocking one: it runs the work synchronously and executes the whole subscription life cycle on the thread that subscribes (the main thread in our examples).

The Scheduler.CurrentThread scheduler is very similar to the Immediate one; the difference is that Immediate executes each action at once, without queuing. CurrentThread, in contrast, queues messages so that they execute sequentially on the same calling thread, which prevents nested scheduling from creating a race or deadlock condition.
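The queuing difference between the two schedulers is easiest to see with a nested job. The following is a minimal sketch (the NestedSchedulingDemo class and its Run helper are hypothetical names introduced only for illustration): an outer job schedules an inner job on the same scheduler, and we record the order in which the steps run.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;

public static class NestedSchedulingDemo
{
    // Schedules an outer job that itself schedules an inner job on the
    // same scheduler, and returns the order in which the steps ran.
    public static List<string> Run(IScheduler scheduler)
    {
        var log = new List<string>();

        scheduler.Schedule(() =>
        {
            log.Add("outer start");
            // Immediate runs this right here; CurrentThread queues it
            // until the outer job has finished.
            scheduler.Schedule(() => log.Add("inner"));
            log.Add("outer end");
        });

        return log;
    }

    public static void Main()
    {
        Console.WriteLine("Immediate:     " + string.Join(", ", Run(Scheduler.Immediate)));
        Console.WriteLine("CurrentThread: " + string.Join(", ", Run(Scheduler.CurrentThread)));
    }
}
```

With Immediate, the inner job runs in the middle of the outer one (outer start, inner, outer end); with CurrentThread, it waits in the queue until the outer job ends (outer start, outer end, inner).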

The Scheduler.Dispatcher scheduler lets the observers execute on the UI thread of WPF or Silverlight applications.

Tip

To use the Dispatcher scheduler, import the NuGet package Rx-WPF or Rx-Silverlight.

The NewThreadScheduler.Default scheduler (formerly Scheduler.NewThread) executes each observer on a new foreground thread (System.Threading.Thread) to avoid consuming a thread from ThreadPool, as happens with the Default scheduler. This means that we can virtually run as many threads as the OS can handle (~65K), but it also means that this choice can dramatically reduce the reliability of the application (and of the OS): there is no throttling on the number of threads, so resource usage can grow huge and the application can reach a starvation state that impacts the OS, too.

The Scheduler.TaskPool scheduler executes the observer within a Task from the default task factory. There are two main benefits in using this scheduler. First, the threads in the thread pool (TaskFactory uses threads from ThreadPool by default) are pregenerated, so we don't have to wait for their creation (if enough threads are ready in the pool); this reduces the delay in responding to a message. Second, the pool contains a finite number of threads, which acts as a throttle that prevents an unpredictable number of threads from running together.

Tip

Each Rx operator comes with its default scheduler. This means that timer-based operators (for instance, Interval) run on the ThreadPool scheduler, while sequences that produce few messages will usually run on the Immediate scheduler. By design, each operator prefers the least concurrent scheduler that suits it; when we select a specific scheduler, such as the Default scheduler, we're asking Rx to configure its message pump to route messages to multiple threads to improve concurrency.

SubscribeOn/ObserveOn

In the previous example, we saw the SubscribeOn method in action. This method lets us specify the scheduler, and therefore the thread, on which a subscription runs.

The ObserveOn method gives a similar result, with one difference. The SubscribeOn method applies the scheduler to the whole subscription, including the code that produces the messages for that observer. The ObserveOn method, in contrast, applies the given scheduler only to the outgoing messages, leaving the message source on its original thread.

It's time to understand in detail how these methods work. The following example shows a sequence made with the Create operator. This operator lets us specify custom code that produces messages for each registration. Later, we will register two subscribers, first with the SubscribeOn method and then with the ObserveOn method, specifying the Default (ThreadPool) scheduler. This will route execution to the thread pool. The two operators will produce different results because the SubscribeOn method will put the whole Create operator on a pool thread, while the ObserveOn method will only put the resulting messages on a pool thread. Here's the code:

var sequence = Observable.Create<DateTime>(x => 
{ 
    //let's take some time before registering the new observer 
    for(int i=0;i<10;i++) 
    { 
        Console.WriteLine("Registering observer on thread {0}...", Thread.CurrentThread.ManagedThreadId); 
        Thread.Sleep(100); 
    } 
 
    //produce 10 messages 
    for(int i=0;i<10;i++) 
    { 
        x.OnNext(DateTime.Now); 
        Thread.Sleep(100); 
    } 
 
    //exit 
    return Disposable.Empty; 
}); 
 
//register two subscribers 
sequence.SubscribeOn(Scheduler.Default).Subscribe(x => Console.WriteLine("{0} -> {1}", Thread.CurrentThread.ManagedThreadId, x)); 
sequence.SubscribeOn(Scheduler.Default).Subscribe(x => Console.WriteLine("{0} -> {1}", Thread.CurrentThread.ManagedThreadId, x)); 
 
Console.ReadLine(); 

The example code is identical whether we use the SubscribeOn or the ObserveOn method; to try ObserveOn, we only need to replace SubscribeOn in the two subscriptions. Let's evaluate the behavior of the two methods.

When using the ObserveOn method, the two observers will register sequentially: the second observer will register only when the first has stopped receiving messages. As for the threading design, each observer will register on the same thread (the main one) and will then receive its messages on another thread from the pool. This means that the subscription code that runs within the Create method will always run on the main thread for each observer. In other words, each subscription will execute on thread A (the main one); the first observer will handle messages on thread B, and the second observer will handle messages on thread C.

When using the SubscribeOn method, each observer is tied to a specific thread from subscription to completion. This means that the first observer will subscribe and receive messages on thread A (not the main one), while the second observer will subscribe and receive messages on thread B (not the main one).

The immediate result of these two implementations is that, with SubscribeOn, the two subscriptions run in parallel: the second observer does not wait for the first one to complete its job before registering. This method produces the most concurrent result and is usually preferable when we want to increase parallelism in our code. However, it cannot directly interact with UI controls, such as WPF or Windows Forms controls, because all these controls require that the request (in the example, the code within the Create method) comes from the UI thread. In such cases, the ObserveOn method is definitely our choice.
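To check the thread association described above without reading through interleaved console output, we can record the thread IDs directly. The following is a minimal sketch under a few assumptions: the ThreadRoutingDemo class and its Run helper are hypothetical names, the sequence produces a single message, and the program runs as a console application whose main thread is not a pool thread.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;

public static class ThreadRoutingDemo
{
    // Applies the given routing (ObserveOn or SubscribeOn) to a one-message
    // sequence and returns (producer thread ID, observer thread ID).
    public static Tuple<int, int> Run(Func<IObservable<int>, IObservable<int>> route)
    {
        int producerThread = -1, observerThread = -1;

        var source = Observable.Create<int>(o =>
        {
            // this is the code that SubscribeOn moves and ObserveOn does not
            producerThread = Thread.CurrentThread.ManagedThreadId;
            o.OnNext(42);
            o.OnCompleted();
            return Disposable.Empty;
        });

        using (var done = new ManualResetEventSlim())
        {
            route(source).Subscribe(
                x => observerThread = Thread.CurrentThread.ManagedThreadId,
                () => done.Set());

            // wait for completion before reading the recorded IDs
            done.Wait();
        }

        return Tuple.Create(producerThread, observerThread);
    }

    public static void Main()
    {
        Console.WriteLine("Main thread: {0}", Thread.CurrentThread.ManagedThreadId);

        var observeOn = Run(s => s.ObserveOn(Scheduler.Default));
        Console.WriteLine("ObserveOn:   produced on {0}, observed on {1}",
            observeOn.Item1, observeOn.Item2);

        var subscribeOn = Run(s => s.SubscribeOn(Scheduler.Default));
        Console.WriteLine("SubscribeOn: produced on {0}, observed on {1}",
            subscribeOn.Item1, subscribeOn.Item2);
    }
}
```

With ObserveOn, the producer ID should match the main thread while the observer ID differs; with SubscribeOn, the producer and the observer should share the same pool thread. The actual ID values vary from run to run.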

Injecting schedulers

Other than injecting schedulers within the sequence chain by using the SubscribeOn or the ObserveOn method, we can pass a specific scheduler in almost any Rx operator's extension method, changing the default scheduler of the operator itself. Here's an example:

Console.WriteLine("Main thread: {0}", Thread.CurrentThread.ManagedThreadId); 
 
//numeric sequence 
var sequence = Observable.Range(1, 10, Scheduler.Default); 
 
//observers 
sequence.Subscribe(x => Console.WriteLine("{0} -> {1}", Thread.CurrentThread.ManagedThreadId, x)); 
sequence.Subscribe(x => Console.WriteLine("{0} -> {1}", Thread.CurrentThread.ManagedThreadId, x)); 

In the preceding example, we will see an implementation almost identical to the ObserveOn one because the two observers will run sequentially, receiving the same values from the sourcing sequence. This happens because the Range method produces a Cold sequence.

Tip

Cold sequences are sequences that start their messaging workflow each time an observer subscribes, so each observer gets its own. A Hot sequence, in contrast, has a single messaging workflow that runs regardless of whether zero, one, or multiple subscriptions exist.
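To see how much the injected scheduler changes an operator's behavior, we can pass Scheduler.Immediate to the same Range method. The following is a minimal sketch (the InjectedSchedulerDemo class and its RunImmediate helper are hypothetical names): with the Immediate scheduler, every value is produced synchronously, before the Subscribe call returns.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class InjectedSchedulerDemo
{
    // Range on the Immediate scheduler: all the values are produced
    // on the calling thread before Subscribe returns.
    public static List<string> RunImmediate()
    {
        var log = new List<string>();

        Observable.Range(1, 3, Scheduler.Immediate)
            .Subscribe(x => log.Add("value " + x));

        log.Add("Subscribe returned");
        return log;
    }

    public static void Main()
    {
        Console.WriteLine("Main thread: {0}", Thread.CurrentThread.ManagedThreadId);
        Console.WriteLine(string.Join(", ", RunImmediate()));

        // Same operator on the Default scheduler: values are produced on
        // a pool thread; Wait blocks until the sequence completes.
        Observable.Range(1, 3, Scheduler.Default)
            .Do(x => Console.WriteLine("{0} -> {1}",
                Thread.CurrentThread.ManagedThreadId, x))
            .Wait();
    }
}
```

The first line of output lists the three values before "Subscribe returned", which shows the blocking nature of the Immediate scheduler; the thread IDs printed by the second sequence depend on the pool.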

Custom scheduling

A scheduler, as the name implies, is not only something that deals with threads; it is also something that schedules an action at a given time. We can use schedulers to schedule jobs of any kind at an absolute, relative, or recurring time. In real-world reactive programming, we can use scheduled jobs to push messages into sequences, or we can take advantage of the schedulers' ability to virtualize time (explained later in the Virtual time section).

Here's an example of scheduling a job to run immediately:

static void Main(string[] args) 
{ 
    using (var job1 = Scheduler.Default.Schedule(OnJob1Executed)) 
        //job timeout 
        Thread.Sleep(2000); 
 
    Console.WriteLine("END"); 
    Console.ReadLine(); 
} 
 
static void OnJob1Executed() 
{ 
    for (int i = 0; i < 10; i++) 
    { 
        Console.Write("."); 
        Thread.Sleep(100); 
    } 
 
    Console.WriteLine(); 
    Console.WriteLine("JOB END"); 
} 

The preceding example shows how to schedule a simple job immediately. Consider a real-world usage where we schedule immediate jobs by receiving inputs from a user or by handling an event. Although we could simply execute the same job immediately without Scheduler, by using Scheduler, we may, in future, replay the scheduled job sequence by saving this sequence somewhere. This gives us a great diagnostic tool. More details on it are available in the Virtual time section.

Another interesting thing to focus on by looking at this example is the job timeout that we can invoke by simply disposing the job token (in the code, this is the job1 variable).

The token represents the scheduled job, not its implementation. This means that disposing the token cancels the job and prevents it from being executed, but it does not interrupt an execution that the scheduler has already triggered.

Future scheduling

Future scheduling is the ability to schedule a job at a given (future) absolute time, after a relative time, or periodically. The usage is almost identical to the generic scheduling, with the addition of a scheduling time value.

An important aspect is that, here, the timeout we give to the job token may actually prevent the scheduled job from firing.

Here are a few examples:

//starts a job in absolute time 
using (var job2 = Scheduler.Default.Schedule(DateTimeOffset.Now.AddSeconds(1), () => Console.WriteLine("OK"))) 
    //job timeout 
    Thread.Sleep(2000); 
 
//starts a job in relative time 
using (var job3 = Scheduler.Default.Schedule(TimeSpan.FromSeconds(10), () => Console.WriteLine("OK"))) 
    //job timeout 
    //this job will never fire because it is due later than the timeout allows 
    Thread.Sleep(2000); 
 
//starts a job periodically 
using (var job4 = Scheduler.Default.SchedulePeriodic(TimeSpan.FromSeconds(1), () => Console.WriteLine("OK"))) 
    //timeout at 5 seconds 
    Thread.Sleep(5000); 
 
Console.WriteLine("END"); 
Console.ReadLine();  

The periodic example (the third) will keep firing our scheduled job until we kill it by disposing the token (job4).

The ability to create custom future scheduling with multiple different schedulers gives us great freedom in designing our solution.

Virtual time

When we create complex scheduling, we can work in virtual time to achieve testability or to have the ability to replay some real-world (in production) execution because of testing needs, diagnostic needs, or production needs.

Tip

To execute the following examples, import the Microsoft.Reactive.Testing and System.Reactive.Concurrency namespaces.

The concept is very easy. We can create a scheduler specific for testing purposes, TestScheduler (import the NuGet Rx-Testing package). Then, we can use it as usual by scheduling jobs. We can advance the virtual clock by letting the scheduler execute as in the real world. Here's a short example to understand it easily:

//a scheduler for testing purposes 
var scheduler = new TestScheduler(); 
 
//records to schedule an immediate action 
scheduler.Schedule(() => Console.WriteLine("Hi")); 
 
//advance the virtual clock to let execute the recorded actions 
scheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks);  

The preceding example shows how to use TestScheduler. The scheduler records each job we ask it to schedule. Unlike the usual schedulers, it never executes jobs on its own; it only records them. Bear in mind that we always need to manually play the recorded job schedule to actually execute the jobs.

As an alternative to manually moving the virtual clock forward with the AdvanceBy method, we can simply play the whole record by invoking the Start method. Here's an example:

////advance the virtual clock to let execute the recorded actions 
//scheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks); 
 
//play all the recorded scheduled jobs 
scheduler.Start(); 

The Start method lets the scheduler execute all the recorded jobs immediately, regardless of their absolute timings, while respecting their order. Here's a more complete example:

//output the virtual clock 
Console.WriteLine("-> {0}", scheduler.Now); 
 
//schedule a future job 
scheduler.Schedule(TimeSpan.FromDays(22), () => Console.WriteLine("22 days later")); 
Console.WriteLine("-> {0}", scheduler.Now); 
 
//play all the recorded scheduled jobs 
scheduler.Start(); 
Console.WriteLine("-> {0}", scheduler.Now); 

In the preceding example, we created a future job that will start after a 22-day delay. However, by invoking the Start method, the scheduler will immediately advance the virtual clock (available through the Now property) to the next job time, causing the related action's execution. This will leave the internal virtual clock at the last job time (in this example, 22 days).

We can register other jobs even after we invoke the Start method. These jobs will not fire until we invoke the Start method again.

Tip

If multiple jobs are scheduled at the same virtual time, the scheduler will execute these jobs in registration order.
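The ordering rule can be checked with a short sketch (the SameTimeJobsDemo class and its Run helper are hypothetical names introduced for illustration). Two jobs share the same virtual time, and a third job, although registered last, is due earlier.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using Microsoft.Reactive.Testing;

public static class SameTimeJobsDemo
{
    // Records the order in which TestScheduler plays its jobs.
    public static List<string> Run()
    {
        var scheduler = new TestScheduler();
        var log = new List<string>();

        // two jobs at the same virtual time
        scheduler.Schedule(TimeSpan.FromSeconds(1), () => log.Add("first"));
        scheduler.Schedule(TimeSpan.FromSeconds(1), () => log.Add("second"));

        // registered last, but due earlier in virtual time
        scheduler.Schedule(TimeSpan.FromTicks(1), () => log.Add("earlier"));

        // play the record
        scheduler.Start();
        return log;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

The record plays as earlier, first, second: virtual time decides the order first, and registration order breaks the tie.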

We can advance to a specific virtual time by using the AdvanceTo method. Here's an example:

//schedule a future job at 1 minute 
scheduler.Schedule(TimeSpan.FromMinutes(1), () => Console.WriteLine("1 minute now")); 
Console.WriteLine("-> {0}", scheduler.Now); 
 
//advance to 00:00:30 
scheduler.AdvanceTo(TimeSpan.FromSeconds(30).Ticks);  
Console.WriteLine("-> {0}", scheduler.Now); 
 
//advance to 00:01:00 
scheduler.AdvanceTo(TimeSpan.FromSeconds(60).Ticks); 
Console.WriteLine("-> {0}", scheduler.Now); 

By invoking the AdvanceTo method of the TestScheduler class, we can advance the virtual time as we wish. Obviously, we cannot go back in time! The scheduler is a forward-only virtual clock.

Another interesting feature is the ability to schedule a Stop on the scheduler itself. This will pause the scheduler at a specified time (another Start will let the scheduler run again, playing any newly recorded jobs). Here's a complete example:
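The forward-only rule can be observed with a small sketch (the ForwardOnlyClockDemo class and its CanGoBack helper are hypothetical names). It assumes that, as in the Rx versions we are aware of, AdvanceTo signals a request to move backward by throwing an ArgumentOutOfRangeException; other versions may behave differently.

```csharp
using System;
using Microsoft.Reactive.Testing;

public static class ForwardOnlyClockDemo
{
    // Tries to move the virtual clock backward and reports whether
    // the scheduler accepted the request.
    public static bool CanGoBack()
    {
        var scheduler = new TestScheduler();

        // advance to 00:01:00
        scheduler.AdvanceTo(TimeSpan.FromSeconds(60).Ticks);

        try
        {
            // ask to go back to 00:00:30
            scheduler.AdvanceTo(TimeSpan.FromSeconds(30).Ticks);
            return true;
        }
        catch (ArgumentOutOfRangeException)
        {
            // assumption: this is how the scheduler rejects the request
            return false;
        }
    }

    public static void Main()
    {
        Console.WriteLine("Can go back in time: {0}", CanGoBack());
    }
}
```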

//schedule a periodic job and output the virtual time 
scheduler.SchedulePeriodic(TimeSpan.FromSeconds(1), () => Console.WriteLine("{0} -> Periodic", scheduler.Now)); 
 
//this would produce an infinite output 
//scheduler.Start(); 
 
//to avoid the infinite output, we will need to schedule a Stop request 
scheduler.Schedule(TimeSpan.FromSeconds(60), () => scheduler.Stop()); 
 
//play the whole record 
scheduler.Start(); 
 
//append immediately 
scheduler.Schedule(TimeSpan.FromTicks(1), () => Console.WriteLine("Running again")); 
 
//schedule another Stop 
scheduler.Schedule(TimeSpan.FromSeconds(60), () => scheduler.Stop()); 
             
//start again the scheduler 
scheduler.Start(); 
 
Console.ReadLine(); 

In the preceding example, we can see Stop in use. Bear in mind that after the first Start method executes, the whole record will play until Stop fires. This will pause the scheduler, setting its IsEnabled property to false. Then, we enqueue two other jobs: an immediate output and another Stop. This second Stop prevents the scheduler from running indefinitely after the second Start request. Without it, the scheduler would never stop, because the virtual clock can only advance: after the first Stop, it will never go back to run the same (first) Stop again. In other words, invoking the Start method multiple times will not restart the scheduler from the beginning; it simply resumes from its last virtual clock time.

Testing schedulers

Now that we know how to start and stop virtual schedulers, we need to look at how to use TestScheduler to test sequences in an automated way, as we would do in nonreactive code with unit testing.

To test schedulers and sequences, we need to create a mock version of a sequence and of an observer. Both are available through helper methods of the TestScheduler class. We can create a mock observer with the CreateObserver<T> method and mock sequences with the CreateColdObservable or CreateHotObservable methods. Bear in mind that a Cold sequence produces the same message flow at each subscription, while a Hot sequence fires its messages regardless of the live subscriptions.

Here's a short example regarding a cold sequence:

var scheduler = new TestScheduler(); 
 
//a cold sequence 
var sequence = scheduler.CreateColdObservable<int>( 
    //some recorded message 
    new Recorded<Notification<int>>(TimeSpan.FromSeconds(1).Ticks,
    Notification.CreateOnNext(10)), 
    new Recorded<Notification<int>>(TimeSpan.FromSeconds(2).Ticks, 
    Notification.CreateOnNext(20)), 
    new Recorded<Notification<int>>(TimeSpan.FromSeconds(3).Ticks,
    Notification.CreateOnNext(30)), 
    new Recorded<Notification<int>>(TimeSpan.FromSeconds(4).Ticks,
    Notification.CreateOnNext(40)), 
    new Recorded<Notification<int>>(TimeSpan.FromSeconds(5).Ticks, 
    Notification.CreateOnNext(50)), 
    new Recorded<Notification<int>>(TimeSpan.FromSeconds(6).Ticks, 
    Notification.CreateOnNext(60)), 
    new Recorded<Notification<int>>(TimeSpan.FromSeconds(7).Ticks, 
    Notification.CreateOnCompleted<int>()) 
); 
 
//a new testable observer 
var observer1 = scheduler.CreateObserver<int>(); 
 
//subscribe the observer at a given virtual time 
scheduler.Schedule(TimeSpan.FromSeconds(2), () => sequence.Subscribe(observer1)); 
 
//play the record 
scheduler.Start(); 

As visible in the preceding example, we can simply create a sequence with the TestScheduler.CreateColdObservable method by specifying the list of recorded messages to produce when playing. To produce recorded messages, we wrap our values (integer values in this example, or none for the OnCompleted message) into a Notification object. Then, we wrap the notification into a Recorded object, providing the relative time offset at which the message must flow.

We subscribe the observer to the sequence within another job in virtual time (after 2 seconds in the example). The observer will still receive all 7 messages, despite subscribing 2 seconds late. This happens because we're using a Cold observable sequence.

Tip

We can substitute CreateColdObservable with CreateHotObservable. With a Hot observable sequence, the observer will receive only 5 messages because the hot sequence produces its messages regardless of whether a subscriber exists; the messages recorded at 1 and 2 seconds have already fired by the time the observer subscribes.

To verify the message flow from the observer's point of view, we can evaluate its Messages property after TestScheduler.Start completes. Here's an example:
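The two message counts can be compared side by side with a short sketch (the HotVersusColdDemo class and its helpers are hypothetical names; the recorded messages mirror the ones used in the cold example).

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using Microsoft.Reactive.Testing;

public static class HotVersusColdDemo
{
    // Six OnNext messages at 1..6 seconds and an OnCompleted at 7 seconds.
    static Recorded<Notification<int>>[] CreateMessages()
    {
        var list = new List<Recorded<Notification<int>>>();
        for (int i = 1; i <= 6; i++)
        {
            list.Add(new Recorded<Notification<int>>(
                TimeSpan.FromSeconds(i).Ticks, Notification.CreateOnNext(i * 10)));
        }
        list.Add(new Recorded<Notification<int>>(
            TimeSpan.FromSeconds(7).Ticks, Notification.CreateOnCompleted<int>()));
        return list.ToArray();
    }

    // Subscribes a mock observer at 2 seconds of virtual time and
    // returns how many messages it received.
    public static int CountReceived(bool hot)
    {
        var scheduler = new TestScheduler();
        var messages = CreateMessages();

        ITestableObservable<int> sequence = hot
            ? scheduler.CreateHotObservable(messages)
            : scheduler.CreateColdObservable(messages);

        var observer = scheduler.CreateObserver<int>();
        scheduler.Schedule(TimeSpan.FromSeconds(2), () => sequence.Subscribe(observer));

        // play the record
        scheduler.Start();
        return observer.Messages.Count;
    }

    public static void Main()
    {
        Console.WriteLine("Cold: {0} messages", CountReceived(false));
        Console.WriteLine("Hot:  {0} messages", CountReceived(true));
    }
}
```

The cold sequence delivers 7 messages and the hot one 5. The hot message recorded at exactly 2 seconds is lost as well, because it was registered before the subscription job that shares its virtual time.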

foreach (var m in observer1.Messages) 
{ 
    var time = m.Time; 
    //available only for OnNext messages 
    //var value = m.Value.Value; 
    //var exception = m.Value.Exception; 
    //var kind = m.Value.Kind; 
    //var hasValue = m.Value.HasValue; 
 
    Console.WriteLine("{0}", m); 
} 

As visible, the Notification object offers a lot of interesting properties that detail the flowing message. This is the same object we got when using the Materialize method to produce a tracing sequence, as we have already seen in Chapter 5, Debugging Reactive Extension.

Now, it is time to make some assertions to check whether messages within the observer behave as expected. The next example uses the AssertEqual method on the Messages property of the mock observer. Here's the short code:

observer1.Messages.AssertEqual( 
    //same messages 
    new Recorded<Notification<int>>(TimeSpan.FromSeconds(1).Ticks + 
    TimeSpan.FromSeconds(2).Ticks, Notification.CreateOnNext(10)), 
    new Recorded<Notification<int>>(TimeSpan.FromSeconds(2).Ticks + 
    TimeSpan.FromSeconds(2).Ticks, Notification.CreateOnNext(20)), 
    new Recorded<Notification<int>>(TimeSpan.FromSeconds(3).Ticks + 
    TimeSpan.FromSeconds(2).Ticks, Notification.CreateOnNext(30)), 
    new Recorded<Notification<int>>(TimeSpan.FromSeconds(4).Ticks + 
    TimeSpan.FromSeconds(2).Ticks, Notification.CreateOnNext(40)), 
    new Recorded<Notification<int>>(TimeSpan.FromSeconds(5).Ticks + 
    TimeSpan.FromSeconds(2).Ticks, Notification.CreateOnNext(50)), 
    new Recorded<Notification<int>>(TimeSpan.FromSeconds(6).Ticks + 
    TimeSpan.FromSeconds(2).Ticks, Notification.CreateOnNext(60)), 
    new Recorded<Notification<int>>(TimeSpan.FromSeconds(7).Ticks + 
    TimeSpan.FromSeconds(2).Ticks, Notification.CreateOnCompleted<int>()) 
); 

Note the need to add 2 seconds (the virtual time of the observer's subscription) to the virtual time value of each message we're testing.

An in-depth explanation of Rx scheduler testing (by the Rx team) is available at:

https://blogs.msdn.microsoft.com/rxteam/2012/06/14/testing-rx-queries-using-virtual-time-scheduling/

Historical records

Another class similar to TestScheduler, HistoricalScheduler, is available to produce a real-life replay of messages flowing from usual sequences. This class is almost identical to TestScheduler, with the difference that we can use it to replay dumped messages. To dump messages from a sequence, we need to store the physical time together with the message value by using the Timestamp extension method. Once we have this timestamped sequence, we need to move back from continuous (reactive) programming to state-driven programming by dumping the messages into a finite collection with the ToList extension method. Then, we need to use the Wait extension method to block until the sequence completes and returns the required collection.

Now, we're ready to use this collection in the Observable.Generate method that will recreate a sequence from a timestamped finite message collection. This method will need HistoricalScheduler as a parameter to handle virtual time advancement, as we have already seen with the usual virtual time of TestScheduler. Here's a complete example:

Console.WriteLine("{0} -> Playing...", DateTime.Now); 
 
//a sourcing sequence 
var sequence = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5); 
 
var trace = sequence 
    //marks each message with a timestamp 
    .Timestamp() 
    //route messages into a list of timestamped messages 
    .ToList() 
    //materialize the list when the sequence completes 
    //and return only the list 
    .Wait(); 
 
//a scheduler for historical records 
var scheduler = new HistoricalScheduler(); 
 
Console.WriteLine("{0} -> Replaying...", DateTime.Now); 
 
//generate a new sequence from a collection 
var replay = Observable.Generate( 
    //the enumerator to read values from 
    trace.GetEnumerator(), 
    //the condition to check until False 
    x => x.MoveNext(), 
    //the item 
    x => x, 
    //the item's value 
    x => x.Current.Value, 
    //the item's virtual time 
    x => x.Current.Timestamp, 
    //the scheduler  
    scheduler); 
 
//some output 
replay.Subscribe(x => Console.WriteLine("{0} -> {1}", scheduler.Now, x)); 
 
//play the record 
scheduler.Start(); 
 
Console.ReadLine(); 

By running the preceding example, we will see the replay reproduce the original real-time progression in its virtual clock. Similar to TestScheduler, HistoricalScheduler flattens virtual time, outputting all the messages immediately after the invocation of the Start method.
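Instead of playing the whole record with Start, we can also step through historical time with AdvanceTo. The following is a minimal sketch (the HistoricalStepDemo class, its Run helper, and the chosen start date are assumptions made for illustration): two jobs are due at absolute times, and we pause between them.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;

public static class HistoricalStepDemo
{
    // Plays two jobs scheduled at absolute times, pausing between them.
    public static List<string> Run()
    {
        // an arbitrary starting point for the virtual clock
        var start = new DateTimeOffset(2016, 1, 1, 0, 0, 0, TimeSpan.Zero);
        var scheduler = new HistoricalScheduler(start);
        var log = new List<string>();

        scheduler.Schedule(start.AddHours(1), () => log.Add("first"));
        scheduler.Schedule(start.AddHours(2), () => log.Add("second"));

        // advance to 01:30: only the first job is due
        scheduler.AdvanceTo(start.AddMinutes(90));
        log.Add("paused");

        // play the rest of the record
        scheduler.Start();
        return log;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

The log reads first, paused, second, which shows that the virtual clock of HistoricalScheduler moves only as far as we ask it to.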
