Chapter 4

How do you create an observable that emits only one value?

The just operator emits a single item and then completes. The following code snippet returns an observable that emits a string item and then completes:

single_item = Observable.just("a single item")

How do you create an observable that emits one item from each line of a text file?

This requires an observable with some specific code logic in it, so it must be implemented with the create operator. A possible implementation of this observable is as follows:

def create_line_observable(filename):
    def on_subscribe(observer):
        # Read the whole file each time an observer subscribes
        with open(filename) as f:
            lines = f.readlines()
        for line in lines:
            observer.on_next(line)
        observer.on_completed()

    return Observable.create(on_subscribe)

On subscription, this observable opens the file and reads it completely. Then it emits one item per line, and finally it completes. Each subscription to the observable leads to a full read of the file. Since lines is a list, and therefore an iterable, it can also be handed directly to the from_ operator, which emits each line and then completes:

def create_line_observable(filename):
    def on_subscribe(observer):
        with open(filename) as f:
            lines = f.readlines()
        # from_ emits each line to the observer, then completes
        return Observable.from_(lines).subscribe(observer)

    return Observable.create(on_subscribe)
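The claim that each subscription triggers a full read of the file can be checked without RxPY by calling the subscription function directly. The following is a minimal sketch under that assumption: RecordingObserver and make_on_subscribe are hypothetical names introduced for illustration, and the sample file is created on the fly.

```python
import os
import tempfile


class RecordingObserver:
    """Hypothetical stand-in for an RxPY observer; it only records calls."""

    def __init__(self):
        self.items = []
        self.completed = False

    def on_next(self, item):
        self.items.append(item)

    def on_completed(self):
        self.completed = True


def make_on_subscribe(filename):
    # Same idea as the subscription function above, built outside of RxPY.
    def on_subscribe(observer):
        with open(filename) as f:
            for line in f:  # a file object yields one line per iteration
                observer.on_next(line)
        observer.on_completed()
    return on_subscribe


# Write a small sample file, then "subscribe" twice.
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
    tmp.write("first\nsecond\nthird\n")
    path = tmp.name

on_subscribe = make_on_subscribe(path)
first, second = RecordingObserver(), RecordingObserver()
on_subscribe(first)
on_subscribe(second)
os.remove(path)

print(first.items)                  # ['first\n', 'second\n', 'third\n']
print(second.items == first.items)  # True
```

Both observers receive every line because the file is reopened for each call. Note that the lines keep their trailing newline characters.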

Does the just operator return a cold or a hot observable?

The just operator emits its single item anew for every subscription, so each observer receives the item regardless of when it subscribes. This is the behavior of a cold observable.

What operator can you use to convert a cold observable to a hot observable, and start emitting items when the first observer subscribes to it?

Converting a cold observable to a hot observable is done with the publish operator and the connect method. By default, the observable returned by publish waits until connect is called before emitting items. When items must start flowing as soon as the first subscription occurs, reference counting can be used instead, either by chaining ref_count after publish or by using the share operator, which combines the two:

cold_numbers = Observable.from_([1, 2, 3, 4])
hot_numbers = cold_numbers.share()
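The reference-counting idea can be illustrated with a small model that does not depend on RxPY. This is only a sketch of the concept, not the library's implementation: SharedSource and ManualTicker are hypothetical names, and the source is driven by hand so that the timing of each subscription is explicit.

```python
class SharedSource:
    """Toy model of share(): connect on first subscriber, disconnect on last."""

    def __init__(self, connect):
        self._connect = connect  # callable(emit) returning a disconnect callable
        self._observers = []
        self._disconnect = None
        self.connections = 0

    def subscribe(self, on_next):
        self._observers.append(on_next)
        if len(self._observers) == 1:  # first subscriber starts the source
            self.connections += 1
            self._disconnect = self._connect(self._emit)

        def dispose():
            if on_next in self._observers:
                self._observers.remove(on_next)
                if not self._observers and self._disconnect is not None:
                    self._disconnect()  # last subscriber stops the source
                    self._disconnect = None
        return dispose

    def _emit(self, item):
        for on_next in list(self._observers):
            on_next(item)


class ManualTicker:
    """Hypothetical push source: items are dropped while nobody is connected."""

    def __init__(self):
        self.listener = None

    def connect(self, emit):
        self.listener = emit

        def disconnect():
            self.listener = None
        return disconnect

    def push(self, item):
        if self.listener is not None:
            self.listener(item)


ticker = ManualTicker()
shared = SharedSource(ticker.connect)
a, b = [], []

ticker.push(0)                                       # no subscriber yet
dispose_a = shared.subscribe(lambda i: a.append(i))  # connects the source
ticker.push(1)
dispose_b = shared.subscribe(lambda i: b.append(i))  # joins the live stream
ticker.push(2)
dispose_a()
dispose_b()                                          # disconnects the source
ticker.push(3)

print(a)  # [1, 2]
print(b)  # [2]
```

The second observer misses item 1 because it joined a stream that was already running, which is what distinguishes a hot observable from a cold one.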

Why is it important that observers handle errors?

Any exception that occurs in an operator is caught by RxPY and propagated as an error on the observable. If an observer does not handle errors, then these exceptions will be silently dropped, but the observable that raised the error stops emitting items. This can make debugging difficult because it seems that there has been no error, but the application stops working. So, it is important to implement the on_error callback on all observers to ensure that errors are correctly handled, or at least logged somewhere.
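The effect described above can be modeled in a few lines of plain Python. This is a sketch of the behavior only, not of how RxPY is implemented, and subscribe_mapped is a hypothetical helper introduced for the illustration.

```python
def subscribe_mapped(items, mapper, on_next, on_error=None):
    """Toy model of an operator chain; not RxPY's implementation."""
    for item in items:
        try:
            result = mapper(item)
        except Exception as error:
            # The exception is converted to an error notification.
            if on_error is not None:
                on_error(error)
            return  # either way, no further items are emitted
        on_next(result)


numbers = [1, 2, 0, 4]

# Observer without an error callback: the stream just goes quiet.
silent = []
subscribe_mapped(numbers, lambda x: 10 // x, silent.append)

# Observer with an error callback: the failure is visible.
logged, errors = [], []
subscribe_mapped(numbers, lambda x: 10 // x, logged.append, errors.append)

print(silent)                    # [10, 5]
print(type(errors[0]).__name__)  # ZeroDivisionError
```

Both observers stop receiving items after the second one, but only the observer with an error callback has any record of why.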

How can you convert an observable error to an observable item?

In some cases, it is useful to catch an error and convert it to an item. This can be done directly in the operator chain with the catch operator:

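# obs is an observable that can raise an error.
# In RxPY 1.x the operator is named catch_exception, and its handler
# must return an observable rather than a plain value.
obs.catch_exception(lambda e: Observable.just("everything is fine")) \
    .subscribe(lambda i: print(i))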
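To make the mechanism concrete, here is a minimal sketch that models observables as plain functions taking three callbacks. It is an illustration under that assumption, not RxPY code: failing_source, just, and catch are hypothetical helpers defined here. The point is that the handler returns a replacement source, and the observer continues with it.

```python
def failing_source(on_next, on_error, on_completed):
    """Hypothetical source: two items, then an error."""
    on_next("a")
    on_next("b")
    on_error(ValueError("boom"))


def just(item):
    """Toy equivalent of a single-item source."""
    def subscribe(on_next, on_error, on_completed):
        on_next(item)
        on_completed()
    return subscribe


def catch(source, handler):
    """Return a source that switches to handler(error) when source fails."""
    def subscribe(on_next, on_error, on_completed):
        def recover(error):
            # Continue with the replacement source instead of failing.
            handler(error)(on_next, on_error, on_completed)
        source(on_next, recover, on_completed)
    return subscribe


events = []
recovered = catch(failing_source, lambda e: just("everything is fine"))
recovered(
    events.append,
    lambda e: events.append(("error", e)),
    lambda: events.append("done"),
)

print(events)  # ['a', 'b', 'everything is fine', 'done']
```

The observer never sees the error: it receives the replacement item and a normal completion.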

Why should subscriptions be disposed?

When a subscription is created, it allocates some resources. The subscribe call returns a disposable object that implements a dispose method. Calling this method releases any resources that were allocated during the subscription:

numbers = Observable.from_([1, 2, 3, 4])
disposable = numbers.subscribe(lambda i: print(i))
disposable.dispose()
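The link between a subscription and its resources can be sketched without RxPY. In this illustration, Disposable, subscribe_to_feed, and open_feeds are hypothetical names: the set stands in for whatever a real subscription would hold, such as a socket or a timer.

```python
class Disposable:
    """Hypothetical minimal disposable: runs a cleanup action once."""

    def __init__(self, action):
        self._action = action
        self.is_disposed = False

    def dispose(self):
        if not self.is_disposed:  # a second call is a no-op
            self.is_disposed = True
            self._action()


open_feeds = set()  # stands in for sockets, timers, or file handles


def subscribe_to_feed(name):
    open_feeds.add(name)  # resource allocated on subscription
    return Disposable(lambda: open_feeds.discard(name))


disposable = subscribe_to_feed("prices")
held_while_subscribed = set(open_feeds)
disposable.dispose()
disposable.dispose()  # safe to call again

print(held_while_subscribed)  # {'prices'}
print(open_feeds)             # set()
```

If dispose were never called, the entry would stay in open_feeds for the lifetime of the program, which is the kind of leak that disposing a subscription prevents.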