Threading integration

With Rx, we can route our messages onto specific threads to achieve highly concurrent computation, or we can choose the main thread to comply with the thread-affinity requirements of UI controls. This kind of thread integration is discussed in the Scheduling section.

In this section, by contrast, we will cover how to flow the result of an asynchronous operation into a sequence.

Sourcing from a Task

A Task is an asynchronous operation wrapped in an object that gives us the ability to build task hierarchies, request task cancellation, and so on.

Often, in our applications, we use tasks to handle CPU-bound or I/O-bound operations. When we're dealing with reactive applications, the best way to acknowledge a task's completion is to route the acknowledgment (ack) as a message within a sequence. This is available through the ToObservable extension method for Task objects. Here's an example:

//a simple task 
var task = Task.Factory.StartNew(() => 
{ 
    Thread.Sleep(1000); 
    return DateTime.Now; 
}); 
 
//a sequence to ack the task's result 
//requires: using System.Reactive.Threading.Tasks; 
var ackSequence = task.ToObservable(); 
 
//some output 
ackSequence.Subscribe(x => Console.WriteLine(x)); 
 
Console.ReadLine(); 
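
The following is a minimal sketch, not part of the original example, of how a task's outcome surfaces in the sequence returned by ToObservable: a successful task produces one message followed by a completion, while a failing task surfaces through the error callback. The task bodies and the "boom" message are illustrative assumptions.

```csharp
using System;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        // a task that succeeds (the value 42 is illustrative)
        var ok = Task.Factory.StartNew(() => 42);

        // a task that fails (hypothetical failure for the sketch)
        var failing = Task.Factory.StartNew<int>(() =>
        {
            throw new InvalidOperationException("boom");
        });

        // the successful task yields one OnNext and then OnCompleted
        ok.ToObservable().Subscribe(
            x => Console.WriteLine("OnNext: " + x),
            ex => Console.WriteLine("OnError: " + ex.GetType().Name),
            () => Console.WriteLine("OnCompleted"));

        // the failing task is reported through the OnError callback
        failing.ToObservable().Subscribe(
            x => Console.WriteLine("OnNext: " + x),
            ex => Console.WriteLine("OnError: " + ex.GetType().Name),
            () => Console.WriteLine("OnCompleted"));

        Console.ReadLine();
    }
}
```

Subscribing with all three callbacks makes it visible that the ack sequence is short-lived: it ends as soon as the task's outcome has been delivered.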

Task cancellation

We can use task cancellation inside the sequence creation to cancel the work that runs within a subscription, following the usual task cancellation design. Here's a complete example:

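//requires: using System.Data.SqlClient, System.Reactive.Disposables, 
//System.Reactive.Linq, System.Threading, and System.Threading.Tasks 
//a cancellable sequence 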
var fromDatabase = Observable.Create<DateTime>(o => 
{ 
    //a cancellation token source for timeout 
    var tks = new CancellationTokenSource(TimeSpan.FromSeconds(5)); 
    var token = tks.Token; 
 
    //the cancellable task within the sequence 
    return Task.Factory.StartNew(() => 
    { 
        //run until cancel requested 
        while (!token.IsCancellationRequested) 
            using (var cn = new SqlConnection(@"data source=(local);integrated security=true;")) 
            using (var cm = new SqlCommand("select getdate()", cn)) 
            { 
                Thread.Sleep(1000); 
                cn.Open(); 
                //read time from DB 
                o.OnNext((DateTime)cm.ExecuteScalar()); 
            } 
 
        //signal oncompleted 
        o.OnCompleted(); 
 
        //returns a disposable that represents the subscription; 
        //its callback runs when the subscription is disposed 
        return Disposable.Create(() => Console.WriteLine("Killing subscription")); 
    }, token); 
}); 
 
fromDatabase.Subscribe(x => Console.WriteLine(x)); 

The preceding example shows how to cancel the message generation function that runs internally for each subscription. Essentially, it's a classic task cancellation example. The only difference is that the task executes within the Create operator, which specifies the message generation workflow that runs per subscriber. In this example, we specified a 5-second timeout: when it elapses, the CancellationTokenSource signals the CancellationToken, the Task exits its execution loop, and the sequence completes correctly.
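
As a complementary sketch, and not the approach used in the example above, cancellation can also be driven by the subscriber rather than by a timeout. This assumes the Observable.Create overload that passes a CancellationToken to an asynchronous function; Rx cancels that token when the subscription is disposed. To keep the sketch runnable without a database, DateTime.Now stands in for the SQL query, and the name clock is hypothetical.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        // Rx supplies the token and cancels it when the subscription is disposed
        var clock = Observable.Create<DateTime>(async (o, token) =>
        {
            try
            {
                // run until cancellation is requested
                while (!token.IsCancellationRequested)
                {
                    await Task.Delay(1000, token);
                    // stand-in for the database read (assumption for this sketch)
                    o.OnNext(DateTime.Now);
                }
            }
            catch (OperationCanceledException)
            {
                // expected when the subscriber unsubscribes during the delay
            }
        });

        var subscription = clock.Subscribe(x => Console.WriteLine(x));

        Console.ReadLine();      // press Enter to unsubscribe
        subscription.Dispose();  // cancels the token passed to the function
        Console.ReadLine();
    }
}
```

With this shape there is no CancellationTokenSource to manage by hand: each subscriber gets its own token, and disposing the subscription is the only cancellation signal.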
