Hot and cold observables

In the preceding section, we learned how to create an observable using the Observable.from_iterable method. RxPy provides many other tools to create more interesting event sources.

Observable.interval takes a time interval in milliseconds, period, and will create an observable that emits a value every time the period has passed. The following line of code can be used to define an observable, obs, that will emit a number, starting from zero, every second. We use the take operator to limit the timer to four events:

    obs = Observable.interval(1000)
obs.take(4).subscribe(print)
# Output:
# 0
# 1
# 2
# 3

A very important fact about Observable.interval is that the timer doesn't start until we subscribe. We can observe this by printing both the index and the delay from when the timer starts definition using time.time(), as follows:

    import time

start = time.time()
obs = Observable.interval(1000).map(lambda a:
(a, time.time() - start))

# Let's wait 2 seconds before starting the subscription
time.sleep(2)
obs.take(4).subscribe(print)
# Output:
# (0, 3.003735303878784)
# (1, 4.004871129989624)
# (2, 5.005947589874268)
# (3, 6.00749135017395)

As you can see, the first element (corresponding to a 0 index) is produced after three seconds, which means that the timer started when we issue the subscribe(print) method.

Observables, such as Observable.interval, are called lazy because they start producing values only when requested (think of them as vending machines, which won't dispense food unless we press the button). In Rx jargon, these kind of observables are called cold. A property of cold observables is that, if we attach two subscribers, the interval timer will be started multiple times. This is quite evident from the following example. Here, we add a new subscription 0.5 seconds after the first, and you can see how the output of the two subscriptions come at different times:

    start = time.time()
obs = Observable.interval(1000).map(lambda a:
(a, time.time() - start))

# Let's wait 2 seconds before starting the subscription
time.sleep(2)
obs.take(4).subscribe(lambda x: print("First subscriber:
{}".format(x)))
time.sleep(0.5)
obs.take(4).subscribe(lambda x: print("Second subscriber:
{}".format(x)))
# Output:
# First subscriber: (0, 3.0036110877990723)
# Second subscriber: (0, 3.5052847862243652)
# First subscriber: (1, 4.004414081573486)
# Second subscriber: (1, 4.506155252456665)
# First subscriber: (2, 5.005316972732544)
# Second subscriber: (2, 5.506817102432251)
# First subscriber: (3, 6.0062034130096436)
# Second subscriber: (3, 6.508296489715576)

Sometimes we may not want this behavior as we may want multiple subscribers to subscribe to the same data source. To make the observable produce the same data, we can delay the data production and ensure that all the subscribers will get the same data using the publish method.

Publish will transform our observable into a ConnectableObservable, which won't start pushing data immediately, but only when we call the connect method. The usage of publish and connect is demonstrated in the following snippet:

    start = time.time()
obs = Observable.interval(1000).map(lambda a: (a, time.time() -
start)).publish()
obs.take(4).subscribe(lambda x: print("First subscriber:
{}".format(x)))
obs.connect() # Data production starts here

time.sleep(2)
obs.take(4).subscribe(lambda x: print("Second subscriber:
{}".format(x)))
# Output:
# First subscriber: (0, 1.0016899108886719)
# First subscriber: (1, 2.0027990341186523)
# First subscriber: (2, 3.003532648086548)
# Second subscriber: (2, 3.003532648086548)
# First subscriber: (3, 4.004265308380127)
# Second subscriber: (3, 4.004265308380127)
# Second subscriber: (4, 5.005320310592651)
# Second subscriber: (5, 6.005795240402222)

In the preceding example, you can see how we first issue publish, then we subscribe the first subscriber and, finally, we issue connect. When connect is issued, the timer will start producing data. The second subscriber joins the party late and, in fact, won't receive the first two messages but will start receiving data from the third and so on. Note how, this time around, the subscribers share the exact same data. This kind of data source, where data is produced independently of the subscribers, is called hot.

Similar to publish, you can use the replay method that will produce the data from the beginning for each new subscriber. This is illustrated in the following example that, which is identical to the preceding one except that we replaced publish with replay:

    import time

start = time.time()
obs = Observable.interval(1000).map(lambda a: (a, time.time() -
start)).replay(None)
obs.take(4).subscribe(lambda x: print("First subscriber:
{}".format(x)))
obs.connect()

time.sleep(2)
obs.take(4).subscribe(lambda x: print("Second subscriber:
{}".format(x)))

First subscriber: (0, 1.0008857250213623)
First subscriber: (1, 2.0019824504852295)
Second subscriber: (0, 1.0008857250213623)
Second subscriber: (1, 2.0019824504852295)
First subscriber: (2, 3.0030810832977295)
Second subscriber: (2, 3.0030810832977295)
First subscriber: (3, 4.004604816436768)
Second subscriber: (3, 4.004604816436768)

You can see how, this time around, even though the second subscriber arrives late to the party, it is still given all the items that have been given out so far.

Another way of creating hot observables is through the Subject class. Subject is interesting because it's capable of both receiving and pushing data, and thus it can be used to manually push items to an observable. Using Subject is very intuitive; in the following code, we create a Subject and subscribe to it. Later, we push values to it using the on_next method; as soon as we do that, the subscriber is called:

    s = Subject()
s.subscribe(lambda a: print("Subject emitted value: {}".format(x))
s.on_next(1)
# Subject emitted value: 1
s.on_next(2)
# Subject emitted value: 2

Note that Subject is another example of hot observables.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.135.204.0