Programming observable sequences is a powerful way to achieve a highly modularized programming experience. Although this offers high flexibility, an observable sequence is single-threaded by default and is similar to a lot of other CLR objects. This means that although a sequence can push messages to multiple subscribers, this operation happens in the same thread where the messages originate and then the messages reach all the subscribers, sequentially following their subscription order.
This means that using an observable sequence instead of any other .NET object does not convert automatically our code into a multithreaded one.
Luckily, to address this automatic multithreading need in the Rx world, there are Schedulers
. These are objects that choose when a message can flow and which thread must handle a message.
It is important to understand the huge difference that exists between using a CLR delegate
/event
and Rx scheduling. When using scheduling in Rx, we would always have an asynchronous implementation in an in-memory queue of messages that act as a message pump supporting Quality of Service (QoS) for routing messages at different speeds. These messages flow and are observed in a thread or another based on the scheduler's choice. This is actually the most critical task of a scheduler.
In short, a scheduler is in charge of choosing the message
/thread
association. Let's take a look at this familiar code:
var loopBasedSequence = Observable.Create<DateTime>(x => { while (true) { Console.WriteLine("{0} -> Yielding new value...", Thread.CurrentThread.ManagedThreadId); x.OnNext(DateTime.Now); Thread.Sleep(1000); } return Disposable.Empty; }); loopBasedSequence.Subscribe(x => Console.WriteLine("-> {0}", x));
The preceding example shows us a very simple implementation of an infinite sequence. Based on a simple infinite loop, this example produces messages that originate always from the same thread. This is obvious because we are looping within a While
loop; the same behavior is available in nonreactive coding.
Let's take a look at another example that shows a timer-based sequence:
var timerBasedSequence = Observable.Interval(TimeSpan.FromSeconds(1)) .Select(x => { Console.WriteLine("{0} -> Yielding new value...", Thread.CurrentThread.ManagedThreadId); return DateTime.Now; }); timerBasedSequence.Subscribe(x => Console.WriteLine("-> {0}", x));
This example shows another infinite sequence that is slightly different from the previous one. This sequence is timer-based, such as when we use System.Threading.Timer
in classic CLR programming or any similar object. A Timer
method produces a signal at a fixed time interval. This signal originates from a thread in ThreadPool
. We cannot choose the thread by ourselves. The same happens in the reactive version of Timer
available through the Interval
method.
Other than these built-in behaviors, we have the ability to set up a specific thread for our sequence by specifying the scheduler to be used. Here's an example:
var loopBasedSequence = Observable.Create<DateTime>(x => { while (true) { Console.WriteLine("{0} -> Yielding new value...", Thread.CurrentThread.ManagedThreadId); x.OnNext(DateTime.Now); Thread.Sleep(1000); } return Disposable.Empty; }); loopBasedSequence.SubscribeOn(Scheduler.Default).Subscribe(x => Console.WriteLine("{0} -> {1}", Thread.CurrentThread.ManagedThreadId, x)); loopBasedSequence.SubscribeOn(Scheduler.Default).Subscribe(x => Console.WriteLine("{0} -> {1}", Thread.CurrentThread.ManagedThreadId, x)); loopBasedSequence.SubscribeOn(Scheduler.Default).Subscribe(x => Console.WriteLine("{0} -> {1}", Thread.CurrentThread.ManagedThreadId, x));
The preceding example shows another implementation of the loop-based infinite sequence. This time, before the subscription, we specified to use a specific Scheduler
class with the SubscribeOn
extension method (discussed in detail later in this chapter). In the previous example, only the subscriber creates a subscription on the creation thread (the main thread) that prevented the application to exit from that infinite loop. Differently, in this new example, Scheduler
assigned a thread to each observer subscription. This means that we have altogether three different threads running their infinite logic.
We have the ability to select the desired scheduler between those available in the Scheduler
helper class as static properties. Each prebuild scheduler is optimized for a specific kind of usage as background tasks, UI programming, and so on. Here's a short explanation of the available schedulers.
The Scheduler.Default
(former Scheduler.ThreadPool
) scheduler is the default concurrent scheduler that lets us interact with multithreaded execution in a simple way. It uses threads from ThreadPool
. Bear in mind that ThreadPool
has a finite number of threads and that its threads fulfil the needs of other CLR objects as tasks from TaskFactory
. Obviously, we can set up the minimum and maximum number of threads with the ThreadPool.SetMinThreads
and ThreadPool.SetMaxThreads
static methods.
On the other hand, Scheduler.Immediate
is definitely the default scheduler that runs when we don't ask for a scheduler. This is the blocking one that executes any subscription life cycle in the main thread.
The Scheduler.CurrentThread
method is very similar to the Immediate
one, but the Immediate
one executes without queuing messages. This means that the CurrentThread
one schedules messages to execute sequentially on the same creation thread, preventing these messages from creating a race/deadlock condition.
The Scheduler.Dispatcher
method lets the observers execute on the UI thread for WPF or Silverlight applications.
The NewThreadScheduler.Default
method (former Scheduler.NewThread
) executes each observer on a new foreground thread (System.Threading.Thread
) to avoid consuming a thread from ThreadPool
as it happens with the Default
scheduler. This means that we can virtually execute the maximum number of threads (~65K) the OS can handle, but this also means that this choice can dramatically reduce the application's (and OS) reliability because there is no throttling on thread amount, causing huge resource usages and the possibility to reach the starvation state that will impact the OS, too.
The Scheduler.TaskPool
method executes the observer within Task
from the default task factory. There are two main benefits in using this scheduler: threads in the thread pool (TaskFactory
, by default, uses threads from ThreadPool
) are pregenerated; this means that we don't have to wait for their creation (if there are enough threads ready in the pool). This is useful for reducing the delay in the message's response. Another benefit is that the pool contains a finite number of threads, acting as a throttling that doesn't allow an unpredictable number of threads to run together.
Each Rx operator comes with its default scheduler. This means that timer-based schedulers (for instance, Interval) run on the ThreadPool
scheduler, while sequences that produce few messages will usually run on the Immediate
scheduler. This design choice is made by preferring the less concurrent scheduler, while, when we select a specific scheduler or when we select the Default
scheduler, we're asking Rx to configure its message pump to route messages to multiple threads to improve the messages' concurrency.
In the previous example, we had the opportunity of seeing the SubscribeOn
method in action. This method allows us to specify a scheduler to queue messages on different threads.
The same result is available with the ObserveOn
method with the difference that the SubscribeOn
method registers the scheduler on the whole the subscriber and, eventually, the observer code that produces messages related to the same observer. Differently, the ObserveOn
method registers to the given scheduler only the output message by letting all the messages source from the same thread.
It's time to understand in detail how these methods work. The following example shows a sequence made with the Create
operator. This operator lets us specify a custom code to produce messages per registration. Later, we will register two subscribers with SubscribeOn
and then with the ObserveOn
method specifying the Default
(ThreadPool
) scheduler. This will route execution on the thread pool. The two different operators will produce different results because the SubscribeOn
method will put the whole Create
operator on the pool's thread, while the ObserveOn
will only put the resulting messages on the pool's thread. Here's the code:
var sequence = Observable.Create<DateTime>(x => { //let take some time before registering the new observer for(int i=0;i<10;i++) { Console.WriteLine("Registering observer on thread {0}...", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(100); } //produce 10 messages for(int i=0;i<10;i++) { x.OnNext(DateTime.Now); Thread.Sleep(100); } //exit return Disposable.Empty; }); //register two subscribers sequence.SubscribeOn(Scheduler.Default).Subscribe(x => Console.WriteLine("{0} -> {1}", Thread.CurrentThread.ManagedThreadId, x)); sequence.SubscribeOn(Scheduler.Default).Subscribe(x => Console.WriteLine("{0} -> {1}", Thread.CurrentThread.ManagedThreadId, x)); Console.ReadLine();
The example code is identical whether we use the SubscribeOn
or the ObserveOn
method. Let's evaluate the behavior of the two methods.
When using the ObserveOn
method, the two observers will register sequentially. The second observer will register only when the first stops receiving messages. Then, each observer will receive its messages. Talking about the threading design, each observer will register on the same thread (the main one). Then, each observer will receive its messages on another thread of the pool. This means that the observer creational code that runs within the Create
method will always run in the main thread per each observer. In other words, each subscription will execute on thread A (the main one); the first observer will handle messages on thread B, and the second observer will handle messages on thread C.
When using the SubscribeOn
method, each observer will get correlated to a specific thread from the subscription to the completion. This means that the first observer will subscribe and receive messages on thread A (not the main one), while the second observer will subscribe and receive messages on thread B (not the main one).
The immediate result of these two implementations is that, with SubscribeOn
, the two subscriptions will run in parallel by avoiding the second observer waiting for the first one to complete its job before registering. Obviously, this method produces the most concurrent result and is preferable in any case if we want to increase parallelism in our coding. However, this method cannot directly interact with UI controls as WPF or Windows Forms controls because all these controls require that the request (in the example, the code within the Create
method) comes from the UI thread. In such cases, the ObserveOn
method is definitely our choice.
Other than injecting schedulers within the sequence chain by using the SubscribeOn
or the ObserveOn
method, we can pass a specific scheduler in almost any Rx operator's extension method, changing the default scheduler of the operator itself. Here's an example:
Console.WriteLine("Main thread: {0}", Thread.CurrentThread.ManagedThreadId); //numeric sequence var sequence = Observable.Range(1, 10, Scheduler.Default); //observers sequence.Subscribe(x => Console.WriteLine("{0} -> {1}", Thread.CurrentThread.ManagedThreadId, x)); sequence.Subscribe(x => Console.WriteLine("{0} -> {1}", Thread.CurrentThread.ManagedThreadId, x));
In the preceding example, we will see an implementation almost identical to the ObserveOn
one because the two observers will run sequentially, receiving the same values from the sourcing sequence. This happens because the Range
method produces a Cold
sequence.
A Scheduler
class, as the name implies, other than something that deals with threads, is something that schedules some action at a given time. We can use Schedulers
to schedule jobs of any kind at any absolute or relative or repetitive time. In the real-world reactive programming, we can use scheduled jobs to push messages into sequences or because of their ability to virtualize the time (later explained in the Virtual time section).
Here's an example of Immediate
scheduling:
static void Main(string[] args) { using (var job1 = Scheduler.Default.Schedule(OnJob1Executed)) //job timeout Thread.Sleep(2000); Console.WriteLine("END"); Console.ReadLine(); } static void OnJob1Executed() { for (int i = 0; i < 10; i++) { Console.Write("."); Thread.Sleep(100); } Console.WriteLine(); Console.WriteLine("JOB END"); }
The preceding example shows how to schedule a simple job immediately. Consider a real-world usage where we schedule immediate jobs by receiving inputs from a user or by handling an event. Although we could simply execute the same job immediately without Scheduler
, by using Scheduler
, we may, in future, replay the scheduled job sequence by saving this sequence somewhere. This gives us a great diagnostic tool. More details on it are available in the Virtual time section.
Another interesting thing to focus on by looking at this example is the job timeout that we can invoke by simply disposing the job token (in the code, this is the job1
variable).
The token represents the scheduled job, not its implementation. This means that it is cancelling job
, preventing it from being executed, but not from breaking an eventual execution if this has already been triggered by Scheduler
.
Future scheduling is the ability to schedule at a given (future) absolute time, or relative time, or periodic time. The usage is pretty identical to the generic scheduling with the addition of a scheduling time value.
An important aspect is that, here, the timeout we give to the job token may actually prevent the scheduled job from firing.
Here are a few examples:
//starts a job in absolute time using (var job2 = Scheduler.Default.Schedule(DateTimeOffset.Now.AddSeconds(1), () .0=> Console.WriteLine("OK"))) //job timeout Thread.Sleep(2000); //starts a job in relative time using (var job3 = Scheduler.Default.Schedule(TimeSpan.FromSeconds(10), () => Console.WriteLine("OK"))) //job timeout //this job will never fire because its schedule is greater than how time timeout will grant Thread.Sleep(2000); //starts a job periodically using (var job4 = Scheduler.Default.SchedulePeriodic(TimeSpan.FromSeconds(1), () => Console.WriteLine("OK"))) //timeout at 5 seconds Thread.Sleep(5000); Console.WriteLine("END"); Console.ReadLine();
The periodic example (the third) will fire our scheduled job until we don't kill it by disposing the token (job2
).
The ability to create custom future scheduling with multiple different Schedulers
gives us tremendous chances of designing our solution the best we can.
When we create complex scheduling, we can work in virtual time to achieve testability or to have the ability to replay some real-world (in production) execution because of testing needs, diagnostic needs, or production needs.
The concept is very easy. We can create a scheduler specific for testing purposes, TestScheduler
(import the NuGet Rx-Testing
package). Then, we can use it as usual by scheduling jobs. We can advance the virtual clock by letting the scheduler execute as in the real world. Here's a short example to understand it easily:
//a scheduler for testing purposes var scheduler = new TestScheduler(); //records to schedule an immediate action scheduler.Schedule(() => Console.WriteLine("Hi")); //advance the virtual clock to let execute the recorded actions scheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks);
The preceding example shows how to use TestScheduler
. Scheduler
will record each scheduled job we ask it to execute. It never executes jobs as usual schedulers; it only records jobs. Bear in mind that we always need to manually play the recorded job schedule to actually execute jobs.
Alternatively, to manually move the virtual clock forward with the AdvanceBy
method, we can simply play the record by invoking the Start
method. Here's an example:
////advance the virtual clock to let execute the recorded actions //scheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks); //play the recorded scheduled jobs at normal speed scheduler.Start();
The Start
method will let the scheduler immediately output all the outputs regardless of the job's absolute timings while respecting the sequence order. Here's a more complete example:
//output the virtual clock Console.WriteLine("-> {0}", scheduler.Now); //schedule a future job scheduler.Schedule(TimeSpan.FromDays(22), () => Console.WriteLine("2 seconds now")); Console.WriteLine("-> {0}", scheduler.Now); //play the recorded scheduled jobs at normal speed scheduler.Start(); Console.WriteLine("-> {0}", scheduler.Now);
In the preceding example, we created a future job that will start with a 22 days
delay. However, by invoking the Start
method, the scheduler will immediately advance the virtual clock (available with the Now
property) to the next job time, causing the related action's execution. This will leave the internal virtual clock to the last job time (in this example, 22 days
).
We can register other jobs even after we invoke the Start
method. These jobs will not fire until we invoke the Start
method again.
We can advance to a specific virtual time by using the AdvanceTo
method. Here's an example:
//schedule a future job at 1 minute scheduler.Schedule(TimeSpan.FromMinutes(1), () => Console.WriteLine("2 seconds now")); Console.WriteLine("-> {0}", scheduler.Now); //advance to 00:00:30 scheduler.AdvanceTo(TimeSpan.FromSeconds(30).Ticks); Console.WriteLine("-> {0}", scheduler.Now); //advance to 00:01:00 scheduler.AdvanceTo(TimeSpan.FromSeconds(60).Ticks); Console.WriteLine("-> {0}", scheduler.Now);
By invoking the AdvanceTo
method of the Scheduler
class, we can advance the virtual time as we wish. Obviously, we cannot advance back in time! Scheduler
is a forward-only virtual time clock.
Another interesting feature is the ability to schedule a Stop
execution on the scheduler itself. This will pause the scheduler (another Start
will let the scheduler start again, playing newly recorded jobs eventually) at a specified time. Here's a complete example:
//schedule a periodic job and output the virtual time scheduler.SchedulePeriodic(TimeSpan.FromSeconds(1), () => Console.WriteLine("{0} -> Periodic", scheduler.Now)); //this would produce an infinite output //scheduler.Start(); //to avoid the infinite output, we will need to schedule a Stop request scheduler.Schedule(TimeSpan.FromSeconds(60), () => scheduler.Stop()); //play the whole record scheduler.Start(); //append immediately scheduler.Schedule(TimeSpan.FromTicks(1), () => Console.WriteLine("Running again")); //schedule another Stop scheduler.Schedule(TimeSpan.FromSeconds(60), () => scheduler.Stop()); //start again the scheduler scheduler.Start(); Console.ReadLine();
In the preceding example, we can see the Stop
usage. Bear in mind that after the first Start
method executes, the whole record will play until Stop
fires. This will pause the scheduler setting in its IsEnabled
property to False
. Then, we will enqueue the other two jobs, an immediate output and another Stop
. This second Stop
method will prevent Scheduler
from running indefinitely after the second Start
request. This would happen because the virtual clock can only advance, meaning that after the first Stop
, it will never go back to running the same (first) Stop
method again. In other words, invoking multiple times the Start
method will not restart the Scheduler
object from the beginning. It will simply start again from its last virtual clock time.
Now that we know how to start/stop virtual Schedulers
, we need to look at how to test TestScheduler
in an automated way, as we may do in nonreactive coding with unit testing.
To test scheduler and sequences, we need to create the mock version of a sequence and of an observer. They are available as a helper method using the TestScheduler
class. We can create a mock observer with the CreateObserver<T>
method and mock sequences with the CreateColdObservable
or CreateHotObservable
methods. Bear in mind that a Cold
sequence produces the same message flow at each subscription while a Hot
sequence fires its message regardless of the live subscriptions.
Here's a short example regarding a cold sequence:
var scheduler = new TestScheduler(); //a cold sequence var sequence = scheduler.CreateColdObservable<int>( //some recorded message new Recorded<Notification<int>>(TimeSpan.FromSeconds(1).Ticks, Notification.CreateOnNext(10)), new Recorded<Notification<int>>(TimeSpan.FromSeconds(2).Ticks, Notification.CreateOnNext(20)), new Recorded<Notification<int>>(TimeSpan.FromSeconds(3).Ticks, Notification.CreateOnNext(30)), new Recorded<Notification<int>>(TimeSpan.FromSeconds(4).Ticks, Notification.CreateOnNext(40)), new Recorded<Notification<int>>(TimeSpan.FromSeconds(5).Ticks, Notification.CreateOnNext(50)), new Recorded<Notification<int>>(TimeSpan.FromSeconds(6).Ticks, Notification.CreateOnNext(60)), new Recorded<Notification<int>>(TimeSpan.FromSeconds(7).Ticks, Notification.CreateOnCompleted<int>()) ); //a new testable observer var observer1 = scheduler.CreateObserver<int>(); //subscribe the observer at a given virtual time scheduler.Schedule(TimeSpan.FromSeconds(2), () => sequence.Subscribe(observer1)); //play the record scheduler.Start();
As visible in the preceding example, we can simply create a sequence with the TestScheduler.CreateColdObservable
method by specifying the message record list to be produced when playing. To produce recorded messages, we will wrap our values (integer values in this example or none for the OnCompleted
message) into a Notification
object. Then, we will wrap the notification into a Recorded
object by providing the relative time defer to use to flow the specified message.
We will subscribe the observer to the sequence within another job in virtual time (after 2
seconds in the example). This will let the observer receive 7
messages regardless of the delay of 2
seconds of the subscription. This happens because we're using a Cold
observable sequence.
To verify message flowing at the observer's point of view, we can evaluate its Messages
property after the TestScheduler.Start
is complete. Here's an example:
foreach (var m in observer1.Messages) { var time = m.Time; //available only for OnNext messages //var value = m.Value.Value; //var exception = m.Value.Exception; //var kind = m.Value.Kind; //var hasValue = m.Value.HasValue; Console.WriteLine("{0}", m); }
As visible, there are a lot of interesting properties available in the Notification
object that details the flowing message. This object is the same as we had while using the Materialize
method to produce a tracing sequence, as we have already seen in Chapter 5,
Debugging Reactive Extension
.
Now, it is time to make some assertions to check whether messages within the observer behave as expected. The next example will use AssertEquals
of the Messages
property of the mock observer. Here's the short code:
observer1.Messages.AssertEqual( //same messages new Recorded<Notification<int>>(TimeSpan.FromSeconds(1).Ticks + TimeSpan.FromSeconds(2).Ticks, Notification.CreateOnNext(10)), new Recorded<Notification<int>>(TimeSpan.FromSeconds(2).Ticks + TimeSpan.FromSeconds(2).Ticks, Notification.CreateOnNext(20)), new Recorded<Notification<int>>(TimeSpan.FromSeconds(3).Ticks + TimeSpan.FromSeconds(2).Ticks, Notification.CreateOnNext(30)), new Recorded<Notification<int>>(TimeSpan.FromSeconds(4).Ticks + TimeSpan.FromSeconds(2).Ticks, Notification.CreateOnNext(40)), new Recorded<Notification<int>>(TimeSpan.FromSeconds(5).Ticks + TimeSpan.FromSeconds(2).Ticks, Notification.CreateOnNext(50)), new Recorded<Notification<int>>(TimeSpan.FromSeconds(6).Ticks + TimeSpan.FromSeconds(2).Ticks, Notification.CreateOnNext(60)), new Recorded<Notification<int>>(TimeSpan.FromSeconds(7).Ticks + TimeSpan.FromSeconds(2).Ticks, Notification.CreateOnCompleted<int>()) );
Kindly focus on the need for adding 2
seconds (observer subscription virtual time) to the virtual time value of the messages we're testing.
An in-depth explanation of the Rx scheduler testing (by Rx team) is available at:
https://blogs.msdn.microsoft.com/rxteam/2012/06/14/testing-rx-queries-using-virtual-time-scheduling/ .
Another class similar to TestScheduler
is available to produce a real-life replay of messages flowing from usual sequences, HistoricalScheduler
. This class is almost identical to TestScheduler
with the difference that we can use it to replay dumped messages. To dump messages from a sequence, we need to store the physical time together with the value message by using the Timestamp
extension method. Once we have this time-stamped sequence, we need to move back from a continuous (reactive) programming to a state-driven programming by dumping messages into a finite collection by using the ToList
extension method. Then, we need to use the Wait
extension method to pause the sequence completion and produce the required collection.
Now, we're ready to use this collection in the Observable.Generate
method that will recreate a sequence from a timestamped finite message collection. This method will need HistoricalScheduler
as a parameter to handle virtual time advancement, as we have already seen with the usual virtual time of TestScheduler
. Here's a complete example:
Console.WriteLine("{0} -> Playing...", DateTime.Now); //a sourcing sequence var sequence = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5); var trace = sequence //marks each message with a timestamp .Timestamp() //route messages into a list of timestamped messages .ToList() //materialize the list when the sequence completes //and return only the list .Wait(); //a scheduler for historical records var scheduler = new HistoricalScheduler(); Console.WriteLine("{0} -> Replaying...", DateTime.Now); //generate a new sequence from a collection var replay = Observable.Generate( //the enumerator to read values from trace.GetEnumerator(), //the condition to check until False x => x.MoveNext(), //the item x => x, //the item's value x => x.Current.Value, //the item's virtual time x => x.Current.Timestamp, //the scheduler scheduler); //some output replay.Subscribe(x => Console.WriteLine("{0} -> {1}", scheduler.Now, x)); //play the record scheduler.Start(); Console.ReadLine();
By running the preceding example, we will see the replay producing with the same real-time progression. Similar to TestScheduler
, HistoricalScheduler
will flatten virtual time by outputting all the messages immediately after the invocation of the Start
method.
Downloading the example code You can download the example code files for this book from your account at http://www.packtpub.com. If you purchased this book elsewhere, you can visit http://www.packtpub.com/support and register to have the files e-mailed directly to you. You can download the code files by following these steps:
You can also download the code files by clicking on the Code Files button on the book's webpage at the Packt Publishing website. This page can be accessed by entering the book's name in the Search box. Please note that you need to be logged in to your Packt account. Once the file is downloaded, please make sure that you unzip or extract the folder using the latest version of:
The code bundle for the book is also hosted on GitHub at https://github.com/PacktPublishing/Reactive-Programming-for-.NET-Developers. We also have other code bundles from our rich catalog of books and videos available at https://github.com/PacktPublishing/. Check them out!
18.118.163.250