Using schedulers in Rx

Sometimes, we need to have an IObservable subscription run at a specific time. Imagine having to synchronize events across servers in geographically different areas and time zones. You might also need to read data from a queue while preserving the order in which the events occur. Another example would be to perform some kind of I/O task that could take some time to complete. Schedulers come in very handy in these situations.

Getting ready

Additionally, you can consider reading up more on using schedulers on MSDN. Have a look at https://msdn.microsoft.com/en-us/library/hh242963(v=vs.103).aspx.

How to do it…

  1. If you haven't already done so, create a new Windows Form application and call it winformRx. Open the form designer and in Toolbox, search for the TextBox control and add it to your form:
    How to do it…
  2. Next, add a label control to your form:
    How to do it…
  3. Double-click on your Windows Form designer to create the onload event handler. Inside this handler, add some code to read the text entered into the text box and only display that text 5 seconds after the user has stopped typing. This is achieved using the Throttle keyword. Add a subscription to the searchTerm variable, writing the result of the text input to the label control's text property:
    private void Form1_Load(object sender, EventArgs e)
    {
        var searchTerm = Observable.FromEventPattern<EventArgs>(textBox1, "TextChanged")
        .Select(x => ((TextBox)x.Sender).Text) 
        .Throttle(TimeSpan.FromMilliseconds(5000));
    
        searchTerm.Subscribe(trm => label1.Text = trm);
    }

    Note

    Note that you might need to add System.Reactive.Linq in your using statements.

  4. Run your application and start typing in some text into the text box. Immediately, we will receive an exception. It is a cross-thread violation. This occurs when there is an attempt to update the UI from a background thread. The Observable interface is running a timer from System.Threading, which isn't on the same thread as the UI. Luckily, there is an easy way to overcome this. Well, it turns out that the UI-threading capabilities lie in a different assembly, which we found easiest to get via the Package Manager Console:
    How to do it…
  5. Click on View | Other Windows | Package Manager Console to access the Package Manager Console:
    How to do it…
  6. Enter the following command: PM> Install-Package System.Reactive.Windows.Forms
    How to do it…

    Note

    Please note that you need to ensure that the Default project selection is set to winformRx in the Package Manager Console. If you don't see this option, resize the Package Manager Console screen width until the option is displayed. This way you can be certain that the package is added to the correct project.

  7. After the installation completes, modify your code in the onload event handler and change searchTerm.Subscribe(trm => label1.Text = trm);, which does the subscription, to look like this:
    searchTerm.ObserveOn(new ControlScheduler(this)).Subscribe(trm => label1.Text = trm);

    You will notice that we are using the ObserveOn method here. What this basically tells the compiler is that the this keyword in new ControlScheduler(this) is actually a reference to our Windows Form. Therefore, ControlScheduler will use the Windows Forms timers to create the interval to update our UI. The message happens on the correct thread, and we no longer have our cross-thread violation.

  8. If you have not added the System.Reactive.Concurrency namespace to your project, Visual Studio will underline the ControlScheduler line of code with a squiggly line. Pressing Ctrl + . (the Control key and dot) will allow you to add the missing namespace:
    How to do it…
  9. This means that System.Reactive.Concurrency contains a scheduler that can talk to Windows Forms controls so that it can do the scheduling. Run your application again and start typing some text into your text box:
    How to do it…
  10. Five seconds after we stop typing, the throttle condition is fulfilled, and the text is output to our label:
    How to do it…

How it works…

What we need to keep in mind here from the code we created is that there are ObserveOn and Subscribe. You should not confuse the two. In most cases, when dealing with schedulers, you will use ObserveOn. The ObserveOn method allows you to parametrize where the OnNext, OnCompleted, and OnError messages run. With Subscribe, we parameterize where the actual subscribe and unsubscribe code runs.

We also need to remember that Rx use the threading timers (System.Threading.Timer) as a default, which is why we encountered the cross-thread violation earlier. As you saw though, we used schedulers to parameterize what timer to use. The way schedulers do this is by exposing three components. These are:

  • The scheduler's ability to perform some action
  • The order in which the action or work to be performed is executed
  • A clock that allows the scheduler to have a notion of time

The use of a clock is important because it allows the developer to use timers on remote machines, for example (where there might be a time difference between you and them), to tell them to perform an action at a particular time.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.141.3.175