The Problem with Cold Observables

So far, you’ve learned that each new subscription to an observable runs the root creation function:

 let​ myObs$ = ​new​ Observable(o => {
  console.log(​'Creation Function'​);
  setInterval(() => o.next(​'hello'​, Math.random()), 1000);
 });
 
 myObs$.subscribe(x => console.log(​'streamA'​, x));
 
 setTimeout(() => {
  myObs$.subscribe(x => console.log(​'streamB'​, x));
 }, 500);

When you run the above snippet, you see Creation Function logged to the console twice, showing that you’ve created two entirely separate observable streams. Each observable stream sees hello at different times, with separate random numbers attached. Rx does this by default to ensure every stream is isolated from every other stream, keeping your code manageable.

On the other hand, sometimes you don’t want to trigger the creation logic every time another part of your application wants to listen in on the result. Consider an AJAX request that fetches a user object:

 let​ user$ = ajax(​'/user'​);

You might naively pass around user$ to all the components in your application, where all of the listeners happily subscribe and pull whatever information they need. This solution works until the backend engineer comes chasing after us with a pile of excessive log files demanding to know why every page load makes seventeen requests to the /user endpoint. Oops. That’s because observables are “cold” by default—each new subscription creates an entire new observable and stream of data. In this case, each subscribe makes an independent request to the backend for the same data.

images/ColdObservable.png

You need something to multiplex your data—to make a single request but distribute it to multiple subscribers as shown in the figure.

images/HotObservable.png

Fortunately, RxJS provides a multitude of options. The simplest one is the share operator, which is called on a single, cold observable stream and converts the stream into a hot stream. This conversion doesn’t happen immediately; share waits until there’s at least one subscriber and then subscribes to the original observable (triggering the AJAX request). Further subscriptions do not create a new observable; instead they all listen in to the data that the original subscription produces. Updating the “Creation Function” example, you’ll see:

 import​ { Observable } ​from​ ​'rxjs'​;
 import​ { share } ​from​ ​'rxjs/operators'​;
 
 let​ myObs$ = ​new​ Observable(o => {
  console.log(​'Creation Function'​);
  setInterval(() => o.next(​'hello '​ + Math.random()), 1000);
 })
  .pipe(
  share()
  );
 
 myObs$.subscribe(x => console.log(​'streamA'​, x));
 
 setTimeout(() => {
  myObs$.subscribe(x => console.log(​'streamB'​, x));
 }, 500);

share is a good tool to use when new listeners don’t care about previous data (like in the stock market example). Unfortunately for our RxJS loving users, the following still doesn’t work:

 let​ user$ = ajax(​'/user'​)
 .pipe(share());

The first component that subscribes to user$ triggers a request to the server. When the request finishes, all subscribers are given the returned data simultaneously. Anyone who subscribes after the initial request finishes is plumb out of luck. One solution is to delay the triggering of the request. When a stream is multiplexed with share, the trigger is the first subscriber. An alternative solution is to manually trigger the subscription by breaking the multiplexing into two parts: publish and connect.

publish converts our unicasted observable into a multicasted one but adds no additional logic around subscribing like share does. Instead, the publish operator won’t do anything until there’s a manual trigger to start the stream. The manual trigger is a call to the connect method.

 // A multicasted observable you can pass to all of our components
 let​ users$ = ajax(​'/user'​)
 .pipe(publish());
 
 // Once all of our components are subscribed
 user$.connect();

Using publish and connect allows for fine-grained control over when an observable finally starts. This control can be enormously powerful when tuning for performance or in cases like lazy loading where you can avoid triggering requests until the user loads a particular section of your application.

Behind the scenes, both share and publish/connect use the Subject class. Understanding subjects is the final step to unlocking all that RxJS has to offer you.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.191.235.176