Revisiting the basics, adding errors, and complete

After having taken on the heroic feat of implementing the basics of RxJS, we hopefully feel pretty good about understanding its inner workings. So far, we have only implemented dataFn in subscribe(); there are two more callbacks in the subscribe() method that we need to implement. Let's look at a code snippet and highlight what is missing:

let stream$ = Rx.Observable.of(1, 2, 3);
stream$.subscribe(
  data => console.log(data),
  err => console.error(err),
  () => console.log('complete')
);

The last two callbacks, the error handler and the completion handler, are the missing functionality. We know from before that to trigger the error callback, we need to call observer.error('some message'). We also know that no values should be emitted after an error is raised. Let's provide an example of such a case:

let stream$ = Rx.Observable.create(observer => {
  observer.next(1);
  observer.error('err');
  observer.next(2);
});

stream$.subscribe(
  data => console.log(data),
  err => console.error(err)
);
// should emit 1, err

At this point, we realize that our Observer class is the one that needs amending to support the error() method call. We also need to be wary of the condition we just described, as no more values should be emitted after an error has occurred. Let's jump into an implementation:

class Observer {
  hasError: boolean;

  constructor(private dataFn, private errorFn) {}

  next(value) {
    if (!this.hasError) {
      this.dataFn(value);
    }
  }

  error(err) {
    this.errorFn(err);
    this.hasError = true;
  }
}

We can see in the preceding snippet that we pass another parameter, errorFn, into the constructor. The next() method needed updating: we wrapped its body in a conditional that decides whether to emit a value. Lastly, we defined the error() method, which calls the passed-in errorFn and sets the hasError field to true.

We need to do one more thing and that is to update our subscribe() method in the Observable class:


class Observable {
  behaviourFn;

  static create(behaviourFn): Observable {
    return new Observable(behaviourFn);
  }

  constructor(behaviourFn) {
    this.behaviourFn = behaviourFn;
  }

  subscribe(dataFn, errorFn) {
    let observer = new Observer(dataFn, errorFn);
    let cleanUpFn = this.behaviourFn(observer);

    return {
      unsubscribe: cleanUpFn
    };
  }
}

A heads-up: when we define the filter() operator, which overrides the next() method, we need to ensure that the override also takes hasError into consideration when determining whether to emit a value. We'll leave this to you, dear reader, to implement.
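For readers who want a starting point, here is one possible sketch of a filtering observer that respects hasError. It is not the book's implementation: the FilteringObserver class, the type aliases, and the seen array are hypothetical names introduced only for illustration. The idea is that the override applies the predicate and then delegates to the base next(), so the base class's hasError check still runs.

```typescript
// Hypothetical sketch: FilteringObserver, DataFn, ErrorFn and `seen` are
// illustrative names, not part of the code developed in this chapter.
type DataFn<T> = (value: T) => void;
type ErrorFn = (err: unknown) => void;

class Observer<T> {
  private hasError = false;

  constructor(private dataFn: DataFn<T>, private errorFn: ErrorFn) {}

  next(value: T): void {
    // Drop values once an error has been raised
    if (!this.hasError) {
      this.dataFn(value);
    }
  }

  error(err: unknown): void {
    this.errorFn(err);
    this.hasError = true;
  }
}

class FilteringObserver<T> extends Observer<T> {
  constructor(
    private filterFn: (value: T) => boolean,
    dataFn: DataFn<T>,
    errorFn: ErrorFn
  ) {
    super(dataFn, errorFn);
  }

  // Apply the predicate first, then let the base class decide whether
  // the value may still be emitted (its hasError check is reused).
  next(value: T): void {
    if (this.filterFn(value)) {
      super.next(value);
    }
  }
}

const seen: string[] = [];
const evenOnly = new FilteringObserver<number>(
  value => value % 2 === 0,
  value => seen.push(`data ${value}`),
  err => seen.push(`error ${String(err)}`)
);

evenOnly.next(1);      // rejected by the predicate
evenOnly.next(2);      // emitted
evenOnly.error('err'); // raises the error
evenOnly.next(4);      // passes the predicate, but dropped after the error
console.log(seen);     // [ 'data 2', 'error err' ]
```

Delegating to super.next() keeps the error check in one place. The trade-off is that the predicate still runs on values that arrive after an error, even though they are never emitted.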

The last order of business is to support completion. Completion has many similarities with raising an error, in the sense that no more values should be emitted. The difference is that we should hit the last callback instead. As with the error() method implementation, we start with the Observer implementation:

// rxjs-core/error-complete/Observer.ts

// exported so that Observable.ts can import it
export class Observer {
  hasError: boolean;
  isCompleted: boolean;

  constructor(
    private dataFn,
    private errorFn,
    private completeFn
  ) {}

  next(value) {
    if (!this.hasError && !this.isCompleted) {
      this.dataFn(value);
    }
  }

  error(err) {
    this.errorFn(err);
    this.hasError = true;
  }

  complete() {
    this.completeFn();
    this.isCompleted = true;
  }
}

Given the preceding code, we see that our changes entail adding an isCompleted field. We also pass a completeFn into the constructor. Logic needs to be added to the next() method, as completion is now another state we need to check for besides error. Lastly, we added the complete() method, which just invokes the passed-in function and sets the isCompleted field to true.
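The Observer above guards only next(). Nothing stops error() from firing after complete(), or complete() from firing twice. As a possible variation, and not the implementation used in this chapter, the two flags could be merged into a single terminal-state flag that all three methods check. The names GuardedObserver, isStopped, and log below are assumptions made for this sketch.

```typescript
// Hypothetical variation: one flag guards next(), error() and complete().
class GuardedObserver<T> {
  private isStopped = false; // assumed name; replaces hasError and isCompleted

  constructor(
    private dataFn: (value: T) => void,
    private errorFn: (err: unknown) => void,
    private completeFn: () => void
  ) {}

  next(value: T): void {
    if (!this.isStopped) {
      this.dataFn(value);
    }
  }

  error(err: unknown): void {
    if (this.isStopped) {
      return; // already errored or completed
    }
    this.isStopped = true;
    this.errorFn(err);
  }

  complete(): void {
    if (this.isStopped) {
      return; // already errored or completed
    }
    this.isStopped = true;
    this.completeFn();
  }
}

const log: string[] = [];
const guarded = new GuardedObserver<number>(
  value => log.push(`data ${value}`),
  err => log.push(`error ${String(err)}`),
  () => log.push('completed')
);

guarded.next(1);
guarded.complete();
guarded.next(2);       // ignored: already completed
guarded.error('late'); // ignored: already completed
guarded.complete();    // ignored: already completed
console.log(log);      // [ 'data 1', 'completed' ]
```

Setting the flag before invoking the callback also means that a callback which re-enters the observer cannot trigger a second terminal notification.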

As before, we need to update the Observable class to pass the complete function:

// rxjs-core/error-complete/Observable.ts

import { Observer } from './Observer';

class Observable {
  behaviourFn;

  static create(behaviourFn): Observable {
    return new Observable(behaviourFn);
  }

  constructor(behaviourFn) {
    this.behaviourFn = behaviourFn;
  }

  filter(filterFn): Observable {
    return new FilterableObservable(
      filterFn,
      this.behaviourFn
    );
  }

  subscribe(dataFn, errorFn, completeFn) {
    let observer = new Observer(dataFn, errorFn, completeFn);
    let cleanUpFn = this.behaviourFn(observer);

    return {
      unsubscribe: cleanUpFn
    };
  }
}

const stream$ = new Observable(observer => {
  observer.next(1);
  observer.error("error");
  observer.next(2);
});

stream$.subscribe(
  data => console.log("data", data),
  err => console.log("error", err),
  () => console.log("completed")
);

// prints "data 1", then "error error"; nothing more is emitted after that
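The example above exercises only the error path. The completion path can be checked the same way with the self-contained, typed sketch below. It mirrors the two classes from this section but makes two assumptions of its own: callbacks push into an output array instead of logging directly, and subscribe() falls back to a no-op when behaviourFn returns no clean-up function, so that calling unsubscribe() is always safe. The names Teardown, completing$, and output are hypothetical.

```typescript
type Teardown = () => void; // assumed alias for the clean-up function

class Observer<T> {
  private hasError = false;
  private isCompleted = false;

  constructor(
    private dataFn: (value: T) => void,
    private errorFn: (err: unknown) => void,
    private completeFn: () => void
  ) {}

  next(value: T): void {
    if (!this.hasError && !this.isCompleted) {
      this.dataFn(value);
    }
  }

  error(err: unknown): void {
    this.errorFn(err);
    this.hasError = true;
  }

  complete(): void {
    this.completeFn();
    this.isCompleted = true;
  }
}

class Observable<T> {
  constructor(
    private behaviourFn: (observer: Observer<T>) => Teardown | void
  ) {}

  subscribe(
    dataFn: (value: T) => void,
    errorFn: (err: unknown) => void,
    completeFn: () => void
  ): { unsubscribe: Teardown } {
    const observer = new Observer<T>(dataFn, errorFn, completeFn);
    const cleanUpFn = this.behaviourFn(observer);
    // Assumption of this sketch: use a no-op when no clean-up function is returned
    return {
      unsubscribe: typeof cleanUpFn === 'function' ? cleanUpFn : () => {}
    };
  }
}

const output: string[] = [];
const completing$ = new Observable<number>(observer => {
  observer.next(1);
  observer.complete();
  observer.next(2); // ignored: the stream has completed
});

const subscription = completing$.subscribe(
  data => output.push(`data ${data}`),
  err => output.push(`error ${String(err)}`),
  () => output.push('completed')
);

subscription.unsubscribe(); // safe even though no clean-up function was returned
console.log(output);        // [ 'data 1', 'completed' ]
```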

A quick reality check here: we have actually implemented the core functionality of RxJS—Observer, Observable, and one operator. We are much closer to understanding what is going on. We realize that implementing the other 59 operators is quite a feat, and it is probably not a good idea when there is a team maintaining the existing RxJS repository. Our newfound knowledge is not for nothing; understanding what is going on can never be wrong. Who knows? Maybe one of you readers will become a contributor; you have certainly been given the tools. 
