Implementing create()

At the beginning of this chapter, we learned how to create an Observable. The code looked like this:

let stream$ = Rx.Observable.create( observer => observer.next(1));
stream$.subscribe( data => console.log(data));

Just by looking at the code, we can make educated guesses as to what's going on underneath. It's clear we need an Observable class.

The class needs a create() method that takes a function as a parameter. The create() method should return an Observable. Furthermore, our Observable class needs a subscribe() method that takes a function as a parameter. Let's start off there and see where we land.

First, let's define our Observable class with the aforementioned methods:

class MyObservable {
  // Skeleton only: the method bodies are filled in step by step below.
  static create(behaviourFn): MyObservable {}
  constructor() {}
  subscribe(dataFn) {}
}

OK, so we have a class with three methods in it; let's attempt to implement the methods. Let's take what we know about the create() method and start from there:

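class MyObservable {
  static create(behaviourFn): MyObservable {
    // Instantiate our own class, handing over the behaviour function
    return new MyObservable(behaviourFn);
  }
  // `private` turns the constructor parameter into a field on the class
  constructor(private behaviourFn) {}
  subscribe(dataFn) {}
}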

The changes are confined to create() and the constructor: we introduced a field on the class called behaviourFn, and create() now instantiates our Observable, passing in the behaviourFn it received as a parameter. This means the constructor needs to take a function as a parameter and save it for later use.

What do we know about the behaviourFn() that was passed into create()? We know it takes an Observer instance as a parameter, and that it lays out what values the Observer instance should emit. For anything to capture those emitted values, we need to implement our last method, subscribe(). We know that subscribe() takes dataFn() as a parameter and that it must invoke behaviourFn() when it is called, since that is what triggers the behavior. Let's therefore amend our existing code:

class MyObservable {
  behaviourFn;
  static create(behaviourFn): MyObservable {
    return new MyObservable(behaviourFn);
  }
  constructor(behaviourFn) { this.behaviourFn = behaviourFn; }
  subscribe(dataFn) {
    // `observer` is not defined yet; we still need something to pass in here
    this.behaviourFn(observer);
  }
}

At this point, we realize that we need an Observer class so that we actually have something to pass to our behaviourFn(). Another thing we need to figure out is how and when to invoke dataFn(). After thinking for a minute, we realize the Observer must be the one responsible for invoking dataFn(), so it seems only reasonable that dataFn() is passed into the constructor of our Observer class for later use, like so:

class Observer {
  constructor(private dataFn) {}
  // Forward every emitted value to the subscriber's callback
  next(value) { this.dataFn(value) }
}

By implementing this Observer class, we have done three things. First, we pass dataFn() in through the constructor and store it as a field on the Observer class. Second, we create a next() method on the Observer, because, as we learned, an Observer instance calls next() to generate values. Third, we invoke dataFn() inside next(), so that the subscriber is told every time a value is generated. Putting all of this code together, we have created a very bare-bones implementation of RxJS, which actually works! To better understand what we have, let's display all the code used so far:

// rxjs-core/Observable.ts

class Observer {
  constructor(private dataFn) {}
  next(value) { this.dataFn(value) }
}

class MyObservable {
  behaviourFn;
  static create(behaviourFn): MyObservable {
    return new MyObservable(behaviourFn);
  }
  constructor(behaviourFn) { this.behaviourFn = behaviourFn; }
  subscribe(dataFn) {
    // Wrap the subscriber's callback in an Observer, then run the behaviour
    let observer = new Observer(dataFn);
    this.behaviourFn( observer );
  }
}

let stream$ = MyObservable.create( observer => observer.next(1)); // 1
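
To see how the pieces described so far fit together, here is a minimal, self-contained sketch of the same two classes with a subscriber attached. It is an illustration under stated assumptions, not the book's own listing: the type aliases DataFn and BehaviourFn, the generic parameter T, and the log array are names introduced here purely for the example. The sketch also shows two consequences of the design: behaviourFn() does not run until subscribe() is called, and it runs once per subscriber.

```typescript
// Hypothetical helper types (not in the original listing): the shape of the
// subscriber callback and of the function handed to create().
type DataFn<T> = (value: T) => void;

class Observer<T> {
  constructor(private dataFn: DataFn<T>) {}
  // Forward each emitted value to the subscriber's callback.
  next(value: T): void { this.dataFn(value); }
}

type BehaviourFn<T> = (observer: Observer<T>) => void;

class MyObservable<T> {
  static create<T>(behaviourFn: BehaviourFn<T>): MyObservable<T> {
    return new MyObservable<T>(behaviourFn);
  }
  constructor(private behaviourFn: BehaviourFn<T>) {}
  subscribe(dataFn: DataFn<T>): void {
    // Nothing is emitted until this point: subscribing triggers the behaviour.
    const observer = new Observer<T>(dataFn);
    this.behaviourFn(observer);
  }
}

// Assumed for the demo: collect messages in an array so the order is visible.
const log: string[] = [];

const stream$ = MyObservable.create<number>(observer => {
  log.push('behaviourFn started');
  observer.next(1);
  observer.next(2);
});
log.push('created');

stream$.subscribe(data => log.push(`received ${data}`));
stream$.subscribe(data => log.push(`second subscriber received ${data}`));

console.log(log.join('\n'));
```

Note that 'created' is logged before 'behaviourFn started': creating the stream only stores the function, and each call to subscribe() runs it afresh with a new Observer.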