Useful operators

In this subsection, we will explore operators that transform the elements of a source observable in some way. The most prominent member of this family of operators is the familiar map, which emits the elements of the source observable after applying a function to them. For example, we may use map to square each number in a sequence:

    from rx import Observable  # RxPY 1.x API

    (Observable.from_iterable(range(4))
               .map(lambda x: x**2)
               .subscribe(print))
    # Output:
    # 0
    # 1
    # 4
    # 9

Operators can be represented with marble diagrams that help us better understand how the operator works, especially when taking into account the fact that elements can be emitted over a period of time. In a marble diagram, a data stream (in our case, an observable) is represented by a solid line. A circle (or other shape) identifies a value emitted by the observable, an X symbol represents an error, and a vertical line represents the end of the stream.

In the following figure, we can see the marble diagram of map:

The source observable is placed at the top of the diagram, the transformation is placed in the middle, and the resulting observable is placed at the bottom.
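The same layout can be approximated in plain text. The following is only a rough sketch: marble_line is a hypothetical helper (it is not part of RxPy), and the emission ticks are made-up values chosen for illustration.

```python
def marble_line(events, complete_at, cell=3):
    """Render {tick: value} as one text line: '-' is idle time, '|' is completion."""
    cells = []
    for tick in range(complete_at):
        if tick in events:
            # Pad the value with '-' so every tick occupies the same width.
            cells.append(str(events[tick]).center(cell, "-"))
        else:
            cells.append("-" * cell)
    return "".join(cells) + "|"


# Hypothetical emission ticks for the values 0..3 (assumed, not taken from the figure).
source = {0: 0, 2: 1, 3: 2, 5: 3}
# map keeps the timing of each element and transforms only its value.
result = {tick: value ** 2 for tick, value in source.items()}

source_line = marble_line(source, complete_at=7)
result_line = marble_line(result, complete_at=7)
print(source_line)          # -0-----1--2-----3----|
print("  map(x -> x**2)")
print(result_line)          # -0-----1--4-----9----|
```

Each output element sits directly below the input element it was computed from, which is the key property of map: values change, timing does not.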

Another example of a transformation is group_by, which sorts the items into groups based on a key. The group_by operator takes a function that extracts a key from an element, and it produces one observable per key that carries the elements associated with that key.
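The routing behavior can be sketched without RxPy. In this plain-Python analogy, group_by_events is a hypothetical generator (not an RxPy function) that logs when a new group would appear and which group each item is sent to; it only approximates what the operator does with observables.

```python
def group_by_events(items, key_func):
    """Yield a log of what a group_by-style operator does for each item."""
    seen_keys = set()
    for item in items:
        key = key_func(item)
        if key not in seen_keys:
            # A group is created lazily, the first time its key is seen.
            seen_keys.add(key)
            yield ("new group", key)
        # The item is routed to its group as soon as it arrives.
        yield ("item", key, item)


for event in group_by_events(range(4), lambda x: x % 2):
    print(event)
# ('new group', 0)
# ('item', 0, 0)
# ('new group', 1)
# ('item', 1, 1)
# ('item', 0, 2)
# ('item', 1, 3)
```

Note that the items of the two groups are interleaved in the log: grouping happens element by element, not after the whole source has been consumed.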

The group_by operation can be expressed more clearly using a marble diagram. In the following figure, you can see how group_by emits two observables. Additionally, the items are dynamically sorted into groups as soon as they are emitted:

We can further understand how group_by works with a simple example. Let's say that we want to group a sequence of numbers according to whether they are even or odd. We can implement this with group_by by passing the lambda x: x % 2 expression as the key function, which returns 0 if the number is even and 1 if the number is odd:

    obs = (Observable.from_iterable(range(4))
                     .group_by(lambda x: x % 2))

At this point, if we subscribe to obs and print what it emits, we see that it emits two observables:

    obs.subscribe(print)
    # <rx.linq.groupedobservable.GroupedObservable object at 0x7f0fba51f9e8>
    # <rx.linq.groupedobservable.GroupedObservable object at 0x7f0fba51fa58>

You can determine the group key using the key attribute. To extract all the even numbers, we can take the first observable (corresponding to a key equal to 0) and subscribe to it. In the following code, we show how this works:

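    # Print the key of each group emitted by group_by
    obs.subscribe(lambda x: print("group key:", x.key))
    # Output:
    # group key: 0
    # group key: 1

    # Take only the first group (key 0, the even numbers) and print its items
    obs.take(1).subscribe(lambda x: x.subscribe(print))
    # Output:
    # 0
    # 2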

With group_by, we introduced an observable that emits other observables. This turns out to be quite a common pattern in reactive programming, and there are functions that allow you to combine different observables.

Two useful tools for combining observables are merge_all and concat_all. The merge_all operator takes an observable of observables and produces a single observable that contains the elements of all the inner observables in the order they are emitted. This is better illustrated using a marble diagram:

merge_all can be compared to a similar operator, concat_all, which returns a new observable that emits all the elements of the first observable, followed by the elements of the second observable, and so on. The marble diagram for concat_all is presented here:
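The difference in ordering can also be sketched in plain Python. This is an analogy under stated assumptions, not RxPy code: each stream is modeled as a list of (emission_time, value) pairs with made-up timestamps, and merge_sketch and concat_sketch are hypothetical helper names.

```python
import heapq
from itertools import chain

# Hypothetical streams as (emission_time, value) pairs, each already in time order.
evens = [(0, 0), (2, 2)]
odds = [(1, 1), (3, 3)]


def merge_sketch(*streams):
    """Interleave values by emission time, whichever stream they come from."""
    return [value for _, value in heapq.merge(*streams)]


def concat_sketch(*streams):
    """Emit every value of the first stream, then every value of the next one."""
    return [value for _, value in chain(*streams)]


print(merge_sketch(evens, odds))   # [0, 1, 2, 3]
print(concat_sketch(evens, odds))  # [0, 2, 1, 3]
```

The sketch captures only the ordering of the output. With live streams, a concat-style operator also has to deal with values that the later streams emit while it is still waiting for the earlier ones to complete.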

To demonstrate the usage of these two operators, we can apply those operations to the observable of observables returned by group_by. In the case of merge_all, the items are returned in the same order as they were initially (remember that group_by emits elements in the two groups as they come):

    obs.merge_all().subscribe(print)
    # Output:
    # 0
    # 1
    # 2
    # 3

On the other hand, concat_all first returns the even elements and then the odd elements, because it waits for the first observable to complete before it starts emitting the elements of the second. This is demonstrated in the following snippet. In this specific example, we also apply a function, make_replay, to each group. This is needed because, by the time the "even" stream has been consumed, the elements of the second stream have already been produced and would no longer be available to concat_all. This concept will become much clearer after reading the Hot and cold observables section:

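    def make_replay(a):
        # Buffer the items of the group so that later subscribers can still receive them
        result = a.replay(None)
        # Start recording right away, before concat_all subscribes
        result.connect()
        return result

    obs.map(make_replay).concat_all().subscribe(print)
    # Output:
    # 0
    # 2
    # 1
    # 3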

This time around, the even numbers are printed first, followed by the odd numbers. 
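To see why the buffering step matters, consider the following plain-Python sketch. It does not use RxPy: HotStream and ReplayBuffer are hypothetical classes written only for illustration, and they model just one aspect of the behavior, namely that a late subscriber misses values unless something recorded them.

```python
class HotStream:
    """Pushes each value only to the callbacks subscribed at that moment."""

    def __init__(self):
        self._callbacks = []

    def subscribe(self, callback):
        self._callbacks.append(callback)

    def emit(self, value):
        for callback in list(self._callbacks):
            callback(value)


class ReplayBuffer:
    """Records everything a HotStream emits and replays it to late subscribers."""

    def __init__(self, source):
        self._seen = []
        self._callbacks = []
        source.subscribe(self._on_value)  # start recording immediately

    def _on_value(self, value):
        self._seen.append(value)
        for callback in list(self._callbacks):
            callback(value)

    def subscribe(self, callback):
        for value in self._seen:  # replay the recorded history first
            callback(value)
        self._callbacks.append(callback)


odds = HotStream()
replayed = ReplayBuffer(odds)
odds.emit(1)
odds.emit(3)  # both values are emitted before anyone else subscribes

missed, caught = [], []
odds.subscribe(missed.append)      # subscribes too late, receives nothing
replayed.subscribe(caught.append)  # receives the recorded values
print(missed)  # []
print(caught)  # [1, 3]
```

In the concat_all example, the "odd" group plays the role of odds here: its values are produced while the "even" group is still being consumed, so they have to be recorded to be seen later.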

RxPy also provides the merge and concat operations that can be used to combine individual observables.