List of Figures

Chapter 1. Reactive programming

Figure 1.1. A reactive representation of the function c = a + b. As the values of a and b are changing, c’s value is changing as well. When a is 7 and b is 2, c automatically changes to 9. When b changes to 1, c becomes 8 because a’s value is still 7.

Figure 1.2. The Shoppy application architecture. The mobile app receives the current location from the GPS and can query about shops and deals via the application service. When a new deal is available, the application service sends a push notification through the push notifications server.

Figure 1.3. The Shoppy application view of the map. When the user is far from the Rx shop, the icon is small (on the left), and as the user gets closer, the icon gets bigger (on the right).

Figure 1.4. The Rx layers. In the middle are the key interfaces that represent event streams and on the bottom are the schedulers that control the concurrency of the stream processing. Above all is the powerful operators library that enables you to create an event-processing pipeline in LINQ style.

Figure 1.5. The Rx electric eel logo, inspired by the Volta project

Figure 1.6. A sequence diagram of the happy path of the observable and observer flow of interaction. In this scenario, an observer is subscribed to the observable by the application; the observable “pushes” three messages to the observers (only one in this case), and then notifies the observers that it has completed.

Figure 1.7. In the case of an error in the observable, the observers will be notified through the OnError method with the exception object of the failure.

Figure 1.8. The composable nature of Rx operators allows you to encapsulate what happens to the notification since it was emitted from the original source.

Figure 1.9. Marble diagram with two observable sequences

Figure 1.10. An observable can end because it has completed or because an error occurred.

Figure 1.11. Marble diagram that shows the output of various operators on the observable

Figure 1.12. The relationship of a message-driven approach to load leveling and elasticity. On the left, messages are arriving at a high frequency, but system processing is leveled to a constant rate, and the queue is filling faster than it’s emptied. On the right, even if the processing worker role has crashed, users can still fill the queue; and when the system recovers and adds a new worker, the processing continues.

Figure 1.13. The relationships between the Reactive Manifesto core concepts. Rx is positioned inside the message-driven concept, because Rx provides abstractions to handle messages as they enter the application.

Figure 1.14. Synchronous food order in which every step must be completed before going to the next one

Figure 1.15. Data in motion and data at rest as one data stream. The connection points from the outside environment are a perfect fit for creating observables. Those observables can be merged easily with Rx to create a merged observable that the inner module can subscribe to without knowing the exact source of a data element.

Figure 1.16. A data stream is like a hose: every drop of water is a data packet that needs to go through stations until it reaches the end. Your data also needs to be filtered and transformed until it gets to the real handler that does something useful with it.

Chapter 2. Hello, Rx

Figure 2.1. Flowchart of the Stock R Us application logic. We notify the user of a drastic change, that is, a change of more than 10% in price.

Figure 2.2. Multiple threads executing the event-handler code at the same time. Each box represents the execution time of a stock. While the first thread is running the code for MSFT, the second thread starts executing for the GOOG stock. Then the third thread starts for the same stock symbol as the first thread.

Figure 2.3. A deadlock: Thread 1 is holding the resource R1 and waiting for the resource R2 to be available. At the same time, Thread 2 is holding resource R2 and waiting for resource R1. Both threads will remain locked forever if no external intervention occurs.

Figure 2.4. Rx assemblies are a set of portable class libraries (middle and bottom) and platform-specific libraries (top left). The PlatformServices assembly holds the platform enlightenments that are the glue between the two.

Figure 2.5. Reactive Extensions NuGet packages. Many packages add things on top of Rx. To identify the Rx.NET-specific libraries, look for a package ID with the prefix System.Reactive and make sure the publisher is Microsoft.

Figure 2.6. Installing the Rx libraries through the Package Manager console. Make sure you select the correct project for installation from the Default Project drop-down list. You can also define the project by typing -ProjectName [project name].

Figure 2.7. NuGet Package Manager from VS 2015. Search for the package you want by typing its name

Figure 2.8. FromEventPattern method signature

Figure 2.9. A simple grouping of the stock ticks by the quote symbol

Figure 2.10. The ticks observable is grouped into two company groups, each one for a different symbol. As the notifications are pushed on the ticks observable, they’re routed to their group observable. If it’s the first time the symbol appears, a new observable is created for the group.

Figure 2.11. Ticks are batched together. Each batch has two items; two consecutive batches have a shared item.

Figure 2.12. After applying the Buffer(...) method on each group, you get a new type of notification that holds an array of the two consecutive ticks.

Figure 2.13. From each pair of consecutive ticks per company group, you calculate the ratio of difference.

Figure 2.14. After filtering the notifications with the Where operator, you find that only one notification is a drastic change.

Chapter 3. Functional thinking in C#

Figure 3.1. A simple web page that has a title, a heading, and a paragraph

Figure 3.2. In functional programming languages, functions can be passed as arguments. This is the expression tree of the call to applyAndAdd f x y with f: square, x: 5, y: 3.

Figure 3.3. The key benefit of functional programming is that it makes you more productive. The key elements for productivity are illustrated here.

Figure 3.4. Declaration of a delegate type for methods that receive two strings and return an integer

Figure 3.5. The Strategy pattern class diagram. The context’s operation can be extended by providing different implementations of the strategy.

Figure 3.6. LINQ architecture: for each type of data source, a LINQ provider translates the LINQ query to a query language that best fits the source.

Figure 3.7. Composability of LINQ queries. LINQ is structured as a set of pipes and filters. Conceptually, the output of each operator becomes the input of the next one until you reach the end result.

Figure 3.8. Anonymous type with two properties

Figure 3.9. Using var on an anonymous type. The compiler and IntelliSense know how to deduce the real type generated.

Chapter 4. Creating observable sequences

Figure 4.1. An example of a possible observable-observer dialogue. The observer receives notifications after subscribing until the network disconnects, which leads to an error.

Figure 4.2. Mapping the ChatConnection events to the observer methods

Figure 4.3. The Defer method signature

Figure 4.4. Sequence diagram of the subscription of an observer to a deferred observable created with the Defer operator.

Figure 4.5. The signature of one of the FromEventPattern method overloads

Chapter 5. Creating observables from .NET asynchronous types

Figure 5.1. From the observer standpoint, the observable can run on any thread and emit the notifications by computing them asynchronously or synchronously.

Figure 5.2. The asynchronous version of the Create operator

Figure 5.3. With observables, observers can start to receive notifications even if not all the sequence sources (like search engines) have completed.

Figure 5.4. The Concat operator marble diagram. All items from the first observable are emitted. Only after the first observable completes will the items from the second observable be emitted.

Figure 5.5. The SelectMany operator marble diagram. Each item produces an enumerable by the selector, and the items from each enumerable are emitted to the resulting observable.

Figure 5.6. Asynchronicity can also be necessary in one of the pipeline operators. Operator 2 performs an async operation that, once completed, is passed to operator 3.

Figure 5.7. The SelectMany operator allows you to generate an asynchronous task from each element and then emit the task results on the resulting observable.

Figure 5.8. A marble diagram that shows the use of the SelectMany operator with asynchronous code. Each number is checked (asynchronously) to see whether it’s prime. When the result is ready, the observable pipeline continues.

Figure 5.9. Visual Studio IntelliSense names the anonymous type in the selector function 'a.

Figure 5.10. The Interval operator in this marble diagram creates an observable that emits a value every time interval (every 1 second in the diagram).

Figure 5.11. The Timer operator marble diagram creates an observable sequence that periodically produces a value (1 second in the diagram) after the specified initial relative due time has elapsed (2 seconds in the diagram).

Chapter 6. Controlling the observer-observable relationship

Figure 6.1. An observable can have multiple observers, and an observer can observe multiple observables.

Figure 6.2. The communication protocol between the observable and the observer

Figure 6.3. Creating the observer and subscribing it as part of the pipeline with the Subscribe operator

Figure 6.4. Rx monitors the CancellationToken for cancellation. When this happens, it will dispose of the inner subscription.

Figure 6.5. Subscribing the same observer to multiple observables lets you share and reuse the subscriber’s functionality.

Figure 6.6. Given an observable, the beginning of the emissions observed by the observers as well as the end of the emissions are configurable and create an observation box.

Figure 6.7. The TakeUntil operator allows notifications from the observable source to proceed until the other observable emits a notification.

Figure 6.8. The SkipUntil operator lets you skip notifications from the observable source until the other observable emits a notification.

Figure 6.9. The TakeWhile operator accepts notifications while a predicate function evaluates to true and discards all items after the predicate evaluates to false.

Figure 6.10. The SkipWhile operator discards the notifications as long as a predicate evaluates to true and accepts all notifications after the predicate evaluates to false.

Figure 6.11. Adding a side effect between operators

Figure 6.12. The Reactive Draw application. A line is created by adding points based on the position of the mouse, starting from the point the mouse button is down and stopping when it’s up.

Chapter 7. Controlling the observable temperature

Figure 7.1. A subject is a type that’s both an observable and an observer. It allows multicasting the notifications emitted by the sources to the observers.

Figure 7.2. Subject<T> is a broadcaster. Each notification it observes is broadcast to its observers.

Figure 7.3. The subject can subscribe to multiple sources, but when any of the sources completes (the second in this figure), so does the subject.

Figure 7.4. AsyncSubject emits only the last value to current and future observers.

Figure 7.5. BehaviorSubject represents a value that changes over time. Observers receive the last (or initial) value and all subsequent notifications.

Figure 7.6. ReplaySubject broadcasts each notification to all subscribed and future observers, subject to buffer trimming policies.

Figure 7.7. Instead of exposing your subject, use the AsObservable operator to create a proxy that hides the inner subject.

Figure 7.8. A cold observable is passive and starts emitting only when an observer subscribes. A hot observable is active, and its emissions are shared among all the observers.

Figure 7.9. Even though each observer subscribes to the same observable, each observer receives a different sequence and the operator processes different elements.

Figure 7.10. The steps for turning a cold observable into a hot observable. The order of the steps is important! After connecting the subject to the cold observable, data starts flowing and it is sent only once.

Figure 7.11. Publishing an observable with an initial value. Observers receive either the last value that was emitted from the source observable or the initial value, if no notification was yet emitted.

Figure 7.12. Turning a hot observable into a cold observable is necessary when you want to capture emissions and replay them.

Figure 7.13. Marble diagram showing the result of the

Chapter 8. Working with basic query operators

Figure 8.1. Example of an observable pipeline. Each block may or may not be present, and the order of blocks may change as well. Sometimes one type of block may be present more than once.

Figure 8.2. The Select operator projects each element of an observable sequence into a new form.

Figure 8.3. Using the Select operator to convert an incoming message DTO to a corresponding ViewModel with a loaded user object from the database

Figure 8.4. The SelectMany operator flattens an observable of collections to an observable of items.

Figure 8.5. The test news items. The first news item contains two images, but only one that is child friendly, and the second news item contains a single image.

Figure 8.6. The SelectMany operator flattens an observable of observables to an observable of emitted items from all the observables.

Figure 8.7. Flattening messages from various chat rooms to one stream of messages

Figure 8.8. When the SelectMany operator is applied to an observable of observables, the resultSelector will be invoked for each notification, together with the source item that created the observable it was emitted from.

Figure 8.9. The Where operator takes a predicate function and filters the elements of an observable sequence.

Figure 8.10. The Distinct operator suppresses duplicate items emitted by an observable.

Figure 8.11. To reduce load on the service, avoid sending the same term more than once contiguously.

Figure 8.12. The DistinctUntilChanged operator filters consecutive duplicate items from the observable.

Figure 8.13. With DistinctUntilChanged, the word Reactive appears only once, even though it was provided twice.

Figure 8.14. The Sum operator calculates the sum of numbers emitted by an observable and then emits the sum.

Figure 8.15. The Count operator counts the number of items emitted by the source observable and emits this value.

Figure 8.16. The Average operator calculates the average of numbers emitted by an observable and emits this average.

Figure 8.17. The Max operator emits the maximum value in an observable sequence.

Figure 8.18. The MaxBy operator, based on the values provided by the keySelector function, emits the maximum value as an item when the source observable completes.

Figure 8.19. The Aggregate operator applies a function to each item emitted by an observable, and then emits the computed value upon the source observable completion.

Figure 8.20. The Scan operator applies a function sequentially to each item emitted by an observable and emits each successive value.

Chapter 9. Partitioning and combining observables

Figure 9.1. The Zip operator lets you zip values with the same index from two (or more) observables by using a selector function.

Figure 9.2. The CombineLatest operator combines the latest emitted values from each observable by using a selector function.

Figure 9.3. The Concat operator concatenates the second observable sequence to the first observable sequence upon successful termination of the first.

Figure 9.4. The Merge operator merges the notifications from the source observables into a single observable sequence.

Figure 9.5. The Switch operator takes an observable that emits observables and creates a single observable that emits the notifications from the most recent observable.

Figure 9.6. The GroupBy operator groups the elements of an observable sequence according to a specified key selector function (for example, splitting a stream of people into a group of males and a group of females). Each group is an observable of the group elements.

Figure 9.7. The Join operator combines items emitted by two observables when an item from one observable is emitted during a time frame of an emitted item from the other observable.

Figure 9.8. The GroupJoin operator correlates elements from two observables based on overlapping duration time frames. The elements from the second observable are grouped by the element from the first observable to which they correlate.

Figure 9.9. Buffering versus windowing: with windowing, you get the emissions as soon as they arrive, and with buffering you get the buffer only when it closes.

Figure 9.10. A marble diagram of acceleration calculated with the Buffer operator

Figure 9.11. Buffering with various combinations of amount and skip and the effect on the windows’ behavior

Figure 9.12. The Window operator splits the observable sequence into subobservables based on temporal boundaries or capacity.

Figure 9.13. Fixed windows versus sliding windows

Chapter 10. Working with Rx concurrency and synchronization

Figure 10.1. The Rx schedulers are like timers: you assign specific actions or tasks to the scheduler, and when a preset time expires, the scheduler posts the work to the execution context it’s bound to.

Figure 10.2. Scheduling work with NewThreadScheduler

Figure 10.3. You can use schedulers to create a periodic behavior. You can also use the state parameter for passing information to the next iteration.

Figure 10.4. The Timestamp operator adds a timestamp of the emission time to every notification.

Figure 10.5. The TimeInterval operator computes the time interval between two notifications.

Figure 10.6. The Timeout operator emits an error notification when the time-out duration has passed without emitting.

Figure 10.7. The Delay operator shifts the observable notifications by a time duration.

Figure 10.8. The Throttle operator emits an item from an observable only if a particular time span has passed without emitting another item.

Figure 10.9. The Sample operator samples the observable sequence at each interval, emitting the last notification in the interval.

Figure 10.10. The ObserveOn operator runs the observer functions on the specified scheduler.

Figure 10.11. The SubscribeOn operator runs the observer subscription and unsubscription on the specified scheduler.

Figure 10.12. The effects of SubscribeOn and ObserveOn on the observable pipeline

Chapter 11. Error handling and recovery

Figure 11.1. When an exception is thrown in the pipeline by the observable or one of the operators, it’s propagated to the OnError function of the observer.

Figure 11.2. The Catch operator lets you handle a specific exception type and set a fallback observable in case an exception is thrown.

Figure 11.3. The OnErrorResumeNext operator is a hybrid of the Catch operator and the Concat operator.

Figure 11.4. The Retry operator resubscribes the observer to the observable when an error is emitted. In the case of a hot observable, as shown in the figure, the observer receives the rest of the emitted notifications.

Figure 11.5. The Retry operator automatically resubscribes to the weather station observable in the case of an error.

Figure 11.6. The Using operator creates a disposable resource that has the same lifespan as the observable.

Figure 11.7. The Finally operator registers an action to take on the observable or subscription termination.

Figure 11.8. When an observer is subscribed to an observable, it remains alive, regardless of its creator.

Figure 11.9. Disconnecting the observable and its observer with a mediator observer that weakly references the real observer

Figure 11.10. The effect of different rates between an observable and an observer

Figure 11.11. A Service Unavailable error page that you might get when the website is overloaded

Figure 11.12. Zipping a fast observable with a slow observable leads to pending elements stored in memory.

Appendix A. Writing asynchronous code in .NET

Figure A.1. Two approaches to get the content of a document from a friend. The left sequence shows the synchronous way via phone call. The right sequence shows the asynchronous way via email.

Figure A.2. Creating a background thread. After the thread is created, the main thread continues its execution concurrently to the background thread.

Figure A.3. The task is a .NET implementation of a future: a stand-in for a computational result that’s initially unknown but becomes available at a later time.

Appendix C. Testing Rx queries and operators

Figure C.1. The results of the xUnit data-driven test, as shown in Visual Studio Test Explorer

Figure C.2. Visualization of the expected result of applying the FilterBursts operator on an observable that emits two bursts with a time gap between them

Figure C.3. The FilterBursts operator emits the first value from every burst of items
