Subscriptions

The preceding code describes a situation that requires cleanup. We have defined a setInterval() construct that would otherwise emit values forever. cleanUpFn() can cancel that behavior, provided that it is invoked. We return cleanUpFn() at the end of our behavior function.

The question is, how do we get hold of it? To answer that, we need a new concept: the subscription. A subscription is the object that we get back when calling subscribe() on a stream. Let's amend the preceding code with just that:

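let stream$ = Rx.Observable.create(observer => {
  let counter = 0;
  // emit an increasing counter value once every second
  let id = setInterval(() => observer.next(counter++), 1000);
  // returned so that it can be invoked when the stream is unsubscribed from
  return function cleanUpFn() { clearInterval(id); };
});

// log each emitted value (the variable data, not the string 'data')
let subscription = stream$.subscribe((data) => console.log(data));
// stop listening after two seconds
setTimeout(() => subscription.unsubscribe(), 2000);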

In the preceding code, we have created the variable subscription by calling subscribe(), but the really interesting part happens in the last line: we define a timeout that calls unsubscribe() on our subscription. This will call our cleanUpFn() so that the interval is cancelled.
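
To make the link between unsubscribe() and cleanUpFn() more concrete, here is a minimal sketch of how a subscription could be wired up. It does not use RxJS and is not how the library is implemented internally; createStream() and the cleanUpCalls counter are hypothetical names introduced only for this illustration, and the stream emits synchronously so that the result is easy to follow:

```javascript
// Hypothetical, simplified stand-in for Rx.Observable.create(); NOT the RxJS implementation.
function createStream(behaviorFn) {
  return {
    subscribe(onNext) {
      let closed = false;
      // the observer forwards values only while the subscription is open
      const observer = {
        next(value) { if (!closed) onNext(value); }
      };
      // run the behavior function and keep whatever cleanup function it returns
      const cleanUp = behaviorFn(observer);
      // this object plays the role of the subscription
      return {
        unsubscribe() {
          if (closed) return;            // repeated calls do nothing
          closed = true;
          if (typeof cleanUp === 'function') cleanUp();
        }
      };
    }
  };
}

let cleanUpCalls = 0;   // assumption: a counter used only to observe the cleanup
const received = [];

const stream$ = createStream(observer => {
  observer.next(1);
  observer.next(2);
  return function cleanUpFn() { cleanUpCalls++; };
});

const subscription = stream$.subscribe(data => received.push(data));
subscription.unsubscribe();
subscription.unsubscribe(); // second call is ignored in this sketch
```

In this sketch, the function returned from the behavior function is stored at subscribe time and runs exactly once, when unsubscribe() is first called. That is the same relationship the preceding RxJS code relies on.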

Not many of the streams that you deal with will need to be unsubscribed from. However, those that allocate resources, or that start some construct that would go on forever unless we intervene, need a cleanup behavior that we must invoke once we are done with the stream.
