Subjects

We are used to using Observables in a certain way. We construct them from something and then listen to the values they emit. After the point of creation, there is usually very little we can do to affect what is emitted. Sure, we can transform and filter the values, but it is next to impossible to add more values to our Observable unless we merge it with another stream. Let's have a look at the one place where we really are in control of what an Observable emits, the create() operator:

const Rx = require("rxjs/Rx");

let stream$ = Rx.Observable.create(observer => {
  observer.next(1);
  observer.next(2);
});

stream$.subscribe(data => console.log(data));

We see the Observable acting as a wrapper around the thing that really emits our values, the Observer. Inside the function we pass to create(), the Observer calls next() with a parameter to emit a value, and those are the values we listen to in our subscribe() method.
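
The wrapper idea can be sketched without RxJS at all. The following is a minimal illustration, not how the library is implemented: createObservable is a hypothetical stand-in for Rx.Observable.create(), and it only supports a single value callback.

```javascript
// Hypothetical stand-in for Rx.Observable.create(), for illustration only.
function createObservable(producer) {
  return {
    // Nothing is emitted until subscribe() is called.
    subscribe(onNext) {
      // Wrap the callback in an observer object and hand it to the producer.
      const observer = { next: value => onNext(value) };
      producer(observer);
    }
  };
}

const received = [];

const stream$ = createObservable(observer => {
  observer.next(1);
  observer.next(2);
});

// The producer function runs now, and each next() call reaches our callback.
stream$.subscribe(data => received.push(data));

console.log(received); // [ 1, 2 ]
```

In this sketch the only code that can call next() is the producer function handed over at creation time, which is why nothing outside it can add values later.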

This section is about the Subject. The Subject differs from the Observable in that it can affect the content of the stream after its creation. Let's have a look at just that with the following piece of code:

// subjects/subject.js

const Rx = require("rxjs/Rx");

let subject = new Rx.Subject();

// logs 1 once next(1) is called below
subject.subscribe(data => console.log(data));

subject.next(1);

The first thing we notice is that we just call the constructor instead of using a factory method such as create() or from(), as we do with an Observable. The second thing we notice is that we subscribe first and only emit a value, by calling next(), on the last line. Why is the code written in this order? If we called next() before subscribe(), the value would be emitted straight away, before our subscription existed, and we would never see it. We know two things for sure, though: we are calling next(), and we are calling subscribe(), which gives the Subject a dual nature: it acts as both an Observer and an Observable. We also mentioned another thing the Subject is capable of: changing the stream after creation. Our call to next() is doing exactly that. Let's add a few more calls to make sure we really get the idea:
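
To make the ordering point concrete, here is a stripped-down sketch of the mechanics. TinySubject is a hypothetical class written for this illustration and is not the RxJS implementation; it assumes a Subject simply forwards each value to whoever is subscribed at that moment and keeps no buffer.

```javascript
// Hypothetical, stripped-down Subject; not the RxJS implementation.
class TinySubject {
  constructor() {
    this.listeners = [];
  }

  // Observable side: register a callback for future values.
  subscribe(onNext) {
    this.listeners.push(onNext);
  }

  // Observer side: push a value to everyone subscribed right now.
  next(value) {
    this.listeners.forEach(listener => listener(value));
  }
}

const tiny = new TinySubject();
const seen = [];

tiny.next(1); // no subscribers yet, so 1 is dropped
tiny.subscribe(data => seen.push(data));
tiny.next(2); // delivered to the subscriber

console.log(seen); // [ 2 ]
```

Because nothing is stored, the value emitted before subscribe() never reaches the callback, which is why the listing above subscribes first and calls next() last.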

// subjects/subjectII.js

const Rx = require("rxjs/Rx");

let subject = new Rx.Subject();

// logs 10 immediately, then 100 two seconds later
subject.subscribe(data => console.log(data));
subject.next(10);

setTimeout(() => {
  subject.next(100);
}, 2000);

As we stated before, every call we make to the next() method lets us affect the stream. Each of those calls results in our subscription being hit or, more precisely, in the first function we passed to subscribe() being invoked with the emitted value.
