18 Data streams and the Reactive Extensions

This chapter covers

  • Using IObservable to represent data streams
  • Creating, transforming, and combining IObservables
  • Knowing when you should use IObservable

If you’ve ever been to a financial hub like Wall Street or Canary Wharf, you’ve probably seen a ticker board, a luminous board displaying the latest prices of the most widely traded stocks. This is a good representation of a data stream: a stream of related values that are delivered through time.

Traders (both humans and algorithms) keep an eye on the prices so that they can react to price changes: if a stock’s price rises or falls to a given level, they may decide to buy or sell, according to their investment strategy. This is, in essence, how reactive programming works: you define and consume data streams, potentially transforming the data in the streams in interesting ways, and define how your program should react to the data it consumes.

For example, if you have an Internet of Things (IoT) setup in your home, you may have sensors that broadcast certain parameters (like room brightness or temperature) and devices that react to changes in those parameters (regulating the window shutters or the air conditioning).

Or, in an event-sourced system like the one I described in chapter 13, you can publish the events as a stream and define downstream processing of those events in order to, say, recompute an account’s balance with every transaction and send the account holder a notification if the balance turns negative.

In this chapter, you’ll learn to model data streams with the IObservable interface and to use the Reactive Extensions (Rx) to create, transform, and combine IObservables. We’ll also discuss what sort of scenarios benefit from using IObservable.

Rx is a set of libraries for working with IObservables, much like LINQ provides utilities for working with IEnumerables. Rx is a rich framework, so thorough coverage is beyond the scope of this chapter. Instead, we’ll just look at some basic features and applications of IObservable and how it relates to other abstractions we’ve covered so far.

18.1 Representing data streams with IObservable

If you think of an array as a sequence of values in space (space in memory, that is), then you can think of IObservable as a sequence of values in time:

  • With an IEnumerable, you can enumerate its values at your leisure.

  • With an IObservable, you can observe the values as they come.

As with IAsyncEnumerable, which we discussed in chapter 16, IObservable is like an IEnumerable in that it contains several values, and it’s like a Task in that values are delivered asynchronously. Table 18.1 shows how IObservable relates to other abstractions.

Table 18.1 How IObservable compares with other abstractions

                   Synchronous        Asynchronous
Single value       T                  Task<T>
Multiple values    IEnumerable<T>     IAsyncEnumerable<T>, IObservable<T>

IObservable is, therefore, more general than both IEnumerable and Task. You can view IEnumerable as a special case of IObservable that produces all its values right away, and you can think of Task as a special case of IObservable that only produces a single value. What’s the difference between IObservable and IAsyncEnumerable, and why do we need both?

  • IAsyncEnumerable is consumer-centric: the component that consumes the data asks the producer for some data and receives an async stream of values in return, so the data is “pulled” by the consumer. The consumer interacts with the producer, hence the libraries developed to work with IAsyncEnumerable are called the Interactive Extensions (Ix). These packages are named System.Interactive.*. (IAsyncEnumerable itself is included in the BCL in the System.Collections.Generic namespace.)

  • IObservable is producer-centric: the consumer subscribes to the data, which is “pushed” out by the producer. The consumer merely reacts to the values it receives; hence the libraries developed to work with IObservable are called the Reactive Extensions (Rx). These packages are named System.Reactive.*. (IObservable itself is included in the BCL in the System namespace.)

NOTE Both Rx and Ix are maintained by the .NET Foundation; they are open source and hosted at https://github.com/dotnet/reactive.

Rx has been around for many years (there are implementations of Rx not only in .NET but in many other languages), so there are more resources and know-how at your disposal. By comparison, async streams and Ix are a recent addition; and yet, because C# 8 introduced native language support for async streams (async iterators that use yield return, and the await foreach statement, which we saw in chapter 16), it feels easier to create and consume them.

18.1.1 A sequence of values in time

The easiest way to develop an intuition about IObservable is through marble diagrams. Figure 18.1 shows a few examples. Each IObservable is represented with an arrow, representing time, and marbles, representing values produced by the IObservable.

Figure 18.1 Marble diagrams provide an intuitive way to understand IObservables.

The figure illustrates that an IObservable can produce three different kinds of messages:

  • OnNext signals a new value, so if your IObservable represents a stream of events, OnNext will be fired when an event is ready to be consumed. This is an IObservable’s most important message, and often the only one you’ll be interested in.

  • OnCompleted signals that the IObservable is done and will signal no more values.

  • OnError signals that an error has occurred and provides the relevant Exception.

The IObservable contract

The IObservable contract specifies that an IObservable should produce messages according to the following grammar:

OnNext* (OnCompleted|OnError)?

That is, an IObservable can produce an arbitrary number of T’s (OnNext), possibly followed by a single message indicating either successful completion (OnCompleted) or an error (OnError). This means that there are three possibilities in terms of completion. An IObservable can

  • Never complete

  • Complete normally with a completion message

  • Complete abnormally, in which case it produces an Exception

An IObservable never produces any values after it’s completed, regardless of whether it completes normally or with an error.
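Rx guards against violations of this grammar in observables built with Observable.Create (which you’ll meet in section 18.2.3). Here’s a minimal sketch, written as a top-level C# program and assuming the System.Reactive package is referenced, of a source that deliberately breaks the contract by calling OnNext after OnCompleted. The late value never reaches the subscriber.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

var received = new List<string>();

// A deliberately misbehaving source: it calls OnNext after OnCompleted.
IObservable<int> source = Observable.Create<int>(observer =>
{
   observer.OnNext(1);
   observer.OnNext(2);
   observer.OnCompleted();
   observer.OnNext(3);   // breaks the contract; Rx's wrapper around the observer drops it
   return () => { };     // nothing to clean up when the subscription is disposed
});

source.Subscribe(
   onNext: n => received.Add($"next {n}"),
   onCompleted: () => received.Add("completed"));

Console.WriteLine(string.Join(", ", received));   // prints: next 1, next 2, completed
```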

18.1.2 Subscribing to an IObservable

Observables work in tandem with observers. Simply put,

  • Observables produce values

  • Observers consume them

If you want to consume the messages produced by an IObservable, you can create an observer and associate it with an IObservable via the Subscribe method. The simplest way to do this is by providing a callback that handles the values produced by the IObservable like so:

using System;                 
using System.Reactive.Linq;   
 
IObservable<int> nums = //...
 
nums.Subscribe(Console.WriteLine);

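Exposes the IObservable interface and, once the System.Reactive package is referenced, the Subscribe extension methods used below

Exposes Rx’s query operators, such as Select and Where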

When I say that nums “produces” an int value, all I really mean is that it calls the given function (in this case, Console.WriteLine) with the value. The result of the preceding code is that when nums produces an int, it’s printed out.

I find the naming a bit confusing; you’d expect an IObservable to have an Observe method, but instead, it’s called Subscribe. Basically, you can think of the two as synonyms: an observer is a subscriber, and in order to observe an IObservable you subscribe to it.

What about the other types of messages an IObservable can produce? You can provide handlers for those as well. For instance, the following listing shows a convenience method, Trace, that attaches an observer to an IObservable; this observer simply prints a diagnostic message whenever the IObservable signals. We’ll use this method later for debugging.

Listing 18.1 Subscribing to the messages produced by an IObservable

using static System.Console;
 
public static IDisposable Trace<T>
   (this IObservable<T> source, string name)
   => source.Subscribe
   (
      onNext: t => WriteLine($"{name} -> {t}"),
      onError: ex => WriteLine($"{name} ERROR: {ex.Message}"),
      onCompleted: () => WriteLine($"{name} END")
   );

Subscribe actually has overloads that accept up to three handlers, one for each kind of message an IObservable<T> can produce, so you can leave out the onError and onCompleted handlers when you don’t need them. It should be clear why this is useful: if you don’t expect an IObservable to ever complete, there’s no point in providing an onCompleted handler.

A more OO option for subscribing is to call Subscribe with an IObserver, an interface that, unsurprisingly, exposes OnNext, OnError, and OnCompleted methods.1

Also notice that Subscribe returns an IDisposable (the subscription). By disposing it, you unsubscribe.
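The following sketch puts both ideas together (a top-level program, assuming the System.Reactive package). Rather than declaring a class that implements IObserver<int>, it builds one from three lambdas with Rx’s Observer.Create helper, subscribes it to a Subject (covered in section 18.2.2), and then disposes the subscription; messages sent after that never reach the observer.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;           // Observer.Create
using System.Reactive.Subjects;  // Subject<T>

var log = new List<string>();
var numbers = new Subject<int>();

// An IObserver<int> assembled from one handler per kind of message.
IObserver<int> observer = Observer.Create<int>(
   onNext: n => log.Add($"next {n}"),
   onError: ex => log.Add($"error {ex.Message}"),
   onCompleted: () => log.Add("completed"));

IDisposable subscription = numbers.Subscribe(observer);

numbers.OnNext(1);
subscription.Dispose();   // unsubscribe: later messages don't reach the observer
numbers.OnNext(2);
numbers.OnCompleted();

Console.WriteLine(string.Join(", ", log));   // prints: next 1
```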

In this section, you’ve seen some of the basic concepts and terminology around IObservable. It’s a lot to absorb, but don’t worry, things will become clearer as you see some examples. These are the basic ideas to keep in mind:

  • Observables produce values; observers consume them.

  • You associate an observer with an observable by using Subscribe.

  • An observable produces a value by calling the observers’ OnNext handler.

18.2 Creating IObservables

You now know how to consume the data in a stream by subscribing to an IObservable. But how do you get an IObservable in the first place? The IObservable and IObserver interfaces are included in .NET Standard, but if you want to create IObservables or perform most other operations on them, you’ll typically use the Reactive Extensions (Rx) by installing the System.Reactive package.2

The recommended way to create IObservables is by using one of several dedicated methods included in the static Observable class; we’ll look at some next. I recommend you follow along in the REPL whenever possible.

18.2.1 Creating a timer

A timer can be modeled with an IObservable that signals at regular intervals. In a marble diagram, it’s an arrow with a marble at each interval that never reaches an end.

This is a good way to start experimenting with IObservables because it’s simple but does include the element of time. The code in the following listing uses Observable.Interval to create a timer.

Listing 18.2 Creating an IObservable that signals every second

using System.Reactive.Linq;
 
var oneSec = TimeSpan.FromSeconds(1);
IObservable<long> ticks = Observable.Interval(oneSec);

Here we define ticks as an IObservable that will begin signaling after one second, producing a long counter value that increments every second, starting at 0. Notice I said “will begin” signaling? The resulting IObservable is lazy, so unless there’s a subscriber, nothing will actually happen. Why talk, if nobody’s listening?

If we want to see some tangible results, we need to subscribe to the IObservable. We can do this with the Trace method defined earlier:

ticks.Trace("ticks");

At this point, you’ll start to see the following messages appear in the console, one second apart:

ticks -> 0
ticks -> 1
ticks -> 2
ticks -> 3
ticks -> 4
...

Because this IObservable never completes, you’ll have to reset the REPL to stop the noise—sorry!
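If you’d rather not reset the REPL, you can make the timer finite with Take, which completes the stream after a given number of values. Here’s a minimal sketch, assuming the System.Reactive package; it uses a 100 ms interval to keep the wait short, and ToList().Wait() to block until the stream completes, which is fine for a demo but not something you’d do in production code.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Take(3) completes the stream after three values; completion also stops the timer.
IObservable<long> threeTicks = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(3);

// ToList gathers all values into one list when the stream completes;
// Wait blocks the current thread until then.
IList<long> values = threeTicks.ToList().Wait();

Console.WriteLine(string.Join(", ", values));   // prints: 0, 1, 2
```

In the REPL, ticks.Take(5).Trace("ticks") would likewise print five values followed by “ticks END.”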

18.2.2 Using Subject to tell an IObservable when it should signal

Another way to create an IObservable is by instantiating a Subject. A Subject is an IObservable you can imperatively tell to produce a value, which it will, in turn, push out to its observers. For example, the following listing shows a program that turns inputs from the console into values signaled by a Subject.

Listing 18.3 Modeling user inputs as a stream

using System.Reactive.Subjects;
using static System.Console;
 
var inputs = new Subject<string>();           
 
using (inputs.Trace("inputs"))                
{
   for (string input; (input = ReadLine()) != "q";)
      inputs.OnNext(input);                   
 
   inputs.OnCompleted();                      
}                                             

Creates a Subject

Subscribes to the Subject

Tells the Subject to produce a value, which it pushes to its observers

Tells the Subject to signal completion

Leaving the using block disposes the subscription.

Every time the user types in some input, the code pushes that value to the Subject by calling its OnNext method. When the user types “q,” the code exits the for loop and calls the Subject's OnCompleted method, signaling that the stream has ended. Here we’ve subscribed to the stream of inputs using the Trace method defined in listing 18.1, so we’ll get a diagnostic message printed for each user input.

An interaction with the program looks like this (lines without the “inputs” prefix are what the user typed):

hello
inputs -> hello
world
inputs -> world
q
inputs END

Avoid using Subject

Subject is useful for demonstrative purposes, but it works imperatively (you tell the Subject when to fire) and this goes somewhat counter to the reactive philosophy of Rx (you specify how to react to certain things when they happen).

For this reason, it’s recommended that you avoid Subjects whenever possible and instead use other methods such as Observable.Create, which you’ll see next.

As an exercise, try to rewrite the code in listing 18.3 using Observable.Create to create an IObservable of user inputs.

18.2.3 Creating IObservables from callback-based subscriptions

If your system subscribes to an external data source, such as a message queue, event broker, or publisher/subscriber, you can model that data source as an IObservable.

For example, Redis can be used as a publisher/subscriber. Redis’s API exposes a Subscribe method allowing you to register a callback that receives messages published on Redis on a given channel (a Redis channel is just a string; it allows subscribers to specify what messages they're interested in). The following listing shows how you can use Observable.Create to create an IObservable that will signal whenever messages are received from Redis.

Listing 18.4 Creating an IObservable from messages published to Redis

using StackExchange.Redis;
using System.Reactive.Linq;
 
ConnectionMultiplexer redis
   = ConnectionMultiplexer.Connect("localhost");
 
IObservable<RedisValue> RedisNotifications
(
   RedisChannel channel
)
=> Observable.Create<RedisValue>(observer =>                    
{
   var sub = redis.GetSubscriber();
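   Action<RedisChannel, RedisValue> handler = (_, val) => observer.OnNext(val);
   sub.Subscribe(channel, handler);
   // Removes only this handler, not other subscribers to the same channel
   return () => sub.Unsubscribe(channel, handler);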
});

Create takes a function of an observer; that function is only called when a subscription is made.

Converts from the callback-based implementation of Subscribe to values produced by the IObservable

Returns a function that will be called when the subscription is disposed

The preceding method returns an IObservable that produces the values received from Redis on the given channel. You could use this as follows:

RedisChannel weather = "weather";
 
var weatherUpdates = RedisNotifications(weather);         
weatherUpdates.Subscribe(
   onNext: val => WriteLine($"It's {val} out there"));    
 
redis.GetDatabase(0).Publish(weather, "stormy");          
// prints: It's stormy out there                          

Gets an IObservable that signals when messages are published on the weather channel

Subscribes to the IObservable

Publishing a value causes weatherUpdates to signal; the onNext handler is called as a result.

You may ask, “What have we gained exactly?” After all, we could have registered a callback using Redis’s Subscribe method to handle messages; instead, we now have an IObservable and need to Subscribe to it to handle messages. The point is, with an IObservable, we can leverage the many operators included in Rx (which we’ll discuss in section 18.3) as well as the schedulers (which are used to optimize performance and are beyond the scope of this chapter).
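Running listing 18.4 requires a Redis server, but you can observe the same mechanics with an in-memory stand-in. In this sketch (assuming the System.Reactive package), Notifications is a hypothetical replacement for RedisNotifications that just logs each step. It shows that the function passed to Observable.Create runs only when someone subscribes, and that the Action it returns runs when the subscription is disposed.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

var log = new List<string>();

// Hypothetical in-memory stand-in for RedisNotifications: it only logs the steps
// that would subscribe to, receive from, and unsubscribe from a real channel.
IObservable<string> Notifications(string channel)
   => Observable.Create<string>(observer =>
   {
      log.Add($"subscribe to {channel}");                   // runs once per Subscribe call
      observer.OnNext($"message on {channel}");             // pretend a message arrived
      return () => log.Add($"unsubscribe from {channel}");  // runs on Dispose
   });

IObservable<string> weather = Notifications("weather");
log.Add("observable created");   // nothing has subscribed yet

IDisposable subscription = weather.Subscribe(msg => log.Add(msg));
subscription.Dispose();

Console.WriteLine(string.Join(" | ", log));
// prints: observable created | subscribe to weather | message on weather | unsubscribe from weather
```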

18.2.4 Creating IObservables from simpler structures

I said that IObservable<T> is more general than a value T, a Task<T>, or an IEnumerable<T>, so let’s see how each of these can be promoted to an IObservable. This becomes useful if you want to combine one of these less powerful structures with an IObservable.

Return allows you to lift a single value into an IObservable that immediately produces the value and then completes. Here’s an example:

IObservable<string> justHello = Observable.Return("hello");
justHello.Trace("justHello");
 
// prints: justHello -> hello
//         justHello END

Return takes a value, T, and lifts it into an IObservable<T>. This is the first container where the Return function is actually called Return!

Let’s see about creating an IObservable from a single asynchronous value, a Task. The resulting IObservable produces a single value after some time, immediately followed by the signal for completion. In code, it looks like this:

Observable.FromAsync(() => RatesApi.GetRateAsync("USDEUR"))
   .Trace("singleUsdEur");
// prints: singleUsdEur -> 0.92
//         singleUsdEur END

Finally, an IObservable created from an IEnumerable immediately produces all the values in the IEnumerable and then completes:

IEnumerable<char> e = new[] { 'a', 'b', 'c' };
IObservable<char> chars = e.ToObservable();
chars.Trace("chars");
 
// prints: chars -> a
//         chars -> b
//         chars -> c
//         chars END

You’ve now seen many, but not all, methods for creating IObservables. You may end up creating IObservables in other ways; for example, in GUI applications, you can turn events such as mouse clicks into event streams by using Observable.FromEvent and FromEventPattern.
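To try FromEventPattern without a GUI, here’s a sketch that uses ObservableCollection<T>, a BCL collection that raises a CollectionChanged event whenever it’s modified. The choice of event source is mine, purely for illustration, and the System.Reactive package is assumed.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.Reactive;
using System.Reactive.Linq;

var names = new ObservableCollection<string>();

// Rx attaches the handler on Subscribe and detaches it when the subscription is disposed.
IObservable<EventPattern<NotifyCollectionChangedEventArgs>> changes =
   Observable.FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
      h => names.CollectionChanged += h,
      h => names.CollectionChanged -= h);

var added = new List<string>();

using (changes
   .Where(e => e.EventArgs.Action == NotifyCollectionChangedAction.Add)
   .Subscribe(e => added.Add((string)e.EventArgs.NewItems[0])))
{
   names.Add("Ada");
   names.Add("Grace");
}

names.Add("Linus");   // raised after the subscription was disposed, so it isn't observed

Console.WriteLine(string.Join(", ", added));   // prints: Ada, Grace
```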

Now that you know about creating and subscribing to IObservable, let’s move on to the most fascinating area: transforming and combining different streams.

18.3 Transforming and combining data streams

The power of using streams comes from the many ways in which you can combine them and define new streams based on existing ones. Rather than dealing with individual values in a stream (like in most event-driven designs), you deal with the stream as a whole.

Rx offers a lot of functions (often called operators) to transform and combine IObservables in a variety of ways. I’ll discuss the most commonly used ones and add a few operators of my own. You’ll recognize the typical traits of a functional API: purity and composability.

18.3.1 Stream transformations

You can create new observables by transforming an existing observable in some way. One of the simplest operations is mapping. This is achieved with the Select method, which works (as with any other container) by applying the given function to each element in the stream, as figure 18.2 shows.

Figure 18.2 Select maps a function onto a stream.

Here’s some code that creates a timer and then maps a simple function on it:

var oneSec = TimeSpan.FromSeconds(1);
var ticks = Observable.Interval(oneSec);
 
ticks.Select(n => n * 10)
   .Trace("ticksX10");

We’re attaching an observer on the last line with the Trace method, so the preceding code will cause the following messages to be printed every second:

ticksX10 -> 0
ticksX10 -> 10
ticksX10 -> 20
ticksX10 -> 30
ticksX10 -> 40
...

Because Select follows the LINQ query pattern, we can write the same thing using LINQ:

from n in ticks select n * 10
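If you want a version that runs to completion, this sketch (assuming the System.Reactive package) swaps the timer for Observable.Range, so the stream is finite and there’s no waiting, and checks that the method syntax and the query syntax produce the same values.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

// Observable.Range stands in for ticks: it produces 0 through 4 and then completes.
IObservable<long> ticks = Observable.Range(0, 5).Select(i => (long)i);

IObservable<long> viaMethod = ticks.Select(n => n * 10);
IObservable<long> viaQuery = from n in ticks select n * 10;

IList<long> a = viaMethod.ToList().Wait();
IList<long> b = viaQuery.ToList().Wait();

Console.WriteLine(string.Join(", ", a));   // prints: 0, 10, 20, 30, 40
Console.WriteLine(a.SequenceEqual(b));     // prints: True
```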

Using Select, we can rewrite our simple program that checks exchange rates (first introduced in listing 15.1) in terms of observables:

public static void Main()
{
   var inputs = new Subject<string>();            
 
   var rates =
      from pair in inputs                         
      select RatesApi.GetRateAsync(pair).Result;  
 
   using (inputs.Trace("inputs"))                 
   using (rates.Trace("rates"))                   
       for (string input; (input = ReadLine().ToUpper()) != "Q";)
         inputs.OnNext(input);
}

The stream of values entered by the user

Maps user inputs to the corresponding retrieved values

Subscribes to both streams to produce debug messages

Here, inputs represents the stream of currency pairs entered by the user, and in rates, we map those pairs to the corresponding values retrieved from the web. We’re subscribing to both observables with the usual Trace method, so an interaction with this program could be as follows:

eurusd
inputs -> EURUSD
rates -> 1.0852
chfusd
inputs -> CHFUSD
rates -> 1.0114

Notice, however, that in the code, we have a blocking call to Result. In a real application, we wouldn’t want to block a thread, so how could we avoid that?

We saw that a Task can easily be promoted to an IObservable. If we promote the Task of retrieving each rate from the remote API to an IObservable rather than waiting for its result, then we get an IObservable of IObservables. Sound familiar? Bind! We can use SelectMany instead of Select, which flattens the result into a single IObservable. We can, therefore, rewrite the definition of the rates stream as follows:

var rates = inputs.SelectMany
   (pair => Observable.FromAsync(() => RatesApi.GetRateAsync(pair)));

Observable.FromAsync promotes the Task returned by GetRateAsync to an IObservable, and SelectMany flattens all these IObservables into a single IObservable.

Because it’s always possible to promote a Task to an IObservable, an overload of SelectMany exists that does just that (this is similar to how we overloaded Bind to work with an IEnumerable and an Option-returning function in section 6.5). This means we can avoid explicitly calling FromAsync and return a Task instead. Furthermore, we can use a LINQ query:

var rates =
   from pair in inputs
   from rate in RatesApi.GetRateAsync(pair)
   select rate;

The program thus modified works the same way as before but without the blocking call to Result.
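Here’s a self-contained version of the query that you can run without the rates API. GetRateAsync below is a hypothetical stub with hard-coded rates and artificial delays, and the System.Reactive package is assumed. The delays also expose a consequence of flattening with SelectMany: results are emitted as the tasks complete, not necessarily in the order the inputs arrived.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for RatesApi.GetRateAsync: fixed rates, and a delay that
// varies by pair so the tasks finish in a different order than they start.
async Task<decimal> GetRateAsync(string pair)
{
   var rates = new Dictionary<string, decimal> { ["EURUSD"] = 1.0852m, ["CHFUSD"] = 1.0114m };
   await Task.Delay(pair == "EURUSD" ? 200 : 10);
   return rates[pair];
}

IObservable<string> inputs = new[] { "EURUSD", "CHFUSD" }.ToObservable();

// The Task-accepting overload of SelectMany lets the query use a Task directly.
IObservable<(string Pair, decimal Rate)> rates =
   from pair in inputs
   from rate in GetRateAsync(pair)
   select (Pair: pair, Rate: rate);

IList<(string Pair, decimal Rate)> results = rates.ToList().Wait();

// With these delays, CHFUSD is typically emitted before EURUSD.
foreach (var r in results) Console.WriteLine($"{r.Pair}: {r.Rate}");
```

If you need the results in input order, one option is inputs.Select(pair => Observable.FromAsync(() => GetRateAsync(pair))).Concat(), which waits for each lookup to complete before starting the next, trading concurrency for ordering.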

IObservable also supports many of the other operations that are supported by IEnumerable, such as filtering with Where, Take (takes the first n values), Skip, First, and so on.
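A quick sketch of a few of these operators chained together, assuming the System.Reactive package; the comments show what each step leaves in the stream.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

IList<int> result = Observable.Range(1, 10)   // 1, 2, ..., 10, then completes
   .Where(n => n % 2 == 0)                   // 2, 4, 6, 8, 10
   .Skip(1)                                  // 4, 6, 8, 10
   .Take(2)                                  // 4, 6, then completes
   .ToList()
   .Wait();

Console.WriteLine(string.Join(", ", result));   // prints: 4, 6
```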

18.3.2 Combining and partitioning streams

There are also many operators that allow you to combine two streams into a single one. For example, Concat produces all the values of one IObservable, followed by all the values in another, as figure 18.3 shows.

Figure 18.3 Concat waits for an IObservable to complete and then produces elements from the other IObservable.

For instance, in our exchange rate lookup, we have an observable called rates with the retrieved rates. If we want an observable of all the messages the program should output to the console, this must include the retrieved rates but also an initial message prompting the user for some input. We can lift this single message into an IObservable with Return and then use Concat to combine it with the other messages:

IObservable<decimal> rates = //...
 
IObservable<string> outputs = Observable
   .Return("Enter a currency pair like 'EURUSD', or 'q' to quit")
   .Concat(rates.Select(r => r.ToString()));

In fact, the need to provide a starting value for an IObservable is so common that there’s a dedicated function for it—StartWith. The preceding code is equivalent to this:

var outputs = rates.Select(r => r.ToString())
   .StartWith("Enter a currency pair like 'EURUSD', or 'q' to quit");
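To check the equivalence yourself, this sketch (assuming the System.Reactive package) uses a finite, hard-coded stream in place of rates and builds the output both ways. It formats the decimals with CultureInfo.InvariantCulture so the result doesn’t depend on the machine’s culture.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Reactive.Linq;

const string Prompt = "Enter a currency pair like 'EURUSD', or 'q' to quit";

// A finite stand-in for the rates stream.
IObservable<decimal> rates = new[] { 1.0852m, 1.0114m }.ToObservable();
IObservable<string> formatted = rates.Select(r => r.ToString(CultureInfo.InvariantCulture));

IList<string> viaConcat = Observable.Return(Prompt).Concat(formatted).ToList().Wait();
IList<string> viaStartWith = formatted.StartWith(Prompt).ToList().Wait();

Console.WriteLine(string.Join(" | ", viaStartWith));
// prints: Enter a currency pair like 'EURUSD', or 'q' to quit | 1.0852 | 1.0114
Console.WriteLine(viaConcat.SequenceEqual(viaStartWith));   // prints: True
```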

Whereas Concat waits for the left IObservable to complete before producing values from the right observable, Merge combines values from two IObservables without delay, as figure 18.4 shows.

Figure 18.4 Merge merges two IObservables into one.

For example, if you have a stream of valid values and one of error messages, you could combine them with Merge as follows:

IObservable<decimal> rates = //...
IObservable<string> errors = //...
 
var outputs = rates.Select(r => r.ToString())
   .Merge(errors);
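The difference between Concat and Merge is easiest to see with two hot sources. In this sketch (assuming the System.Reactive package), two Subjects stand in for real streams, and a value is pushed to the second one before the first completes. Merge sees it; Concat doesn’t, because it only subscribes to the second stream once the first one completes.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var a = new Subject<string>();
var b = new Subject<string>();

var concatenated = new List<string>();
var merged = new List<string>();

using var concatSub = a.Concat(b).Subscribe(concatenated.Add);  // subscribes to b only after a completes
using var mergeSub = a.Merge(b).Subscribe(merged.Add);          // subscribes to a and b right away

a.OnNext("a1");
b.OnNext("b1");    // Concat isn't listening to b yet, so only Merge sees this value
a.OnNext("a2");
a.OnCompleted();   // Concat now moves on and subscribes to b
b.OnNext("b2");

Console.WriteLine(string.Join(", ", concatenated));   // prints: a1, a2, b2
Console.WriteLine(string.Join(", ", merged));         // prints: a1, b1, a2, b2
```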

Just as you might want to merge values from different streams, the opposite operation—partitioning a stream according to some criterion—is also often useful. Figure 18.5 illustrates this.

Figure 18.5 Partitioning an IObservable according to a predicate

Partition returns a pair of IObservables, so you can destructure it like this:

var (evens, odds) = ticks.Partition(x => x % 2 == 0);

Partitioning an IObservable of values is roughly equivalent to an if when dealing with a single value, so it’s useful when you have a stream of values that you want to process differently, depending on some condition. For example, if you have a stream of messages and some criterion for validation, you can partition the stream into two streams of valid and invalid messages and process them accordingly.
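If you’re curious how an operator like Partition can be built from the ones you’ve already seen, here’s a minimal sketch. Split is a hypothetical name, chosen so it doesn’t clash with any Partition you may have in scope, and the System.Reactive package is assumed. Each output is just a Where over the source, so subscribing to both outputs subscribes to the source twice; that’s harmless for the cold Observable.Range used here, but worth keeping in mind for sources with side effects.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper: splits a stream in two according to a predicate.
(IObservable<T> Passed, IObservable<T> Failed) Split<T>(IObservable<T> source, Func<T, bool> predicate)
   => (source.Where(predicate), source.Where(t => !predicate(t)));

var (evens, odds) = Split(Observable.Range(0, 6), x => x % 2 == 0);

IList<int> evenValues = evens.ToList().Wait();
IList<int> oddValues = odds.ToList().Wait();

Console.WriteLine(string.Join(", ", evenValues));   // prints: 0, 2, 4
Console.WriteLine(string.Join(", ", oddValues));    // prints: 1, 3, 5
```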

18.3.3 Error handling with IObservable

Error handling when working with IObservable works differently from what you might expect. In most programs, an uncaught exception either causes the whole application to crash or causes the processing of a single message/request to fail, while subsequent requests work fine. To illustrate how things work differently in Rx, consider this version of our program for looking up exchange rates:

var inputs = new Subject<string>();
 
var rates =
   from pair in inputs
   from rate in RatesApi.GetRateAsync(pair)
   select rate;
 
var outputs = from r in rates select r.ToString();
 
using (inputs.Trace("inputs"))
using (rates.Trace("rates"))
using (outputs.Trace("outputs"))
   for (string input; (input = ReadLine().ToUpper()) != "Q";)
      inputs.OnNext(input);

The program defines three streams, each dependent on another (outputs is defined in terms of rates, and rates is defined in terms of inputs, as figure 18.6 shows), and we’re printing diagnostic messages for all of them with Trace.

Figure 18.6 Simple dataflow between three IObservables

Now look what happens if you break the program by passing an invalid currency pair:

eurusd
inputs -> EURUSD
rates -> 1.0852
outputs -> 1.0852
chfusd
inputs -> CHFUSD
rates -> 1.0114
outputs -> 1.0114
xxx
inputs -> XXX
rates ERROR: Input string was not in a correct format.
outputs ERROR: Input string was not in a correct format.
chfusd
inputs -> CHFUSD
eurusd
inputs -> EURUSD

What this shows is that once rates errors, it never signals again. This behavior is as specified in the IObservable contract (see the sidebar on “The IObservable contract”). As a result, everything downstream is also “dead.” But IObservables upstream of the failed one are fine: inputs is still signaling, as would any other IObservables defined in terms of inputs.
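You can reproduce this behavior without the network. In this sketch (assuming the System.Reactive package), int.Parse plays the role of the failing rate lookup: after the bad input, parsed never signals again, while inputs keeps going.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var log = new List<string>();
var inputs = new Subject<string>();

// int.Parse stands in for the failing lookup: it throws on input that isn't a number.
IObservable<int> parsed = inputs.Select(s => int.Parse(s));

using var inputsSub = inputs.Subscribe(s => log.Add($"inputs -> {s}"));
using var parsedSub = parsed.Subscribe(
   onNext: n => log.Add($"parsed -> {n}"),
   onError: ex => log.Add($"parsed ERROR: {ex.GetType().Name}"));

foreach (var input in new[] { "1", "x", "2" }) inputs.OnNext(input);

foreach (var line in log) Console.WriteLine(line);
// prints:
// inputs -> 1
// parsed -> 1
// inputs -> x
// parsed ERROR: FormatException
// inputs -> 2
```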

To prevent your system from going into such a state, where a branch of the dataflow dies while the remaining graph keeps functioning, you can use the techniques you learned for functional error handling.

The following listing shows the implementation of Safely, a helper function included in LaYumba.Functional that allows you to safely apply a Task-returning function to each element in a stream. The result is a pair of streams: a stream of successfully computed values and a stream of exceptions.

Listing 18.5 Safely performing a Task and returning two streams

public static (IObservable<R> Completed, IObservable<Exception> Faulted)
   Safely<T, R>(this IObservable<T> ts, Func<T, Task<R>> f)
   => ts
      .SelectMany(t => f(t).Map(
         Faulted: ex => ex,                                           
         Completed: r => Exceptional(r)))                             
      .Partition();
 
static (IObservable<T> Successes, IObservable<Exception> Exceptions)  
   Partition<T>(this IObservable<Exceptional<T>> excTs)               
{
   bool IsSuccess(Exceptional<T> ex)
      => ex.Match(_ => false, _ => true);
 
   T ExtractValue(Exceptional<T> ex)
      => ex.Match(_ => default, t => t);
 
   Exception ExtractException(Exceptional<T> ex)
      => ex.Match(exc => exc, _ => default);
 
   var (ts, errs) = excTs.Partition(IsSuccess);
   return
   (
      Successes: ts.Select(ExtractValue),
      Exceptions: errs.Select(ExtractException)
   );
}

Converts each Task<R> to a Task<Exceptional<R>> to get a stream of Exceptionals

Partitions a stream of Exceptionals into successfully computed values and exceptions

For each T in the given stream, we apply the Task-returning function f. We then use the binary overload of Map defined in section 16.1.4 to convert each resulting Task<R> to a Task<Exceptional<R>>. This is where we gain safety: instead of an inner value R that throws an exception when it’s accessed, we have an Exceptional<R> in the appropriate state. SelectMany flattens away the Tasks in the stream and returns a stream of Exceptionals. We can then partition this into successes and exceptions.

With this in place, we can refactor our program to handle errors more gracefully:

var (rates, errors) = inputs.Safely(RatesApi.GetRateAsync);
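If you’d like to see the same idea without LaYumba.Functional, here’s a minimal sketch. SafelySketch is a hypothetical stand-in for Safely with the same signature, using a plain (Ok, Value, Error) tuple in place of Exceptional<R>; the System.Reactive package is assumed. One design choice is worth calling out: both outputs derive from a single stream of results, so the sketch shares it with Publish().RefCount(). Without that, each subscriber to an output would get its own SelectMany pipeline, and f would run once per subscriber for every input.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for Safely: a (Ok, Value, Error) tuple replaces Exceptional<R>.
(IObservable<R> Completed, IObservable<Exception> Faulted) SafelySketch<T, R>(
   IObservable<T> ts, Func<T, Task<R>> f)
{
   // Turns a failure of f (a synchronous throw or a faulted Task) into data.
   async Task<(bool Ok, R Value, Exception Error)> Attempt(T t)
   {
      try { return (true, await f(t), null); }
      catch (Exception ex) { return (false, default, ex); }
   }

   // Share one SelectMany pipeline between both outputs, so f runs once per input.
   var results = ts.SelectMany(t => Attempt(t)).Publish().RefCount();

   return (results.Where(r => r.Ok).Select(r => r.Value),
           results.Where(r => !r.Ok).Select(r => r.Error));
}

var inputs = new Subject<string>();
var (parsed, errors) = SafelySketch(inputs, (string text) => Task.FromResult(int.Parse(text)));

var values = new List<int>();
var errorNames = new List<string>();
using var done = new CountdownEvent(2);   // signaled once by each output's OnCompleted

using var s1 = parsed.Subscribe(values.Add, () => done.Signal());
using var s2 = errors.Subscribe(ex => errorNames.Add(ex.GetType().Name), () => done.Signal());

foreach (var input in new[] { "1", "x", "2" }) inputs.OnNext(input);
inputs.OnCompleted();
done.Wait(TimeSpan.FromSeconds(5));

Console.WriteLine($"values: {string.Join(", ", values.OrderBy(v => v))}");   // values: 1, 2
Console.WriteLine($"errors: {string.Join(", ", errorNames)}");                // errors: FormatException
```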

18.3.4 Putting it all together

The following listing showcases the various techniques you’ve learned in this section. It shows the exchange rates lookup program, refactored to safely handle errors, and without the debug information.

Listing 18.6 The program refactored to safely handle errors

public static void Main()
{
   var inputs = new Subject<string>();
 
   var (rates, errors) = inputs.Safely(RatesApi.GetRateAsync);
 
   var outputs = rates
      .Select(r => r.ToString())
      .Merge(errors.Select(ex => ex.Message))
      .StartWith("Enter a currency pair like 'EURUSD', or 'q' to quit");
 
   using (outputs.Subscribe(WriteLine))
      for (string input; (input = ReadLine().ToUpper()) != "Q";)
         inputs.OnNext(input);
}

The dataflow diagram in figure 18.7 shows the various IObservables involved and how they depend on one another.

Figure 18.7 Dataflow with a separate branch for handling errors

Notice how Safely allows us to create two branches, each of which can be processed independently until both are converted to a uniform representation (here, strings), at which point they can be merged.

This program nicely illustrates the three parts that typically compose a program that uses IObservables:

  • Set up the data sources—In our case, this is captured by inputs.

  • Process the data—This is where you use functions like Select, Merge, and so on.

  • Consume the results—Observers consume the most downstream IObservables (in this case, outputs) to perform side effects.

18.4 Implementing logic that spans multiple events

So far I’ve mostly aimed at familiarizing you with IObservables and the many operators that can be used with them. For this, I’ve used familiar examples like the exchange rates lookup. After all, given that you can promote any value T, Task<T>, or IEnumerable<T> to an IObservable<T>, you could pretty much write all of your code in terms of IObservables! But should you?

The answer, of course, is probably not. The area in which IObservable and Rx really shine is when you can use them to write stateful programs without any explicit state manipulation. By stateful programs, I mean programs in which events aren’t treated independently; past events influence how new events are treated. In this section, you’ll see a few such examples.

18.4.1 Detecting sequences of pressed keys

At some point, you’ve probably written an event handler that listens to a user’s keypresses and performs some actions based on what key and key modifiers were pressed. A callback-based approach is satisfactory for many cases, but what if you want to listen to a specific sequence of keypresses? For example, say you want to implement some behavior when the user presses the combination Alt-K-B.

In this case, pressing Alt-B should lead to different behavior, based on whether it was shortly preceded by the leading Alt-K, so keypresses can’t be treated independently. If you have a callback-based mechanism that deals with single keypress events, you effectively need to set in motion a state machine when the user presses Alt-K, and then wait for the possible Alt-B that will follow, reverting to the previous state if no Alt-B is received in time. It’s actually pretty complicated!

With IObservable, this can be solved much more elegantly. Let’s assume that we have a stream of keypress events, keys. We’re looking for two events, Alt-K and Alt-B, that happen on that same stream in quick succession. In order to do this, we need to explore how to combine a stream with itself, starting with the expression keys.Select(_ => keys).

It’s important to understand this expression. keys.Select(_ => keys) yields a new IObservable that maps each value produced by keys to keys itself. Therefore, when keys produces its first value, “a,” this new IObservable produces an IObservable that has all following values in keys. When keys produces its second value, “b,” the new IObservable produces another IObservable that has all the values that follow “b,” and so on.3

Looking at the types can also help clarify this:

keys                   : IObservable<KeyInfo>
_ => keys              : KeyInfo → IObservable<KeyInfo>
keys.Select(_ => keys) : IObservable<IObservable<KeyInfo>>

If we use SelectMany instead, all these values are flattened into a single stream:
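To see this flattening in action, here is a minimal sketch. It assumes a .NET 6+ console project that references the System.Reactive NuGet package. A Subject<string> is a hypothetical stand-in for keys, and values are pushed by hand so that the output is deterministic:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// A Subject stands in for the keypress stream (an assumption for this demo)
var keys = new Subject<string>();
var flattened = new List<string>();

// For every value, subscribe to keys again and flatten all inner streams into one
using var subscription = keys
    .SelectMany(_ => keys)
    .Subscribe(flattened.Add);

keys.OnNext("a"); // starts inner stream #1, which only sees values pushed after "a"
keys.OnNext("b"); // starts inner stream #2; inner #1 emits "b"
keys.OnNext("c"); // inner #1 and inner #2 both emit "c"

Console.WriteLine(string.Join(",", flattened)); // b,c,c
```

Each new value starts another inner subscription, so later values are repeated once for every earlier value.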

Of course, if we’re looking for two consecutive keypresses, we don’t need all values that follow an item but just the next one. Instead of mapping each value to the whole IObservable, let’s reduce it to the first item with Take:
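As a sketch (again assuming the System.Reactive package, with a hypothetical Subject<string> standing in for keys), adding Take(1) maps each value to exactly one value: the one that follows it.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var keys = new Subject<string>();
var nextKeys = new List<string>();

// Each value is mapped to just the next value produced by keys
using var subscription = keys
    .SelectMany(_ => keys.Take(1))
    .Subscribe(nextKeys.Add);

foreach (var key in new[] { "a", "b", "c", "d" })
    keys.OnNext(key);

Console.WriteLine(string.Join(",", nextKeys)); // b,c,d
```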

We’re getting close. Now, let’s make the following changes:

  • Instead of ignoring the current value, pair it with the following value.

  • Use SelectMany to obtain a flat IObservable.

  • Use LINQ syntax.

The resulting expression pairs each value in an IObservable with the previously emitted value:
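The following sketch spells that expression out in LINQ syntax and runs it. It assumes the System.Reactive package and uses a Subject<string> as a hypothetical stand-in for keys:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var keys = new Subject<string>();
var pairs = new List<(string Previous, string Current)>();

// Query syntax compiles to SelectMany with a result selector
var paired =
    from first in keys
    from second in keys.Take(1)
    select (Previous: first, Current: second);

using var subscription = paired.Subscribe(pairs.Add);

foreach (var key in new[] { "a", "b", "c", "d" })
    keys.OnNext(key);

Console.WriteLine(string.Join(" ", pairs)); // (a, b) (b, c) (c, d)
```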

This is a pretty useful function in its own right, and I’ll call it PairWithPrevious. We’ll use it later.

But for this particular scenario, we only want pairs to be created if they’re sufficiently close in time. This is easy to achieve with an overload of Take that takes a TimeSpan, as the following listing shows.

Listing 18.7 Detecting when the user presses the Alt-K-B key sequence

IObservable<ConsoleKeyInfo> keys = //...
var halfSec = TimeSpan.FromMilliseconds(500);
 
var keysAlt = keys
   .Where(key => key.Modifiers.HasFlag(ConsoleModifiers.Alt));
 
// For any Alt keypress, pairs it with the next Alt keypress
// that occurs within half a second
var twoKeyCombis =
   from first in keysAlt
   from second in keysAlt.Take(halfSec).Take(1)
   select (First: first, Second: second);
 
var altKB =
   from pair in twoKeyCombis
   where pair.First.Key == ConsoleKey.K
      && pair.Second.Key == ConsoleKey.B
   select Unit();

As you can see, the solution is simple and elegant. You can apply this approach to recognize more complex patterns within sequences of events—all without explicitly keeping track of state and introducing side effects!
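The listing can be exercised without a console by pushing ConsoleKeyInfo values into a Subject. This sketch assumes the System.Reactive package. It uses Rx’s own Unit.Default in place of Unit() and widens the window to five seconds so that timing cannot interfere with the synchronous pushes. The Alt helper is hypothetical:

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var keys = new Subject<ConsoleKeyInfo>();
// Assumption: a generous 5-second window instead of half a second,
// so that timing never affects the pushes below
var window = TimeSpan.FromSeconds(5);

var keysAlt = keys.Where(key => key.Modifiers.HasFlag(ConsoleModifiers.Alt));

var twoKeyCombis =
    from first in keysAlt
    from second in keysAlt.Take(window).Take(1)
    select (First: first, Second: second);

// Rx's Unit type stands in for Unit()
var altKB =
    from pair in twoKeyCombis
    where pair.First.Key == ConsoleKey.K && pair.Second.Key == ConsoleKey.B
    select Unit.Default;

var matches = 0;
using var subscription = altKB.Subscribe(_ => matches++);

keys.OnNext(Alt(ConsoleKey.K));
keys.OnNext(Alt(ConsoleKey.B)); // Alt-K then Alt-B: recognized
keys.OnNext(Alt(ConsoleKey.K));
keys.OnNext(Alt(ConsoleKey.X)); // Alt-K is paired with Alt-X, so...
keys.OnNext(Alt(ConsoleKey.B)); // ...this Alt-B is not recognized

Console.WriteLine(matches); // 1

// Hypothetical helper that builds an Alt+key press
static ConsoleKeyInfo Alt(ConsoleKey key) =>
    new ConsoleKeyInfo((char)key, key, shift: false, alt: true, control: false);
```

Because pairs are formed from consecutive Alt keypresses, an intervening Alt-X breaks the sequence. This is the same reset a hand-written state machine would need.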

You’ve probably also realized that coming up with such a solution isn’t necessarily easy. It takes a while to get familiar with IObservable and its many operators, and to develop an understanding of how to use them.

18.4.2 Reacting to multiple event sources

Imagine we have a bank account denominated in euros, and we’d like to keep track of its value in US dollars. Both changes in balance and changes in the exchange rate cause the dollar balance to change. To react to changes from different streams, we could use CombineLatest, which, whenever either of two observables signals, combines the latest values from both, as figure 18.8 shows.

Figure 18.8 CombineLatest signals whenever one of two IObservables signals.

Its usage would be as follows:

IObservable<decimal> balance = //...
IObservable<decimal> eurUsdRate = //...
 
var balanceInUsd = balance.CombineLatest(eurUsdRate
   , (bal, rate) => bal * rate);
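Here is a small sketch of CombineLatest’s behavior. It assumes the System.Reactive package and uses Subjects as hypothetical stand-ins for the balance and rate feeds. Note that nothing is signaled until both streams have produced at least one value:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical stand-ins for the real balance and EUR/USD feeds
var balance = new Subject<decimal>();
var eurUsdRate = new Subject<decimal>();
var usdValues = new List<decimal>();

using var subscription = balance
    .CombineLatest(eurUsdRate, (bal, rate) => bal * rate)
    .Subscribe(usdValues.Add);

balance.OnNext(100m);    // no signal yet: no rate has arrived
eurUsdRate.OnNext(1.1m); // 100 * 1.1 = 110
eurUsdRate.OnNext(1.2m); // 100 * 1.2 = 120
balance.OnNext(50m);     // 50 * 1.2 = 60

Console.WriteLine(usdValues.Count); // 3
```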

This works, but it doesn’t take into account the fact that the exchange rate is much more volatile than the account balance. In fact, if exchange rates come from the FX market, there may well be dozens or hundreds of tiny movements every second! Surely this level of detail isn’t required for a private client who wants to keep an eye on their finances. Reacting to each change in exchange rate would flood the client with unwanted notifications.

This is an example of an IObservable producing too much data (see the sidebar on “Backpressure”). For this, we can use Sample, an operator that takes an IObservable that acts as a data source, and another IObservable that signals when values should be produced. Sample is illustrated in figure 18.9.

Figure 18.9 Sample produces the values from a source stream when a sampler stream signals.

In this scenario, we can create an IObservable that signals at 10-minute intervals and use it to sample the stream of exchange rates, as the following listing shows.

Listing 18.8 Sampling a value from an IObservable every 10 minutes

IObservable<decimal> balance = //...
IObservable<decimal> eurUsdRate = //...
 
var tenMins = TimeSpan.FromMinutes(10);
var sampler = Observable.Interval(tenMins);
var eurUsdSampled = eurUsdRate.Sample(sampler);
 
var balanceInUsd = balance.CombineLatest(eurUsdSampled
   , (bal, rate) => bal * rate);
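Waiting ten minutes makes for a poor demo. This sketch assumes the System.Reactive package and replaces Observable.Interval with a Subject<Unit> that is ticked by hand. It shows that Sample emits the latest value seen since the previous tick, and emits nothing if no new value has arrived:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var eurUsdRate = new Subject<decimal>();
// A hand-driven sampler replaces Observable.Interval(tenMins) for this demo
var sampler = new Subject<Unit>();
var sampled = new List<decimal>();

using var subscription = eurUsdRate.Sample(sampler).Subscribe(sampled.Add);

eurUsdRate.OnNext(1.10m);
eurUsdRate.OnNext(1.11m);
eurUsdRate.OnNext(1.12m);
sampler.OnNext(Unit.Default); // emits 1.12, the latest rate
eurUsdRate.OnNext(1.13m);
sampler.OnNext(Unit.Default); // emits 1.13
sampler.OnNext(Unit.Default); // no new rate since the last tick: emits nothing

Console.WriteLine(sampled.Count); // 2
```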

This is another scenario in which our logic spans multiple events, and using Rx operators CombineLatest and Sample allows us to encode this logic without explicitly keeping any state.

Backpressure: When an IObservable produces data too quickly

When you iterate over the items in an IEnumerable or an IAsyncEnumerable, you’re “pulling” or requesting items, so you can process them at your own pace. With IObservable, items are “pushed” to you (the consuming code). If an IObservable produces values more rapidly than the subscribed observers can consume them, the resulting backpressure can put a strain on your system.

To ease backpressure, Rx provides several operators:

  • Throttle

  • Sample

  • Buffer

  • Window

  • Debounce

Each has different behavior and several overloads, so we won’t discuss them in detail. The point is that with these operators, you can easily and declaratively implement logic like, “I want to consume items in batches of 10 at a time,” or “If a cluster of values comes in quick succession, I only want to consume the last one.” Implementing such logic in a callback-based solution, where each value is received independently, would require you to manually keep some state.
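As an illustration of the first kind of logic, the following sketch (assuming the System.Reactive package) uses Buffer(10) to consume a stream of 25 numbers in batches of 10. Observable.Range stands in for a fast producer:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

var batchSizes = new List<int>();

// Buffer(10) groups values into lists of 10 and flushes any
// leftover items when the source completes
using var subscription = Observable.Range(1, 25)
    .Buffer(10)
    .Subscribe(batch => batchSizes.Add(batch.Count));

Console.WriteLine(string.Join(",", batchSizes)); // 10,10,5
```

Note that the final, partial batch is emitted when the source completes.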

18.4.3 Notifying when an account becomes overdrawn

For a final, more business-oriented example, imagine that in the context of the BOC application, we consume a stream of all transactions that affect bank accounts, and we want to send clients a notification if their account’s balance becomes negative.

An account’s balance is the sum of all the transactions that have affected it, so at any point, given a list of past Transactions for an account, you could compute its current balance using Aggregate.

There is an Aggregate function for IObservable; it waits for an IObservable to complete and aggregates all the values it produces into a single value. But this isn’t what we need: we don’t want to wait for the stream to complete, but to recompute the balance every time we receive a new Transaction. For this, we can use Scan (see figure 18.10), which is similar to Aggregate but aggregates all previous values with every new value that is produced.

Figure 18.10 Scan aggregates all values produced so far.

As a result, we can effectively use Scan to keep state. Given an IObservable of Transactions affecting a bank account, we can use Scan to add up the amounts of all past transactions as they happen, obtaining an IObservable that signals with the new balance when the account balance changes:

IObservable<Transaction> transactions = //...
decimal initialBalance = 0;
 
IObservable<decimal> balance = transactions.Scan(initialBalance
   , (bal, trans) => bal + trans.Amount);
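To make the difference between Aggregate and Scan concrete, here is a sketch that assumes the System.Reactive package. A Subject of (AccountId, Amount) tuples is a hypothetical stand-in for the Transaction stream, because only the Amount property matters here:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical stand-in for IObservable<Transaction>
var transactions = new Subject<(Guid AccountId, decimal Amount)>();
decimal initialBalance = 0;

var runningBalances = new List<decimal>();
var finalBalances = new List<decimal>();

// Scan signals the updated balance with every transaction
using var scanSubscription = transactions
    .Scan(initialBalance, (bal, trans) => bal + trans.Amount)
    .Subscribe(runningBalances.Add);

// Aggregate signals only once, after the stream completes
using var aggregateSubscription = transactions
    .Aggregate(initialBalance, (bal, trans) => bal + trans.Amount)
    .Subscribe(finalBalances.Add);

var accountId = Guid.NewGuid();
transactions.OnNext((accountId, 100m));
transactions.OnNext((accountId, -30m));
Console.WriteLine($"{runningBalances.Count} {finalBalances.Count}"); // 2 0

transactions.OnNext((accountId, -80m));
transactions.OnCompleted();
Console.WriteLine($"{runningBalances.Count} {finalBalances.Count}"); // 3 1
```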

Now that we have a stream of values representing an account’s current balance, we need to single out what changes in balance cause the account to “dip into the red,” going from positive to negative.

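For this, we need to look at changes in the balance, and we can do this with PairWithPrevious, which signals the current value together with the previously emitted value. You saw how PairWithPrevious works in section 18.4.1, and here it is again for reference, with one change. Scan keeps a separate running total for each subscription. If PairWithPrevious subscribed to a Scan-derived stream like balance again for every inner Take(1), each inner subscription would start a fresh total from the seed. It would then pair the previous balance with the next transaction amount rather than with the next balance. Routing the query through Publish makes it subscribe to source only once:

// ----1-------2---------3--------4------>
//
//            PairWithPrevious
//
// ------------(1,2)-----(2,3)----(3,4)-->
//
public static IObservable<(T Previous, T Current)>
   PairWithPrevious<T>(this IObservable<T> source)
   => source.Publish(shared =>          // one shared subscription to source
         from first in shared
         from second in shared.Take(1)  // the value that follows first
         select (Previous: first, Current: second));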

This is one of many examples of custom operations that can be defined in terms of existing operations. The preceding snippet also shows how you can use ASCII marble diagrams to document your code.
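The following sketch, assuming the System.Reactive package, checks PairWithPrevious against the marble diagram and against a balance derived with Scan. It uses a local, non-extension copy that shares source via Publish. With a version that subscribes to the Scan-derived stream again for each inner Take(1), the second balance would come out as -30 (a fresh total starting from zero) instead of 70:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Marble diagram input: 1, 2, 3, 4
var numbers = new Subject<int>();
var numberPairs = new List<(int Previous, int Current)>();
using var numbersSubscription = PairWithPrevious(numbers).Subscribe(numberPairs.Add);
for (var i = 1; i <= 4; i++) numbers.OnNext(i);
Console.WriteLine(string.Join(" ", numberPairs)); // (1, 2) (2, 3) (3, 4)

// A stream derived with Scan keeps its running total per subscription
var amounts = new Subject<decimal>();
var balance = amounts.Scan(0m, (bal, amount) => bal + amount);
var balancePairs = new List<(decimal Previous, decimal Current)>();
using var balanceSubscription = PairWithPrevious(balance).Subscribe(balancePairs.Add);
amounts.OnNext(100m); // balance 100
amounts.OnNext(-30m); // balance 70: pair (100, 70), not (100, -30)
Console.WriteLine(balancePairs[0].Current); // 70

// Local copy of PairWithPrevious: Publish shares a single subscription
// to source between the outer query and every inner Take(1)
static IObservable<(T Previous, T Current)> PairWithPrevious<T>(IObservable<T> source)
    => source.Publish(shared =>
        from first in shared
        from second in shared.Take(1)
        select (Previous: first, Current: second));
```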

We can use PairWithPrevious to signal when an account dips into the red as follows:

IObservable<Unit> dipsIntoTheRed =
   from bal in balance.PairWithPrevious()
   where bal.Previous >= 0
      && bal.Current < 0
   select Unit();

Now let’s make things a bit closer to the real world. If your system receives a stream of transactions, this will probably include transactions for all accounts. Therefore, we must group them by account ID in order to correctly compute the balance. GroupBy works for IObservable similarly to how it does for IEnumerable, but it returns a stream of streams.

The following listing shows how to adapt the logic, assuming an initial stream of transactions for all accounts.

Listing 18.9 Signalling whenever an account becomes overdrawn

IObservable<Transaction> transactions = //... transactions from all accounts
 
IObservable<Guid> dipsIntoRed = transactions
   .GroupBy(t => t.AccountId)   // groups by account ID
   .Select(DipsIntoTheRed)      // signals dips into the red for any particular account
   .MergeAll();                 // flattens the result into a single observable
 
static IObservable<Guid> DipsIntoTheRed
   (IGroupedObservable<Guid, Transaction> transactions)
{
   Guid accountId = transactions.Key;
   decimal initialBalance = 0;
 
   var balance = transactions.Scan(initialBalance
      , (bal, trans) => bal + trans.Amount);
 
   return from bal in balance.PairWithPrevious()
          where bal.Previous >= 0
             && bal.Current < 0
          select accountId;     // signals the ID of the offending account
}
 
// Must be declared in a static class; Rx's built-in Merge()
// flattens an IObservable<IObservable<T>> in the same way
public static IObservable<T> MergeAll<T>
   (this IObservable<IObservable<T>> source)
   => source.SelectMany(x => x);

Now we’re starting with a stream of Transactions for all accounts, and we end up with a stream of Guids that will signal whenever an account dips into the red, including the Guid identifying the offending account. Notice how this program is effectively keeping track of the balances of all accounts without the need for us to do any explicit state manipulation.
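The whole pipeline can be tried out with a hand-driven Subject. This sketch makes several assumptions. It requires the System.Reactive package. It uses (AccountId, Amount) tuples as hypothetical stand-ins for Transaction and Rx’s built-in Merge in place of MergeAll. It also uses a Publish-based PairWithPrevious so that each account’s Scan is subscribed to only once. The account variables alice and bob are illustrative:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical stand-in for IObservable<Transaction> covering all accounts
var transactions = new Subject<(Guid AccountId, decimal Amount)>();
var alice = Guid.NewGuid();
var bob = Guid.NewGuid();
var overdrawn = new List<Guid>();

using var subscription = transactions
    .GroupBy(t => t.AccountId)   // one inner stream per account
    .Select(DipsIntoTheRed)      // a stream of dips per account
    .Merge()                     // Rx's built-in equivalent of MergeAll
    .Subscribe(overdrawn.Add);

transactions.OnNext((alice, 50m));   // alice: 50
transactions.OnNext((bob, 20m));     // bob: 20
transactions.OnNext((alice, -80m));  // alice: -30, dips into the red
transactions.OnNext((bob, -10m));    // bob: 10, still in the black
transactions.OnNext((alice, -5m));   // alice: -35, was already in the red

Console.WriteLine(overdrawn.Count == 1 && overdrawn[0] == alice); // True

static IObservable<Guid> DipsIntoTheRed(
    IGroupedObservable<Guid, (Guid AccountId, decimal Amount)> accountTransactions)
{
    var balance = accountTransactions.Scan(0m, (bal, t) => bal + t.Amount);
    return from pair in PairWithPrevious(balance)
           where pair.Previous >= 0 && pair.Current < 0
           select accountTransactions.Key;
}

// Publish shares one subscription to source (and thus one running Scan total)
static IObservable<(T Previous, T Current)> PairWithPrevious<T>(IObservable<T> source)
    => source.Publish(shared =>
        from first in shared
        from second in shared.Take(1)
        select (Previous: first, Current: second));
```

Only alice is reported. Suppose instead that PairWithPrevious subscribed to balance again for every inner Take(1). Bob’s withdrawal would then be paired as (20, -10), comparing his balance with the raw transaction amount, and he would be flagged as well.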

18.5 When should you use IObservable?

In this chapter, you’ve seen how you can use IObservable to represent data streams and Rx to create and manipulate IObservables. There are many details and features of Rx that we haven’t discussed at all, but we’ve still covered enough ground for you to start using IObservables and to further explore the features of Rx as needed.4

As you’ve seen, having an abstraction that captures a data stream enables you to detect patterns and specify logic that spans across multiple events within the same stream or across different streams. This is where I’d recommend using IObservable. The corollary is that, if your events can be handled independently, then you probably shouldn’t use IObservables because using them will probably reduce the readability of your code.

An important thing to keep in mind is that because OnNext has no return value, an IObservable can only push data downstream and never receives any data back. Hence, IObservables are best combined into one-directional dataflows. For instance, if you read events from a queue and write some data into a DB as a result, IObservable can be a good fit; likewise if you have a server that communicates with web clients via WebSockets, where messages are exchanged between client and server in a fire-and-forget fashion.

On the other hand, IObservables are not well-suited to a request-response model such as HTTP. You could model the received requests as a stream and compute a stream of responses, but you’d then have no easy way to tie these responses back to the original requests.

Finally, if you have complex synchronization patterns that can’t be captured with the operators in Rx, and you need more fine-grained control over how messages are sequenced and processed, you may find the building blocks in the System.Threading.Tasks.Dataflow namespace (TPL Dataflow, based on in-memory queues) more appropriate.

Summary

  • IObservable<T> represents a stream of Ts, a sequence of values in time.

  • An IObservable produces messages according to the grammar OnNext* (OnCompleted|OnError)?.

  • Writing a program with IObservables involves three steps:

    • Create IObservables using the methods in System.Reactive.Linq.Observable.
    • Transform and combine IObservables using the operators in Rx or other operators you may define.
    • Subscribe to and consume the values produced by the IObservable.
  • Associate an observer to an IObservable with Subscribe.

  • Remove an observer by disposing of the subscription returned by Subscribe.

  • Separate side effects (in observers) from logic (in stream transformations).

  • When deciding on whether to use IObservable, consider the following:

    • IObservable allows you to specify logic that spans multiple events.
    • IObservable is good for modeling unidirectional dataflows.

1 The overload of Subscribe that takes an IObserver is the method declared in the IObservable interface. The overload that takes the callbacks is an extension method.

2 Rx includes several libraries. The main library, System.Reactive, bundles the packages you’ll most commonly need: System.Reactive.Interfaces, System.Reactive.Core, System.Reactive.Linq, and System.Reactive.PlatformServices. There are several other packages that are useful in more specialized scenarios, such as if you’re using Windows Forms.

3 Imagine what keys.Select(_ => keys) would look like if keys were an IEnumerable: for each value, you’d be taking the whole IEnumerable. In the end, you’d have an IEnumerable containing n replicas of keys (n being the length of keys). With IObservable, the behavior is different because of the element of time, so when you say, “Give me keys,” what you really get is all values keys will produce in the future.

4 To give you an idea of what was not covered, there are many more operators along with important implementation details of Rx: schedulers (which determine how calls to observers are dispatched), hot versus cold observables (not all observables are lazy), and Subjects with different behaviors, for example.
