With Rx, we can let our messages flow on specific threads to achieve highly concurrent computation, or we can route them to the main thread to comply with the requirements of UI controls. This kind of thread integration is discussed in the Scheduling section.
In this section, by contrast, we will cover how to flow the result of an asynchronous operation into a sequence.
A Task is an asynchronous operation wrapped into an object that lets us build task hierarchies, request task cancellation, and so on.
Often, in our applications, we use tasks to handle CPU-bound or I/O-bound operations. When we're dealing with reactive applications, the best way to acknowledge a task's completion is to route the acknowledgment (ack) as a message within a sequence. This is available through the ToObservable extension method of the Task class. Here's an example:
//a simple task
var task = Task.Factory.StartNew(() =>
{
    Thread.Sleep(1000);
    return DateTime.Now;
});

//a sequence to ack the task's result
//need using System.Reactive.Threading.Tasks
var ackSequence = task.ToObservable();

//some output
ackSequence.Subscribe(x => Console.WriteLine(x));

Console.ReadLine();
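A sequence produced by ToObservable emits the task's result as a single message and then completes, so a subscriber can also react to completion or failure. The following is a minimal sketch of that idea rather than a definitive implementation; the TaskAckDemo class and the SquareAsSequence helper are hypothetical names introduced only for illustration.

```csharp
using System;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

public static class TaskAckDemo
{
    // Hypothetical helper: runs a CPU-bound computation in a task
    // and exposes its completion as a sequence
    public static IObservable<int> SquareAsSequence(int value) =>
        Task.Run(() => value * value).ToObservable();

    public static void Run()
    {
        var ackSequence = SquareAsSequence(7);

        using (var done = new ManualResetEventSlim())
        {
            ackSequence.Subscribe(
                // the task's result arrives as one message
                x => Console.WriteLine("Result: " + x),
                // a faulted task is routed to the error handler
                ex => { Console.WriteLine("Failed: " + ex.Message); done.Set(); },
                // completion follows the result message
                () => { Console.WriteLine("Task completed"); done.Set(); });

            // block the caller until the sequence has terminated
            done.Wait();
        }
    }
}
```

Calling TaskAckDemo.Run() from a console application's entry point is enough to try it out. The wait handle is used here instead of Console.ReadLine only so that the sketch ends on its own.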
We can also combine task cancellation with sequence creation, so that the work running inside each subscription can be cancelled with the usual task cancellation design. Here's a complete example:
//need using System.Data.SqlClient, System.Reactive.Disposables,
//System.Reactive.Linq, System.Threading and System.Threading.Tasks

//a cancellable sequence
var fromDatabase = Observable.Create<DateTime>(o =>
{
    //a cancellation token source for timeout
    var tks = new CancellationTokenSource(TimeSpan.FromSeconds(5));
    var token = tks.Token;

    //the cancellable task within the sequence
    return Task.Factory.StartNew(() =>
    {
        //run until cancel requested
        while (!token.IsCancellationRequested)
            using (var cn = new SqlConnection(@"data source=(local);integrated security=true;"))
            using (var cm = new SqlCommand("select getdate()", cn))
            {
                Thread.Sleep(1000);
                cn.Open();
                //read time from DB
                o.OnNext((DateTime)cm.ExecuteScalar());
            }

        //signal oncompleted
        o.OnCompleted();

        //return a disposable whose callback runs
        //when the subscription is disposed
        return Disposable.Create(() => Console.WriteLine("Killing subscription"));
    }, token);
});

fromDatabase.Subscribe(x => Console.WriteLine(x));

//keep the console application alive while the task runs
Console.ReadLine();
The preceding example shows how to cancel the internal, per-subscription message generation function. Essentially, it's a classic task cancellation example. The only difference is that it executes within a Create operator, which specifies the message generation workflow that runs for each subscriber. In this example, we specified a 5-second timeout, after which the CancellationToken signals the Task to end its internal execution; the task then exits its execution loop and correctly completes its job.
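Besides a timeout, cancellation can also be driven by the subscriber. Rx offers an overload of Create whose delegate receives a CancellationToken that is cancelled when the subscription is disposed. The sketch below assumes that overload and replaces the database query with the local clock so that it runs without a SQL Server instance; the CancellableSequenceDemo class and the Clock helper are hypothetical names used only for illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class CancellableSequenceDemo
{
    // Hypothetical stand-in for the database query: emits the local time
    // once per period until the subscriber unsubscribes
    public static IObservable<DateTime> Clock(TimeSpan period) =>
        Observable.Create<DateTime>(async (observer, token) =>
        {
            // the token is cancelled when the subscription is disposed
            while (!token.IsCancellationRequested)
            {
                observer.OnNext(DateTime.Now);
                try
                {
                    // wait for the next tick, waking up early on cancellation
                    await Task.Delay(period, token);
                }
                catch (OperationCanceledException)
                {
                    // cancellation requested during the delay: leave the loop
                    break;
                }
            }
        });

    public static void Run()
    {
        var subscription = Clock(TimeSpan.FromMilliseconds(100))
            .Subscribe(x => Console.WriteLine(x));

        // let a few messages flow, then cancel by unsubscribing
        Thread.Sleep(350);
        subscription.Dispose();
    }
}
```

With this design the sequence needs no CancellationTokenSource of its own, because disposing the subscription is what requests the cancellation. A timeout like the one in the preceding example could still be layered on top by the caller.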