IObservable to represent data streams
If you’ve ever been to a financial hub like Wall Street or Canary Wharf, you’ve probably seen a ticker board, a luminous board displaying the latest price at which the most widely traded stocks are being traded. This is a good representation of a data stream: a stream of related values that are delivered through time.
Traders (both human and algorithms) keep an eye on the prices so that they can react to price changes: if a stock’s price rises or falls to a given level, they may decide to buy or sell, according to their investment strategy. This is, in essence, how reactive programming works: you define and consume data streams, potentially transforming the data in the streams in interesting ways, and define how your program should react to the data it consumes.
For example, if you have an Internet of Things in your home, you may have sensors that broadcast certain parameters (like room brightness or temperatures) and devices that react to changes in those parameters (regulating the window shutters or the air conditioning).
Or, in an event-sourced system like I described in chapter 13, you can publish the events as a stream and define downstream processing of those events in order to, say, recompute an account’s balance with every transaction and send the account holder a notification if the balance turns negative.
In this chapter, you’ll learn to model data streams with the IObservable interface and to use the Reactive Extensions (Rx) to create, transform, and combine IObservables. We’ll also discuss what sort of scenarios benefit from using IObservable.
Rx is a set of libraries for working with IObservables, much like LINQ provides utilities for working with IEnumerables. Rx is a rich framework, so thorough coverage is beyond the scope of this chapter. Instead, we’ll just look at some basic features and applications of IObservable and how it relates to other abstractions we’ve covered so far.
If you think of an array as a sequence of values in space (space in memory, that is), then you can think of IObservable as a sequence of values in time:
With an IEnumerable, you can enumerate its values at your leisure.
With an IObservable, you can observe the values as they come.
As with IAsyncEnumerable, which we discussed in chapter 16, IObservable is like an IEnumerable in that it contains several values, and it’s like a Task in that values are delivered asynchronously. Table 18.1 shows how IObservable relates to other abstractions.
IObservable is, therefore, more general than both IEnumerable and Task. You can view IEnumerable as a special case of IObservable that produces all its values right away, and you can think of Task as a special case of IObservable that only produces a single value.
What’s the difference between IObservable and IAsyncEnumerable, and why do we need both?
IAsyncEnumerable is consumer-centric: the component that consumes the data asks the producer for some data and receives an async stream of values in return, so the data is “pulled” by the consumer. The consumer interacts with the producer, hence the libraries developed to work with IAsyncEnumerable are called the Interactive Extensions (Ix). These packages are named System.Interactive.*. (IAsyncEnumerable itself is included in the BCL in the System.Collections.Generic namespace.)
IObservable is producer-centric: the consumer subscribes to the data, which is “pushed” out by the producer. The consumer merely reacts to the values it receives; hence the libraries developed to work with IObservable are called the Reactive Extensions (Rx). These packages are named System.Reactive.*. (IObservable itself is included in the BCL in the System namespace.)
NOTE Both Rx and Ix are maintained by the .NET Foundation; they are open source and hosted at https://github.com/dotnet/reactive.
Rx has been around for many years (there are implementations of Rx not only in .NET but in many other languages), so there are more resources and know-how at your disposal. By comparison, async streams and Ix are a recent addition; and yet, because of native language support since C# 8 via the yield return and await keywords (which we saw in chapter 16), it feels easier to create and consume them.
The easiest way to develop an intuition about IObservable is through marble diagrams. Figure 18.1 shows a few examples. Each IObservable is represented with an arrow, representing time, and marbles, representing values produced by the IObservable.
The image illustrates that an IObservable can produce three different kinds of messages:
OnNext signals a new value, so if your IObservable represents a stream of events, OnNext will be fired when an event is ready to be consumed. This is an IObservable’s most important message, and often the only one you’ll be interested in.
OnCompleted signals that the IObservable is done and will signal no more values.
OnError signals that an error has occurred and provides the relevant Exception.
Observ-ables work in tandem with observ-ers. Simply put, if you want to consume the messages produced by an IObservable, you can create an observer and associate it with an IObservable via the Subscribe method. The simplest way to do this is by providing a callback that handles the values produced by the IObservable like so:
using System;                  ❶
using System.Reactive.Linq;    ❷

IObservable<int> nums = //...

nums.Subscribe(Console.WriteLine);
❶ Exposes the IObservable interface
❷ Exposes the Subscribe extension method used below
When I say that nums “produces” an int value, all I really mean is that it calls the given function (in this case, Console.WriteLine) with the value. The result of the preceding code is that when nums produces an int, it’s printed out.
I find the naming a bit confusing; you’d expect an IObservable to have an Observe method, but instead, it’s called Subscribe. Basically, you can think of the two as synonyms: an observer is a subscriber, and in order to observe an IObservable you subscribe to it.
What about the other types of messages an IObservable can produce? You can provide handlers for those as well. For instance, the following listing shows a convenience method, Trace, that attaches an observer to an IObservable; this observer simply prints a diagnostic message whenever the IObservable signals. We’ll use this method later for debugging.
using static System.Console;

public static IDisposable Trace<T>
   (this IObservable<T> source, string name)
   => source.Subscribe
   (
      onNext: t => WriteLine($"{name} -> {t}"),
      onError: ex => WriteLine($"{name} ERROR: {ex.Message}"),
      onCompleted: () => WriteLine($"{name} END")
   );
Subscribe actually takes three handlers (all are optional arguments) to handle the different messages that an IObservable<T> can produce. It should be clear why the handlers are optional: if you don’t expect an IObservable to ever complete, there’s no point in providing an onCompleted handler.
A more OO option for subscribing is to call Subscribe with an IObserver, an interface that, unsurprisingly, exposes OnNext, OnError, and OnCompleted methods.
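As a rough illustration of the observer-based overload, the sketch below builds an IObserver<int> with Observer.Create from System.Reactive and passes it to Subscribe. The source here is a Subject<int>, a type that lets you push values by hand; it is used only as a convenient stand-in, and the names log and nums are assumptions made for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;            // Observer.Create
using System.Reactive.Subjects;   // Subject<T>

var log = new List<string>();

// Observer.Create wraps three callbacks into a single IObserver<int>.
IObserver<int> observer = Observer.Create<int>(
    onNext: n => log.Add($"value {n}"),
    onError: ex => log.Add($"error {ex.Message}"),
    onCompleted: () => log.Add("done"));

// Stand-in source (an assumption for this sketch): values are pushed manually.
var nums = new Subject<int>();

using (nums.Subscribe(observer))  // the IObserver-based overload of Subscribe
{
    nums.OnNext(1);
    nums.OnNext(2);
    nums.OnCompleted();
}

log.ForEach(Console.WriteLine);
// prints: value 1
//         value 2
//         done
```

Implementing IObserver<T> in a class of your own works the same way; Observer.Create just saves the boilerplate when the three handlers are simple.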
Also notice that Subscribe returns an IDisposable (the subscription). By disposing it, you unsubscribe.
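To make the effect of disposing a subscription concrete, here is a minimal sketch. It assumes a hand-driven Subject<int> as the source (a stand-in chosen for this example) and records the values the observer receives before and after Dispose is called.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

var nums = new Subject<int>();   // stand-in source, pushed by hand
var seen = new List<int>();

// Subscribe returns the subscription as an IDisposable.
IDisposable subscription = nums.Subscribe(n => seen.Add(n));

nums.OnNext(1);                  // delivered to the observer
subscription.Dispose();          // unsubscribe
nums.OnNext(2);                  // nobody is listening any more

Console.WriteLine(string.Join(", ", seen));
// prints: 1
```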
In this section, you’ve seen some of the basic concepts and terminology around IObservable. It’s a lot to absorb, but don’t worry, things will become clearer as you see some examples. These are the basic ideas to keep in mind:
You associate an observer with an observable by using Subscribe.
An observable produces a value by calling the observers’ OnNext handler.
You now know how to consume the data in a stream by subscribing to an IObservable. But how do you get an IObservable in the first place? The IObservable and IObserver interfaces are included in .NET Standard, but if you want to create or perform many other operations on IObservables, you’ll typically use the Reactive Extensions (Rx) by installing the System.Reactive package.
The recommended way to create IObservables is by using one of several dedicated methods included in the static Observable class; we’ll look at some next. I recommend you follow along in the REPL whenever possible.
A timer can be modeled with an IObservable that signals at regular intervals. We can represent it with a marble diagram as follows:
This is a good way to start experimenting with IObservables because it’s simple but does include the element of time. The code in the following listing uses Observable.Interval to create a timer.
using System.Reactive.Linq;

var oneSec = TimeSpan.FromSeconds(1);
IObservable<long> ticks = Observable.Interval(oneSec);
Here we define ticks as an IObservable that will begin signaling after one second, producing a long counter value that increments every second, starting at 0. Notice I said “will begin” signaling? The resulting IObservable is lazy, so unless there’s a subscriber, nothing will actually happen. Why talk, if nobody’s listening?
If we want to see some tangible results, we need to subscribe to the IObservable. We can do this with the Trace method defined earlier:
At this point, you’ll start to see the following messages appear in the console, one second apart:
Because this IObservable never completes, you’ll have to reset the REPL to stop the noise. Sorry!
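If you would rather run the timer as a small program that stops on its own, one possible variation is sketched below. It is not the listing from this chapter: Take(3) is added so that the stream completes after three ticks, the handlers are passed to Subscribe inline (mirroring what Trace does), and a ManualResetEventSlim keeps the main thread alive until completion. The names seen and done are assumptions made for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;

var seen = new List<long>();
var done = new ManualResetEventSlim();

// Take(3) makes the otherwise endless timer complete after three values.
IObservable<long> ticks = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Take(3);

// Nothing happens until we subscribe; the timer is lazy.
using var subscription = ticks.Subscribe(
    onNext: t => { seen.Add(t); Console.WriteLine($"ticks -> {t}"); },
    onError: ex => { Console.WriteLine($"ticks ERROR: {ex.Message}"); done.Set(); },
    onCompleted: () => { Console.WriteLine("ticks END"); done.Set(); });

done.Wait();   // block until the stream has completed (or failed)

// prints, one second apart:
// ticks -> 0
// ticks -> 1
// ticks -> 2
// ticks END
```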
Another way to create an IObservable is by instantiating a Subject. A Subject is an IObservable you can imperatively tell to produce a value, which it will, in turn, push out to its observers. For example, the following listing shows a program that turns inputs from the console into values signaled by a Subject.
using System.Reactive.Subjects;
using static System.Console;

var inputs = new Subject<string>();                  ❶

using (inputs.Trace("inputs"))                       ❷
{
   for (string input; (input = ReadLine()) != "q";)
      inputs.OnNext(input);                          ❸

   inputs.OnCompleted();                             ❹
}                                                    ❺
❸ Tells the Subject to produce a value, which it pushes to its observers
❹ Tells the Subject to signal completion
❺ Leaving the using block disposes the subscription.
Every time the user types in some input, the code pushes that value to the Subject by calling its OnNext method. When the user types “q,” the code exits the for loop and calls the Subject's OnCompleted method, signaling that the stream has ended. Here we’ve subscribed to the stream of inputs using the Trace method defined in listing 18.1, so we’ll get a diagnostic message printed for each user input.
An interaction with the program looks like this (user inputs in bold):
If your system subscribes to an external data source, such as a message queue, event broker, or publisher/subscriber, you can model that data source as an IObservable.
For example, Redis can be used as a publisher/subscriber. Redis’s API exposes a Subscribe method allowing you to register a callback that receives messages published on Redis on a given channel (a Redis channel is just a string; it allows subscribers to specify what messages they're interested in). The following listing shows how you can use Observable.Create to create an IObservable that will signal whenever messages are received from Redis.
using StackExchange.Redis;
using System.Reactive.Linq;

ConnectionMultiplexer redis
   = ConnectionMultiplexer.Connect("localhost");

IObservable<RedisValue> RedisNotifications
(
   RedisChannel channel
)
=> Observable.Create<RedisValue>(observer =>                   ❶
{
   var sub = redis.GetSubscriber();
   sub.Subscribe(channel, (_, val) => observer.OnNext(val));   ❷
   return () => sub.Unsubscribe(channel);                      ❸
});
❶ Create takes an observer, so the given function is only called when a subscription is made.
❷ Converts from the callback-based implementation of Subscribe to values produced by the IObservable
❸ Returns a function that will be called when the subscription is disposed
The preceding method returns an IObservable that produces the values received from Redis on the given channel. You could use this as follows:
RedisChannel weather = "weather";

var weatherUpdates = RedisNotifications(weather);         ❶
weatherUpdates.Subscribe(
   onNext: val => WriteLine($"It's {val} out there"));    ❷

redis.GetDatabase(0).Publish(weather, "stormy");          ❸
// prints: It's stormy out there                          ❸
❶ Gets an IObservable that signals when messages are published on the weather channel
❷ Subscribes to the IObservable
❸ Publishing a value causes weatherUpdates to signal; the onNext handler is called as a result.
You may ask, “What have we gained exactly?” After all, we could have registered a callback using Redis’s Subscribe method to handle messages; instead, we now have an IObservable and need to Subscribe to it to handle messages. The point is, with an IObservable, we can leverage the many operators included in Rx (which we’ll discuss in section 18.3) as well as the schedulers (which are used to optimize performance and are beyond the scope of this chapter).
I said that IObservable<T> is more general than a value T, a Task<T>, or an IEnumerable<T>, so let’s see how each of these can be promoted to an IObservable. This becomes useful if you want to combine one of these less powerful structures with an IObservable.
Return allows you to lift a single value into an IObservable that looks like this:
That is, it immediately produces the value and then completes. Here’s an example:
IObservable<string> justHello = Observable.Return("hello");
justHello.Trace("justHello");

// prints: justHello -> hello
//         justHello END
Return takes a value, T, and lifts it into an IObservable<T>. This is the first container where the Return function is actually called Return!
Let’s see about creating an IObservable from a single asynchronous value, a Task. Here, we have an IObservable that looks like this:
After some time, we’ll get a single value, immediately followed by the signal for completion. In code, it looks like this:
Observable.FromAsync(() => RatesApi.GetRateAsync("USDEUR"))
   .Trace("singleUsdEur");

// prints: singleUsdEur -> 0.92
//         singleUsdEur END
Finally, an IObservable created from an IEnumerable looks like this:
That is, it immediately produces all the values in the IEnumerable and completes:
IEnumerable<char> e = new[] { 'a', 'b', 'c' };
IObservable<char> chars = e.ToObservable();
chars.Trace("chars");

// prints: chars -> a
//         chars -> b
//         chars -> c
//         chars END
You’ve now seen many, but not all, methods for creating IObservables. You may end up creating IObservables in other ways; for example, in GUI applications, you can turn events such as mouse clicks into event streams by using Observable.FromEvent and FromEventPattern.
Now that you know about creating and subscribing to IObservables, let’s move on to the most fascinating area: transforming and combining different streams.
The power of using streams comes from the many ways in which you can combine them and define new streams based on existing ones. Rather than dealing with individual values in a stream (like in most event-driven designs), you deal with the stream as a whole.
Rx offers a lot of functions (often called operators) to transform and combine IObservables in a variety of ways. I’ll discuss the most commonly used ones and add a few operators of my own. You’ll recognize the typical traits of a functional API: purity and composability.
You can create new observables by transforming an existing observable in some way. One of the simplest operations is mapping. This is achieved with the Select method, which works (as with any other container) by applying the given function to each element in the stream, as figure 18.2 shows.
Here’s some code that creates a timer and then maps a simple function on it:
var oneSec = TimeSpan.FromSeconds(1);
var ticks = Observable.Interval(oneSec);

ticks.Select(n => n * 10)
   .Trace("ticksX10");
We’re attaching an observer on the last line with the Trace method, so the preceding code will cause the following messages to be printed every second:
Because Select follows the LINQ query pattern, we can write the same thing using LINQ:
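A minimal sketch of the query form follows. To keep the output deterministic, it assumes a hand-driven Subject<long> in place of the one-second timer; the names ticksX10 and seen are chosen for this example only.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Stand-in for Observable.Interval: values are pushed by hand.
var ticks = new Subject<long>();
var seen = new List<long>();

// The query expression is translated by the compiler into ticks.Select(n => n * 10).
IObservable<long> ticksX10 =
    from n in ticks
    select n * 10;

using (ticksX10.Subscribe(n => seen.Add(n)))
{
    ticks.OnNext(0);
    ticks.OnNext(1);
    ticks.OnNext(2);
}

Console.WriteLine(string.Join(", ", seen));
// prints: 0, 10, 20
```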
Using Select, we can rewrite our simple program that checks exchange rates (first introduced in listing 15.1) in terms of observables:
public static void Main()
{
   var inputs = new Subject<string>();                          ❶

   var rates =
      from pair in inputs                                       ❷
      select RatesApi.GetRateAsync(pair).Result;                ❷

   using (inputs.Trace("inputs"))                               ❸
   using (rates.Trace("rates"))                                 ❸
      for (string input; (input = ReadLine().ToUpper()) != "Q";)
         inputs.OnNext(input);
}
❶ The stream of values entered by the user
❷ Maps user inputs to the corresponding retrieved values
❸ Subscribes to both streams to produce debug messages
Here, inputs represents the stream of currency pairs entered by the user, and in rates, we map those pairs to the corresponding values retrieved from the web. We’re subscribing to both observables with the usual Trace method, so an interaction with this program could be as follows:
Notice, however, that in the code, we have a blocking call to Result. In a real application, we wouldn’t want to block a thread, so how could we avoid that?
We saw that a Task can easily be promoted to an IObservable. If we promote the Task of retrieving each rate from the remote API to an IObservable rather than waiting for its result, then we get an IObservable of IObservables. Sound familiar? Bind! We can use SelectMany instead of Select, which flattens the result into a single IObservable. We can, therefore, rewrite the definition of the rates stream as follows:
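The sketch below shows one way this definition might look. Since RatesApi is not available here, it assumes a local GetRateAsync stub that returns a fixed rate after a short delay; the stub, its rate values, and the collected/result names are all hypothetical and exist only to make the example runnable.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;   // ToTask
using System.Threading.Tasks;

// Hypothetical stand-in for RatesApi.GetRateAsync.
async Task<decimal> GetRateAsync(string pair)
{
    await Task.Delay(50);                 // simulate a remote call
    return pair == "EURUSD" ? 1.08m : 1.00m;
}

var inputs = new Subject<string>();

// Each input is mapped to an IObservable<decimal>; SelectMany flattens
// the resulting stream of streams into a single stream of rates.
IObservable<decimal> rates = inputs.SelectMany(
    pair => Observable.FromAsync(() => GetRateAsync(pair)));

// ToTask subscribes immediately, so the value pushed below is not missed.
Task<IList<decimal>> collected = rates.ToList().ToTask();

inputs.OnNext("EURUSD");
inputs.OnCompleted();

IList<decimal> result = await collected;  // no blocking call to Result
Console.WriteLine($"received {result.Count} rate(s)");
// prints: received 1 rate(s)
```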
Observable.FromAsync promotes the Task returned by GetRateAsync to an IObservable, and SelectMany flattens all these IObservables into a single IObservable.
Because it’s always possible to promote a Task to an IObservable, an overload of SelectMany exists that does just that (this is similar to how we overloaded Bind to work with an IEnumerable and an Option-returning function in section 6.5). This means we can avoid explicitly calling FromAsync and return a Task instead. Furthermore, we can use a LINQ query:
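As a sketch of what such a query can look like, the example below relies on the Rx overload of SelectMany that accepts a Task-returning function. As before, GetRateAsync is a hypothetical local stub standing in for RatesApi.GetRateAsync, and the fixed rate it returns is made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;   // ToTask
using System.Threading.Tasks;

// Hypothetical stand-in for RatesApi.GetRateAsync.
async Task<decimal> GetRateAsync(string pair)
{
    await Task.Delay(50);
    return pair == "EURUSD" ? 1.08m : 1.00m;
}

var inputs = new Subject<string>();

// The second from clause returns a Task<decimal>; the compiler picks the
// SelectMany overload that promotes the Task and flattens the result.
IObservable<decimal> rates =
    from pair in inputs
    from rate in GetRateAsync(pair)
    select rate;

Task<IList<decimal>> collected = rates.ToList().ToTask();

inputs.OnNext("EURUSD");
inputs.OnCompleted();

IList<decimal> queried = await collected;
Console.WriteLine($"received {queried.Count} rate(s)");
// prints: received 1 rate(s)
```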
The program thus modified works the same way as before but without the blocking call to Result.
IObservable also supports many of the other operations that are supported by IEnumerable, such as filtering with Where, Take (takes the first n values), Skip, First, and so on.
There are also many operators that allow you to combine two streams into a single one. For example, Concat produces all the values of one IObservable, followed by all the values in another, as figure 18.3 shows.
For instance, in our exchange rate lookup, we have an observable called rates with the retrieved rates. If we want an observable of all the messages the program should output to the console, this must include the retrieved rates but also an initial message prompting the user for some input. We can lift this single message into an IObservable with Return and then use Concat to combine it with the other messages:
IObservable<decimal> rates = //...

IObservable<string> outputs = Observable
   .Return("Enter a currency pair like 'EURUSD', or 'q' to quit")
   .Concat(rates.Select(Decimal.ToString));
In fact, the need to provide a starting value for an IObservable is so common that there’s a dedicated function for it: StartWith. The preceding code is equivalent to this:
var outputs = rates.Select(Decimal.ToString)
   .StartWith("Enter a currency pair like 'EURUSD', or 'q' to quit");
Whereas Concat waits for the left IObservable to complete before producing values from the right observable, Merge combines values from two IObservables without delay, as figure 18.4 shows.
For example, if you have a stream of valid values and one of error messages, you could combine them with Merge as follows:
IObservable<decimal> rates = //...
IObservable<string> errors = //...

var outputs = rates.Select(Decimal.ToString)
   .Merge(errors);
Just as you might want to merge values from different streams, the opposite operation—partitioning a stream according to some criterion—is also often useful. Figure 18.5 illustrates this.
Partition returns a pair of IObservables, so you can destructure it like this:
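Partition is not part of Rx itself, so the sketch below assumes a simple stand-in, written as a local function built from two Where calls, purely to illustrate the destructuring. The helper's shape and the names evenNums, oddNums, evens, and odds are assumptions for this example and may differ from the library version used in this chapter.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical stand-in for a Partition helper: one stream of values that
// satisfy the predicate, and one stream of values that don't.
(IObservable<T> Passed, IObservable<T> Failed) Partition<T>(
    IObservable<T> source, Func<T, bool> predicate)
    => (source.Where(predicate), source.Where(t => !predicate(t)));

var nums = new Subject<int>();
var evens = new List<int>();
var odds = new List<int>();

// The returned pair can be destructured into two named streams.
var (evenNums, oddNums) = Partition(nums, n => n % 2 == 0);

using (evenNums.Subscribe(n => evens.Add(n)))
using (oddNums.Subscribe(n => odds.Add(n)))
{
    for (var i = 1; i <= 5; i++) nums.OnNext(i);
}

Console.WriteLine($"evens: {string.Join(", ", evens)}");
Console.WriteLine($"odds: {string.Join(", ", odds)}");
// prints: evens: 2, 4
//         odds: 1, 3, 5
```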
Partitioning an IObservable of values is roughly equivalent to an if when dealing with a single value, so it’s useful when you have a stream of values that you want to process differently, depending on some condition. For example, if you have a stream of messages and some criterion for validation, you can partition the stream into two streams of valid and invalid messages and process them accordingly.
Error handling when working with IObservable works differently from what you might expect. In most programs, an uncaught exception either causes the whole application to crash or causes the processing of a single message/request to fail, while subsequent requests work fine. To illustrate how things work differently in Rx, consider this version of our program for looking up exchange rates:
var inputs = new Subject<string>();

var rates =
   from pair in inputs
   from rate in RatesApi.GetRateAsync(pair)
   select rate;

var outputs = from r in rates select r.ToString();

using (inputs.Trace("inputs"))
using (rates.Trace("rates"))
using (outputs.Trace("outputs"))
   for (string input; (input = ReadLine().ToUpper()) != "Q";)
      inputs.OnNext(input);
The program captures three streams, each dependent on another (outputs is defined in terms of rates, and rates is defined in terms of inputs, as figure 18.6 shows), and we’re printing diagnostic messages for all of them with Trace.
Now look what happens if you break the program by passing an invalid currency pair:
eurusd
inputs -> EURUSD
rates -> 1.0852
outputs -> 1.0852
chfusd
inputs -> CHFUSD
rates -> 1.0114
outputs -> 1.0114
xxx
inputs -> XXX
rates ERROR: Input string was not in a correct format.
outputs ERROR: Input string was not in a correct format.
chfusd
inputs -> CHFUSD
eurusd
inputs -> EURUSD
What this shows is that once rates errors, it never signals again. This behavior is as specified in the IObservable contract (see the sidebar on “The IObservable contract”). As a result, everything downstream is also “dead.” But IObservables upstream of the failed one are fine: inputs is still signaling, as would any other IObservables defined in terms of inputs.
To prevent your system from going into such a state, where a branch of the dataflow dies while the remaining graph keeps functioning, you can use the techniques you learned for functional error handling.
The following listing shows the implementation of Safely, a helper function included in LaYumba.Functional that allows you to safely apply a Task-returning function to each element in a stream. The result is a pair of streams: a stream of successfully computed values and a stream of exceptions.
public static (IObservable<R> Completed, IObservable<Exception> Faulted)
   Safely<T, R>(this IObservable<T> ts, Func<T, Task<R>> f)
   => ts
      .SelectMany(t => f(t).Map(
         Faulted: ex => ex,                                           ❶
         Completed: r => Exceptional(r)))                             ❶
      .Partition();

static (IObservable<T> Successes, IObservable<Exception> Exceptions)  ❷
   Partition<T>(this IObservable<Exceptional<T>> excTs)               ❷
{
   bool IsSuccess(Exceptional<T> ex)
      => ex.Match(_ => false, _ => true);

   T ExtractValue(Exceptional<T> ex)
      => ex.Match(_ => default, t => t);

   Exception ExtractException(Exceptional<T> ex)
      => ex.Match(exc => exc, _ => default);

   var (ts, errs) = excTs.Partition(IsSuccess);

   return
   (
      Successes: ts.Select(ExtractValue),
      Exceptions: errs.Select(ExtractException)
   );
}
❶ Converts each Task<R> to a Task<Exceptional<R>> to get a stream of Exceptionals
❷ Partitions a stream of Exceptionals into successfully computed values and exceptions
For each T in the given stream, we apply the Task-returning function f. We then use the binary overload of Map defined in section 16.1.4 to convert each resulting Task<R> to a Task<Exceptional<R>>. This is where we gain safety: instead of an inner value R that throws an exception when it’s accessed, we have an Exceptional<R> in the appropriate state. SelectMany flattens away the Tasks in the stream and returns a stream of Exceptionals. We can then partition this into successes and exceptions.
With this in place, we can refactor our program to handle errors more gracefully:
The following listing showcases the various techniques you’ve learned in this section. It shows the exchange rates lookup program, refactored to safely handle errors, and without the debug information.
public static void Main()
{
   var inputs = new Subject<string>();

   var (rates, errors) = inputs.Safely(RatesApi.GetRateAsync);

   var outputs = rates
      .Select(Decimal.ToString)
      .Merge(errors.Select(ex => ex.Message))
      .StartWith("Enter a currency pair like 'EURUSD', or 'q' to quit");

   using (outputs.Subscribe(WriteLine))
      for (string input; (input = ReadLine().ToUpper()) != "Q";)
         inputs.OnNext(input);
}
The dataflow diagram in figure 18.7 shows the various IObservables involved and how they depend on one another.
Notice how Safely allows us to create two branches, each of which can be processed independently until a uniform representation for both cases is obtained, and they can be merged.
This program nicely illustrates the three parts that typically compose a program that uses IObservables:
Set up the data sources: in our case, this is captured by inputs.
Process the data: this is where you use functions like Select, Merge, and so on.
Consume the results: observers consume the most downstream IObservables (in this case, outputs) to perform side effects.
So far I’ve mostly aimed at familiarizing you with IObservables and the many operators that can be used with them. For this, I’ve used familiar examples like the exchange rates lookup. After all, given that you can promote any value T, Task<T>, or IEnumerable<T> to an IObservable<T>, you could pretty much write all of your code in terms of IObservables! But should you?
The answer, of course, is probably not. The area in which IObservable and Rx really shine is when you can use them to write stateful programs without any explicit state manipulation. By stateful programs, I mean programs in which events aren’t treated independently; past events influence how new events are treated. In this section, you’ll see a few such examples.
At some point, you’ve probably written an event handler that listens to a user’s keypresses and performs some actions based on what key and key modifiers were pressed. A callback-based approach is satisfactory for many cases, but what if you want to listen to a specific sequence of keypresses? For example, say you want to implement some behavior when the user presses the combination Alt-K-B.
In this case, pressing Alt-B should lead to different behavior, based on whether it was shortly preceded by the leading Alt-K, so keypresses can’t be treated independently. If you have a callback-based mechanism that deals with single keypress events, you effectively need to set in motion a state machine when the user presses Alt-K, and then wait for the possible Alt-B that will follow, reverting to the previous state if no Alt-B is received in time. It’s actually pretty complicated!
With IObservable, this can be solved much more elegantly. Let’s assume that we have a stream of keypress events, keys. We’re looking for two events, Alt-K and Alt-B, that happen on that same stream in quick succession. In order to do this, we need to explore how to combine a stream with itself. Consider the following diagram:
It’s important to understand this diagram. The expression keys.Select(_ => keys) yields a new IObservable that maps each value produced by keys to keys itself. Therefore, when keys produces its first value, “a,” this new IObservable produces an IObservable that has all following values in keys. When keys produces its second value, “b,” the new IObservable produces another IObservable that has all the values that follow “b,” and so on.
Looking at the types can also help clarify this:
keys : IObservable<KeyInfo>
_ => keys : KeyInfo → IObservable<KeyInfo>
keys.Select(_ => keys) : IObservable<IObservable<KeyInfo>>
If we use SelectMany instead, all these values are flattened into a single stream:
Of course, if we’re looking for two consecutive keypresses, we don’t need all values that follow an item but just the next one. Instead of mapping each value to the whole IObservable, let’s reduce it to the first item with Take:
We’re getting close. Now, let’s make the following changes:
The resulting expression pairs each value in an IObservable with the previously emitted value:
This is a pretty useful function in its own right, and I’ll call it PairWithPrevious. We’ll use it later.
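To see the pairing in action, here is a small sketch that writes the expression as an inline query over a hand-driven Subject<int>. The source values and the names source, pairs, and seen are assumptions made for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var source = new Subject<int>();   // stand-in stream, pushed by hand
var seen = new List<(int Previous, int Current)>();

// For every value, subscribe to the same stream again and take just the
// next value, pairing the two together.
var pairs =
    from first in source
    from second in source.Take(1)
    select (Previous: first, Current: second);

using (pairs.Subscribe(p => seen.Add(p)))
{
    source.OnNext(1);   // no pair yet: there is no previous value
    source.OnNext(2);
    source.OnNext(3);
    source.OnNext(4);
}

Console.WriteLine(string.Join(", ", seen));
// prints: (1, 2), (2, 3), (3, 4)
```

Note that this relies on the source being "hot": the inner subscription created for a value only sees the values pushed after it.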
But for this particular scenario, we only want pairs to be created if they’re sufficiently close in time. This can be achieved easily by using an overload of Take that takes a TimeSpan, as the following listing shows.
IObservable<ConsoleKeyInfo> keys = //...
var halfSec = TimeSpan.FromMilliseconds(500);

var keysAlt = keys
   .Where(key => key.Modifiers.HasFlag(ConsoleModifiers.Alt));

var twoKeyCombis =
   from first in keysAlt                              ❶
   from second in keysAlt.Take(halfSec).Take(1)       ❶
   select (First: first, Second: second);             ❶

var altKB =
   from pair in twoKeyCombis
   where pair.First.Key == ConsoleKey.K
      && pair.Second.Key == ConsoleKey.B
   select Unit();
❶ For any keypress, pairs it with the next keypress that occurs within a half-second
As you can see, the solution is simple and elegant. You can apply this approach to recognize more complex patterns within sequences of events—all without explicitly keeping track of state and introducing side effects!
You’ve probably also realized that coming up with such a solution isn’t necessarily easy. It takes a while to get familiar with IObservable and its many operators, and to develop an understanding of how to use them.
Imagine we have a bank account denominated in Euros, and we’d like to keep track of its value in US Dollars. Both changes in balance and changes in the exchange rate cause the dollar balance to change. To react to changes from different streams, we could use CombineLatest, which takes the latest values from two observables when one of them signals, as figure 18.8 shows.
Its usage would be as follows:
IObservable<decimal> balance = //...
IObservable<decimal> eurUsdRate = //...

var balanceInUsd = balance.CombineLatest(eurUsdRate
   , (bal, rate) => bal * rate);
This works, but it doesn’t take into account the fact that the exchange rate is much more volatile than the account balance. In fact, if exchange rates come from the FX market, there may well be dozens or hundreds of tiny movements every second! Surely this level of detail isn’t required for a private client who wants to keep an eye on their finances. Reacting to each change in exchange rate would flood the client with unwanted notifications.
This is an example of an IObservable producing too much data (see the sidebar on “Backpressure”). For this, we can use Sample, an operator that takes an IObservable that acts as a data source, and another IObservable that signals when values should be produced. Sample is illustrated in figure 18.9.
In this scenario, we can create an IObservable that signals at 10-minute intervals and use it to sample the stream of exchange rates, as the following listing shows.
IObservable<decimal> balance = //...
IObservable<decimal> eurUsdRate = //...

var tenMins = TimeSpan.FromMinutes(10);
var sampler = Observable.Interval(tenMins);

var eurUsdSampled = eurUsdRate.Sample(sampler);

var balanceInUsd = balance.CombineLatest(eurUsdSampled
   , (bal, rate) => bal * rate);
This is another scenario in which our logic spans multiple events, and using Rx operators CombineLatest and Sample allows us to encode this logic without explicitly keeping any state.
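To watch these two operators interact without waiting ten minutes, the sketch below replaces every source with a hand-driven Subject, including the sampler, which here is a Subject<Unit> standing in for Observable.Interval. The pushed values are invented for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;            // Unit
using System.Reactive.Linq;
using System.Reactive.Subjects;

var balance = new Subject<decimal>();
var eurUsdRate = new Subject<decimal>();
var sampler = new Subject<Unit>();   // stand-in for Observable.Interval(tenMins)

var eurUsdSampled = eurUsdRate.Sample(sampler);
var balanceInUsd = balance.CombineLatest(eurUsdSampled
    , (bal, rate) => bal * rate);

var seen = new List<decimal>();
using (balanceInUsd.Subscribe(v => seen.Add(v)))
{
    balance.OnNext(100m);            // nothing yet: no sampled rate available
    eurUsdRate.OnNext(1.10m);        // held back until the sampler signals
    eurUsdRate.OnNext(1.11m);        // replaces 1.10 as the latest rate
    sampler.OnNext(Unit.Default);    // rate 1.11 is let through: 100 * 1.11
    balance.OnNext(200m);            // balance change: 200 * 1.11
    eurUsdRate.OnNext(1.12m);        // held back again
    sampler.OnNext(Unit.Default);    // rate 1.12 is let through: 200 * 1.12
}

Console.WriteLine($"{seen.Count} values produced");
// prints: 3 values produced
// (the values are 111.00, 222.00, and 224.00)
```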
For a final, more business-oriented example, imagine that in the context of the BOC application, we consume a stream of all transactions that affect bank accounts, and we want to send clients a notification if their account’s balance becomes negative.
An account’s balance is the sum of all the transactions that have affected it, so at any point, given a list of past Transactions for an account, you could compute its current balance using Aggregate.
There is an Aggregate function for IObservable; it waits for an IObservable to complete and aggregates all the values it produces into a single value. But this isn’t what we need: we don’t want to wait for the stream to complete, but to recompute the balance every time we receive a new Transaction. For this, we can use Scan (see figure 18.10), which is similar to Aggregate but signals the accumulated result each time a new value is produced, rather than once at the end.
As a result, we can effectively use Scan to keep state. Given an IObservable of Transactions affecting a bank account, we can use Scan to add up the amounts of all past transactions as they happen, obtaining an IObservable that signals with the new balance when the account balance changes:
IObservable<Transaction> transactions = //...
decimal initialBalance = 0;

IObservable<decimal> balance = transactions.Scan(initialBalance
   , (bal, trans) => bal + trans.Amount);
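The difference between the two operators is easiest to see side by side. The following sketch feeds the same hand-driven stream to both. It assumes the System.Reactive package, and the Transaction record is a hypothetical stand-in for the chapter's type.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var transactions = new Subject<Transaction>();
var accountId = Guid.NewGuid();

var running = new List<decimal>();   // values signalled by Scan
var totals = new List<decimal>();    // values signalled by Aggregate

transactions
   .Scan(0m, (bal, trans) => bal + trans.Amount)
   .Subscribe(bal => running.Add(bal));

transactions
   .Aggregate(0m, (bal, trans) => bal + trans.Amount)
   .Subscribe(total => totals.Add(total));

transactions.OnNext(new Transaction(accountId, 100m));
transactions.OnNext(new Transaction(accountId, -30m));
transactions.OnNext(new Transaction(accountId, -90m));
// At this point running holds 100, 70, -20 and totals is still empty.

transactions.OnCompleted();
// Only now does Aggregate signal its single value, -20.

Console.WriteLine(
   $"Scan signalled {running.Count} values; Aggregate signalled {totals.Count}.");

// Hypothetical stand-in for the chapter's Transaction type.
record Transaction(Guid AccountId, decimal Amount);
```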
Now that we have a stream of values representing an account’s current balance, we need to single out which changes in balance cause the account to “dip into the red,” going from non-negative to negative.
For this, we need to look at changes in the balance, and we can do this with PairWithPrevious, which signals the current value together with the previously emitted value. You saw the implementation of PairWithPrevious in section 18.4.1, but here it is again for reference:
// ----1-------2---------3--------4------>
//
//             PairWithPrevious
//
// ------------(1,2)-----(2,3)----(3,4)-->
//
public static IObservable<(T Previous, T Current)>
   PairWithPrevious<T>(this IObservable<T> source)
   => from first in source
      from second in source.Take(1)
      select (Previous: first, Current: second);
This is one of many examples of custom operations that can be defined in terms of existing operations. The preceding snippet also shows how you can use ASCII marble diagrams to document your code.
We can use PairWithPrevious to signal when an account dips into the red as follows:
IObservable<Unit> dipsIntoTheRed =
   from bal in balance.PairWithPrevious()
   where bal.Previous >= 0
      && bal.Current < 0
   select Unit();
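One caveat worth checking in your own code: the query-based PairWithPrevious subscribes to source a second time for every value, so it relies on every subscription observing the same values. When source is itself a stateful pipeline, such as a balance built with Scan, each new subscription starts accumulating afresh from the seed. The following sketch pairs values using a single subscription instead. PairWithPreviousScan is a hypothetical name, and this is one possible alternative rather than the implementation from section 18.4.1. It assumes the System.Reactive package.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var transactions = new Subject<decimal>();   // transaction amounts only, for brevity
var balance = transactions.Scan(0m, (bal, amount) => bal + amount);

var dips = new List<(decimal Previous, decimal Current)>();

balance
   .PairWithPreviousScan()
   .Where(bal => bal.Previous >= 0 && bal.Current < 0)
   .Subscribe(bal => dips.Add(bal));

transactions.OnNext(100m);   // balance 100
transactions.OnNext(-30m);   // balance 70
transactions.OnNext(-90m);   // balance -20: dips into the red
transactions.OnNext(-5m);    // balance -25: already negative, no signal
transactions.OnNext(50m);    // balance 25
transactions.OnNext(-40m);   // balance -15: dips into the red again

Console.WriteLine(dips.Count);   // 2

public static class ObservableSketches
{
   // Hypothetical variant: remembers the last value inside Scan's accumulator,
   // so source is subscribed to exactly once.
   public static IObservable<(T Previous, T Current)>
      PairWithPreviousScan<T>(this IObservable<T> source)
      => source
         .Scan((Previous: default(T)!, Current: default(T)!)
            , (pair, next) => (Previous: pair.Current, Current: next))
         .Skip(1);   // the first pair has no real Previous, so drop it
}
```

Like the original operator, this variant signals nothing for the very first value, because there is no earlier value to pair it with.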
Now let’s make things a bit closer to the real world. If your system receives a stream of transactions, this will probably include transactions for all accounts. Therefore, we must group them by account ID in order to correctly compute the balance. GroupBy works for IObservable similarly to how it does for IEnumerable, but it returns a stream of streams.
The following listing shows how to adapt the logic, assuming an initial stream of transactions for all accounts.
IObservable<Transaction> transactions = //...   ❶

IObservable<Guid> dipsIntoRed = transactions
   .GroupBy(t => t.AccountId)   ❷
   .Select(DipsIntoTheRed)      ❸
   .MergeAll();                 ❹

static IObservable<Guid> DipsIntoTheRed
   (IGroupedObservable<Guid, Transaction> transactions)
{
   Guid accountId = transactions.Key;
   decimal initialBalance = 0;

   var balance = transactions.Scan(initialBalance
      , (bal, trans) => bal + trans.Amount);

   return from bal in balance.PairWithPrevious()
          where bal.Previous >= 0
             && bal.Current < 0
          select accountId;   ❺
}

public static IObservable<T> MergeAll<T>
   (this IObservable<IObservable<T>> source)
   => source.SelectMany(x => x);
❶ Includes transactions from all accounts
❷ Groups the transactions by account ID
❸ Signals dips into the red for any particular account
❹ Flattens the result into a single observable
❺ Signals the ID of the offending account
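The “stream of streams” returned by GroupBy can be seen in a smaller sketch that keeps a running balance per account and flattens the groups back into one stream. It assumes the System.Reactive package. The Transaction record and the two account identifiers are hypothetical stand-ins, and SelectMany plays the role of Select followed by MergeAll.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var transactions = new Subject<Transaction>();   // transactions for all accounts
var alice = Guid.NewGuid();
var bob = Guid.NewGuid();

var balances = new List<(Guid AccountId, decimal Balance)>();

transactions
   .GroupBy(t => t.AccountId)        // one inner stream per account ID
   .SelectMany(group => group        // each group keeps its own running balance
      .Scan(0m, (bal, trans) => bal + trans.Amount)
      .Select(bal => (AccountId: group.Key, Balance: bal)))
   .Subscribe(entry => balances.Add(entry));

transactions.OnNext(new Transaction(alice, 100m));   // (alice, 100)
transactions.OnNext(new Transaction(bob, 50m));      // (bob, 50)
transactions.OnNext(new Transaction(alice, -30m));   // (alice, 70)
transactions.OnNext(new Transaction(bob, -80m));     // (bob, -30)

foreach (var (id, current) in balances)
   Console.WriteLine($"{id}: {current}");

// Hypothetical stand-in for the chapter's Transaction type.
record Transaction(Guid AccountId, decimal Amount);
```

Each account's balance is accumulated independently, even though all transactions arrive interleaved on a single stream.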
Now we’re starting with a stream of Transactions for all accounts, and we end up with a stream of Guids that will signal whenever an account dips into the red, including the Guid identifying the offending account. Notice how this program is effectively keeping track of the balances of all accounts without the need for us to do any explicit state manipulation.
In this chapter, you’ve seen how you can use IObservable to represent data streams and Rx to create and manipulate IObservables. There are many details and features of Rx that we haven’t discussed at all, but we’ve still covered enough ground for you to start using IObservables and to further explore the features of Rx as needed.4
As you’ve seen, having an abstraction that captures a data stream enables you to detect patterns and specify logic that spans across multiple events within the same stream or across different streams. This is where I’d recommend using IObservable. The corollary is that, if your events can be handled independently, then you probably shouldn’t use IObservables because using them will probably reduce the readability of your code.
An important thing to keep in mind is that because OnNext has no return value, an IObservable can only push data downstream and never receives any data back. Hence, IObservables are best combined into one-directional dataflows. For instance, if you read events from a queue and write some data into a DB as a result, IObservable can be a good fit; likewise if you have a server that communicates with web clients via WebSockets, where messages are exchanged between client and server in a fire-and-forget fashion.
On the other hand, IObservables are not well-suited to a request-response model such as HTTP. You could model the received requests as a stream and compute a stream of responses, but you’d then have no easy way to tie these responses back to the original requests.
Finally, if you have complex synchronization patterns that can’t be captured with the operators in Rx, and you need more fine-grained control over how messages are sequenced and processed, you may find the building blocks in the System.Threading.Tasks.Dataflow namespace (based on in-memory queues) more appropriate.
IObservable<T> represents a stream of Ts, a sequence of values in time.
Remove an observer by disposing of the subscription returned by Subscribe.
Separate side effects (in observers) from logic (in stream transformations).
When deciding on whether to use IObservable, consider the following:
1 The Subscribe overload that takes an IObserver is the method declared in the IObservable interface. The overload that takes the callbacks is an extension method.
2 Rx includes several libraries. The main library, System.Reactive, bundles the packages you’ll most commonly need: System.Reactive.Interfaces, System.Reactive.Core, System.Reactive.Linq, and System.Reactive.PlatformServices. There are several other packages that are useful in more specialized scenarios, such as if you’re using Windows Forms.
3 Imagine what keys.Select(_ => keys) would look like if keys were an IEnumerable: for each value, you’d be taking the whole IEnumerable. In the end, you’d have an IEnumerable containing n replicas of keys (n being the length of keys). With IObservable, the behavior is different because of the element of time, so when you say, “Give me keys,” what you really get is all values keys will produce in the future.
4 To give you an idea of what was not covered, there are many more operators along with important implementation details of Rx: schedulers (which determine how calls to observers are dispatched), hot versus cold observables (not all observables are lazy), and Subjects with different behaviors, for example.