Observables

As the name implies, the main idea of reactive programming is to react to events. In the preceding section, we saw some examples of this idea with callbacks; you subscribe to them and the callback is executed as soon as the event takes place.

In reactive programming, this idea is expanded by thinking of events as streams of data. This can be exemplified by showing examples of such streams in RxPy. A data stream can be created from an iterator using the Observable.from_iterable factory method, as follows:

    from rx import Observable
obs = Observable.from_iterable(range(4))

In order to receive data from obs, we can use the Observable.subscribe method, which will execute the function we pass for each value that the data source emits:

    obs.subscribe(print)
# Output:
# 0
# 1
# 2
# 3

You may have noted that observables are ordered collections of items just like lists or, more generally, iterators. This is not a coincidence.

The term observable comes from the combination of observer and iterable. An observer is an object that reacts to changes of the variable it observes, while an iterable is an object that is capable of producing and keeping track of an iterator.

In Python, iterators are objects that define the __next__ method, and whose elements can be extracted by calling next. An iterator can generally be obtained by a collection using iter; then we can extract elements using next or a for loop. Once an element is consumed from the iterator, we can't go back. We can demonstrate its usage by creating an iterator from a list:

    collection = list([1, 2, 3, 4, 5])
iterator = iter(collection)

print("Next")
print(next(iterator))
print(next(iterator))

print("For loop")
for i in iterator:
print(i)

# Output:
# Next
# 1
# 2
# For loop
# 3
# 4
# 5

You can see how, every time we call next or we iterate, the iterator produces a value and advances. In a sense, we are pulling results from the iterator.

Iterators sound a lot like generators; however, they are more general. In Python, generators are returned by functions that use yield expressions. As we saw, generators support next, therefore, they are a special class of iterators.

Now you can appreciate the contrast between an iterator and an observable.  An observablepushes a stream of data to us whenever it's ready, but that's not everything. An observable is also able to tell us when there is an error and where there is no more data. In fact, it is possible to register further callbacks to the Observable.subscribe method. In the following example, we create an observable and register callbacks to be called using on_next whenever the next item is available and using the on_completed argument when there is no more data:

    obs = Observable.from_iter(range(4))
obs.subscribe(on_next=lambda x: print(on_next="Next item: {}"),
on_completed=lambda: print("No more data"))
# Output:
# Next element: 0
# Next element: 1
# Next element: 2
# Next element: 3
# No more data

This analogy with the iterator is important because we can use the same techniques that can be used with iterators to handle streams of events.

RxPy provides operators that can be used to create, transform, filter, and group observables. The power of reactive programming lies in the fact that those operations return other observables that can be conveniently chained and composed together. For a quick taste, we will demonstrate the usage of the take operator.

Given an observable, take will return a new observable that will stop after n items. Its usage is straightforward:

    obs = Observable.from_iterable(range(100000))
obs2 = obs.take(4)

obs2.subscribe(print)
# Output:
# 0
# 1
# 2
# 3

The collection of operations implemented in RxPy is varied and rich, and can be used to build complex applications using these operators as building blocks.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.147.67.16