List of Figures
Chapter 1. Reactive programming
Figure 1.1. A reactive representation of the function c = a + b. As the values of a and b are changing, c’s value is changing
as well. When a is 7 and b is 2, c automatically changes to 9. When b changes to 1, c becomes 8 because a’s value is still
7.
Figure 1.2. The Shoppy application architecture. The mobile app receives the current location from the GPS and can query about
shops and deals via the application service. When a new deal is available, the application service sends a push notification
through the push notifications server.
Figure 1.3. The Shoppy application view of the map. When the user is far from the Rx shop, the icon is small (on the left),
and as the user gets closer, the icon gets bigger (on the right).
Figure 1.4. The Rx layers. In the middle are the key interfaces that represent event streams and on the bottom are the schedulers
that control the concurrency of the stream processing. Above all is the powerful operators library that enables you to create
an event-processing pipeline in LINQ style.
Figure 1.5. The Rx electric eel logo, inspired by the Volta project
Figure 1.6. A sequence diagram of the happy path of the observable and observer flow of interaction. In this scenario, an
observer is subscribed to the observable by the application; the observable “pushes” three messages to the observers (only
one in this case), and then notifies the observers that it has completed.
Figure 1.7. In the case of an error in the observable, the observers will be notified through the OnError method with the
exception object of the failure.
Figure 1.8. The composable nature of Rx operators allows you to encapsulate what happens to the notification since it was
emitted from the original source.
Figure 1.9. Marble diagram with two observable sequences
Figure 1.10. An observable can end because it has completed or because an error occurred.
Figure 1.11. Marble diagram that shows the output of various operators on the observable
Figure 1.12. The relationship of a message-driven approach to load leveling and elasticity. On the left, messages are arriving
at a high frequency, but system processing is leveled to a constant rate, and the queue is filling faster than it’s emptied.
On the right, even if the processing worker role has crashed, users can still fill the queue; and when the system recovers
and adds a new worker, the processing continues.
Figure 1.13. The relationships between the Reactive Manifesto core concepts. Rx is positioned inside the message-driven concept,
because Rx provides abstractions to handle messages as they enter the application.
Figure 1.14. Synchronous food order in which every step must be completed before going to the next one
Figure 1.15. Data in motion and data at rest as one data stream. The connection points from the outside environment are a
perfect fit for creating observables. Those observables can be merged easily with Rx to create a merged observable that the
inner module can subscribe to without knowing the exact source of a data element.
Figure 1.16. A data stream is like a hose: every drop of water is a data packet that needs to go through stations until it
reaches the end. Your data also needs to be filtered and transformed until it gets to the real handler that does something
useful with it.
Chapter 2. Hello, Rx
Figure 2.1. Flowchart of the Stock R Us application logic. We notify the user of a drastic change: a change of more than 10%
in price.
Figure 2.2. Multiple threads executing the event-handler code at the same time. Each box represents the execution time of a
stock. While the first thread is running the code for MSFT, the second thread starts executing for the GOOG stock. Then the
third thread starts for the same stock symbol as the first thread.
Figure 2.3. A deadlock: Thread 1 is holding the resource R1 and waiting for the resource R2 to be available. At the same time,
Thread 2 is holding resource R2 and waiting for resource R1. Both threads will remain locked forever if no external intervention
occurs.
Figure 2.4. Rx assemblies are a set of portable class libraries (middle and bottom) and platform-specific libraries (top left).
The PlatformServices assembly holds the platform enlightenments that are the glue between the two.
Figure 2.5. Reactive Extensions NuGet packages. Many packages add things on top of Rx. To identify the Rx.NET-specific libraries,
look for a package ID with the prefix System.Reactive and make sure the publisher is Microsoft.
Figure 2.6. Installing the Rx libraries through the Package Manager console. Make sure you select the correct project for
installation from the Default Project drop-down list. You can also define the project by typing -ProjectName [project name].
Figure 2.7. NuGet Package Manager from VS 2015. Search for the package you want by typing its name
Figure 2.8. FromEventPattern method signature
Figure 2.9. A simple grouping of the stock ticks by the quote symbol
Figure 2.10. The ticks observable is grouped into two company groups, each one for a different symbol. As the notifications
are pushed on the ticks observable, they’re routed to their group observable. If it’s the first time the symbol appears, a
new observable is created for the group.
Figure 2.11. Ticks are batched together. Each batch has two items; two consecutive batches have a shared item.
Figure 2.12. After applying the Buffer(...) method on each group, you get a new type of notification that holds an array of
the two consecutive ticks.
Figure 2.13. From each pair of consecutive ticks per company group, you calculate the ratio of difference.
Figure 2.14. After filtering the notifications with the Where operator, you find that only one notification is a drastic change.
Chapter 3. Functional thinking in C#
Figure 3.1. A simple web page that has a title, a heading, and a paragraph
Figure 3.2. In functional programming languages, functions can be passed as arguments. This is the expression tree of the
call to applyAndAdd f x y with f: square, x: 5, y: 3.
Figure 3.3. The key benefit of functional programming is that it makes you more productive. The key elements for productivity
are illustrated here.
Figure 3.4. Declaration of a delegate type for methods that receive two strings and return an integer
Figure 3.5. The Strategy pattern class diagram. The context’s operation can be extended by providing different implementations
of the strategy.
Figure 3.6. LINQ architecture: for each type of data source, a LINQ provider translates the LINQ query to a query language
that best fits the source.
Figure 3.7. Composability of LINQ queries. LINQ is structured as a set of pipes and filters. Conceptually, the output of each
operator becomes the input of the next one until you reach the end result.
Figure 3.8. Anonymous type with two properties
Figure 3.9. Using var on an anonymous type. The compiler and IntelliSense know how to deduce the real type generated.
Chapter 4. Creating observable sequences
Figure 4.1. An example of a possible observable-observer dialogue. The observer receives notifications after subscribing until
the network disconnects, which leads to an error.
Figure 4.2. Mapping the ChatConnection events to the observer methods
Figure 4.3. The Defer method signature
Figure 4.4. Sequence diagram of the subscription of an observer to a deferred observable created with the Defer operator.
Figure 4.5. The signature of one of the FromEventPattern method’s overloads
Chapter 5. Creating observables from .NET asynchronous types
Figure 5.1. From the observer standpoint, the observable can run on any thread and emit the notifications by computing them
asynchronously or synchronously.
Figure 5.2. The asynchronous version of the Create operator
Figure 5.3. With observables, observers can start to receive notifications even if not all the sequence sources (like search
engines) have completed.
Figure 5.4. The Concat operator marble diagram. All items from the first observable are emitted. Only after the first observable
completes will the items from the second observable be emitted.
Figure 5.5. The SelectMany operator marble diagram. Each item produces an enumerable by the selector, and the items from each
enumerable are emitted to the resulting observable.
Figure 5.6. Asynchronicity can also be necessary in one of the pipeline operators. Operator 2 performs an async operation
that, once completed, is passed to operator 3.
Figure 5.7. The SelectMany operator allows you to generate an asynchronous task from each element and then emit the task results
on the resulting observable.
Figure 5.8. A marble diagram that shows the use of the SelectMany operator with asynchronous code. Each number is checked
(asynchronously) to see whether it’s prime. When the result is ready, the observable pipeline continues.
Figure 5.9. Visual Studio IntelliSense names the anonymous type in the selector function 'a.
Figure 5.10. The Interval operator in this marble diagram creates an observable that emits a value every time interval (every
1 second in the diagram).
Figure 5.11. The Timer operator in this marble diagram creates an observable sequence that periodically produces a value (every
1 second in the diagram) after the specified initial relative due time has elapsed (2 seconds in the diagram).
Chapter 6. Controlling the observer-observable relationship
Figure 6.1. An observable can have multiple observers, and an observer can observe multiple observables.
Figure 6.2. The communication protocol between the observable and the observer
Figure 6.3. Creating the observer and subscribing it as part of the pipeline with the Subscribe operator
Figure 6.4. Rx monitors the CancellationToken for cancellation. When this happens, it will dispose of the inner subscription.
Figure 6.5. Subscribing the same observer to multiple observables lets you share and reuse the subscriber’s functionality.
Figure 6.6. Given an observable, the beginning of the emissions observed by the observers as well as the end of the emissions
are configurable and create an observation box.
Figure 6.7. The TakeUntil operator allows notifications from the observable source to proceed until the other observable emits
a notification.
Figure 6.8. The SkipUntil operator lets you skip notifications from the observable source until the other observable emits
a notification.
Figure 6.9. The TakeWhile operator accepts notifications while a predicate function evaluates to true and discards all items
after the predicate evaluates to false.
Figure 6.10. The SkipWhile operator discards the notifications as long as a predicate evaluates to true and accepts all notifications
after the predicate evaluates to false.
Figure 6.11. Adding a side effect between operators
Figure 6.12. The Reactive Draw application. A line is created by adding points based on the position of the mouse, starting
from the point the mouse button is down and stopping when it’s up.
Chapter 7. Controlling the observable temperature
Figure 7.1. A subject is a type that’s both an observable and an observer. It allows multicasting the notifications emitted
by the sources to the observers.
Figure 7.2. Subject<T> is a broadcaster. Each notification it observes is broadcast to its observers.
Figure 7.3. The subject can subscribe to multiple sources, but when any of the sources completes (the second in this figure),
so does the subject.
Figure 7.4. AsyncSubject emits only the last value to current and future observers.
Figure 7.5. BehaviorSubject represents a value that changes over time. Observers receive the last (or initial) value and all
subsequent notifications.
Figure 7.6. ReplaySubject broadcasts each notification to all subscribed and future observers, subject to buffer trimming
policies.
Figure 7.7. Instead of exposing your subject, use the AsObservable operator to create a proxy that hides the inner subject.
Figure 7.8. A cold observable is passive and starts emitting only when an observer subscribes. A hot observable is active,
and its emissions are shared among all the observers.
Figure 7.9. Even though each observer subscribes to the same observable, each observer receives a different sequence and the
operator processes different elements.
Figure 7.10. The steps for turning a cold observable into a hot observable. The order of the steps is important! After connecting
the subject to the cold observable, data starts flowing and it is sent only once.
Figure 7.11. Publishing an observable with an initial value. Observers receive either the last value that was emitted from
the source observable or the initial value, if no notification was yet emitted.
Figure 7.12. Turning a hot observable into a cold observable is necessary when you want to capture emissions and replay them.
Figure 7.13. Marble diagram showing the result of the
Chapter 8. Working with basic query operators
Figure 8.1. Example of an observable pipeline. Each block may or may not be present, and the order of blocks may change as
well. Sometimes one type of block may be present more than once.
Figure 8.2. The Select operator projects each element of an observable sequence into a new form.
Figure 8.3. Using the Select operator to convert an incoming message DTO to a corresponding ViewModel with a loaded user object
from the database
Figure 8.4. The SelectMany operator flattens an observable of collections to an observable of items.
Figure 8.5. The test news items. The first news item contains two images, but only one that is child friendly, and the second
news item contains a single image.
Figure 8.6. The SelectMany operator flattens an observable of observables to an observable of emitted items from all the observables.
Figure 8.7. Flattening messages from various chat rooms to one stream of messages
Figure 8.8. When the SelectMany operator is applied to an observable of observables, the resultSelector will be invoked for
each notification, together with the source item that created the observable it was emitted from.
Figure 8.9. The Where operator takes a predicate function and filters the elements of an observable sequence.
Figure 8.10. The Distinct operator suppresses duplicate items emitted by an observable.
Figure 8.11. To reduce load on the service, avoid sending the same term more than once contiguously.
Figure 8.12. The DistinctUntilChanged operator filters consecutive duplicate items from the observable.
Figure 8.13. With DistinctUntilChanged, the word Reactive appears only once, even though it was provided twice.
Figure 8.14. The Sum operator calculates the sum of numbers emitted by an observable and then emits the sum.
Figure 8.15. The Count operator counts the number of items emitted by the source observable and emits this value.
Figure 8.16. The Average operator calculates the average of numbers emitted by an observable and emits this average.
Figure 8.17. The Max operator emits the maximum value in an observable sequence.
Figure 8.18. The MaxBy operator, based on the values provided by the keySelector function, emits the maximum value as an item
when the source observable completes.
Figure 8.19. The Aggregate operator applies a function to each item emitted by an observable, and then emits the computed
value upon the source observable completion.
Figure 8.20. The Scan operator applies a function sequentially to each item emitted by an observable and emits each successive
value.
Chapter 9. Partitioning and combining observables
Figure 9.1. The Zip operator lets you zip values with the same index from two (or more) observables by using a selector function.
Figure 9.2. The CombineLatest operator combines the latest emitted values from each observable by using a selector function.
Figure 9.3. The Concat operator concatenates the second observable sequence to the first observable sequence upon successful
termination of the first.
Figure 9.4. The Merge operator merges the notifications from the source observables into a single observable sequence.
Figure 9.5. The Switch operator takes an observable that emits observables and creates a single observable that emits the
notifications from the most recent observable.
Figure 9.6. The GroupBy operator groups the elements of an observable sequence according to a specified key selector function
(for example, splitting a stream of people into a group of males and a group of females). Each group is an observable of the
group elements.
Figure 9.7. The Join operator combines items emitted by two observables when an item from one observable is emitted during
a time frame of an emitted item from the other observable.
Figure 9.8. The GroupJoin operator correlates elements from two observables based on overlapping duration time frames. The
elements from the second observable are grouped by the element from the first observable to which they correlate.
Figure 9.9. Buffering versus windowing: with windowing, you get the emissions as soon as they arrive, and with buffering you
get the buffer only when it closes.
Figure 9.10. A marble diagram of acceleration calculated with the Buffer operator
Figure 9.11. Buffering with various combinations of amount and skip and the effect on the windows’ behavior
Figure 9.12. The Window operator splits the observable sequence into subobservables based on temporal boundaries or capacity.
Figure 9.13. Fixed windows versus sliding windows
Chapter 10. Working with Rx concurrency and synchronization
Figure 10.1. The Rx schedulers are like timers: you assign specific actions or tasks to the scheduler, and when a preset time
expires, the scheduler posts the work to the execution context it’s bound to.
Figure 10.2. Scheduling work with NewThreadScheduler
Figure 10.3. You can use schedulers to create a periodic behavior. You can also use the state parameter for passing information
to the next iteration.
Figure 10.4. The Timestamp operator adds a timestamp of the emission time to every notification.
Figure 10.5. The TimeInterval operator computes the time interval between two notifications.
Figure 10.6. The Timeout operator emits an error notification when the time-out duration has passed without the source emitting.
Figure 10.7. The Delay operator shifts the observable notifications by a time duration.
Figure 10.8. The Throttle operator emits an item from an observable only if a particular time span has passed without emitting
another item.
Figure 10.9. The Sample operator samples the observable sequence at each interval, emitting the last notification in the interval.
Figure 10.10. The ObserveOn operator runs the observer functions on the specified scheduler.
Figure 10.11. The SubscribeOn operator runs the observer subscription and unsubscription on the specified scheduler.
Figure 10.12. The effects of SubscribeOn and ObserveOn on the observable pipeline
Chapter 11. Error handling and recovery
Figure 11.1. When an exception is thrown in the pipeline by the observable or one of the operators, it’s propagated to the
OnError function of the observer.
Figure 11.2. The Catch operator lets you handle a specific exception type and set a fallback observable in case an exception
is thrown.
Figure 11.3. The OnErrorResumeNext operator is a hybrid of the Catch operator and the Concat operator.
Figure 11.4. The Retry operator resubscribes the observer to the observable when an error is emitted. In the case of a hot
observable, as shown in the figure, the observer receives the rest of the emitted notifications.
Figure 11.5. The Retry operator automatically resubscribes to the weather station observable in the case of an error.
Figure 11.6. The Using operator creates a disposable resource that has the same lifespan as the observable.
Figure 11.7. The Finally operator registers an action to take on the observable or subscription termination.
Figure 11.8. When an observer is subscribed to an observable, it remains alive, regardless of its creator.
Figure 11.9. Disconnecting the observable and its observer with a mediator observer that weakly references the real observer
Figure 11.10. The effect of different rates between an observable and an observer
Figure 11.11. A Service Unavailable error page that you might get when the website is overloaded
Figure 11.12. Zipping a fast observable with a slow observable leads to pending elements stored in memory.
Appendix A. Writing asynchronous code in .NET
Figure A.1. Two approaches to get the content of a document from a friend. The left sequence shows the synchronous way via
phone call. The right sequence shows the asynchronous way via email.
Figure A.2. Creating a background thread. After the thread is created, the main thread continues its execution concurrently
with the background thread.
Figure A.3. The task is a .NET implementation of a future: a stand-in for a computational result that’s initially unknown
but becomes available at a later time.
Appendix C. Testing Rx queries and operators
Figure C.1. The results of the xUnit data-driven test, as shown in Visual Studio Test Explorer
Figure C.2. Visualization of the expected result of applying the FilterBursts operator on an observable that emits two bursts
with a time gap between them
Figure C.3. The FilterBursts operator emits the first value from every burst of items