Chapter 9: Caching Streams

Caching data and assets is one of the most effective ways to improve the user experience of our web applications: it speeds up load times and keeps the number of network requests to a minimum.

We will start this chapter by defining the caching requirement for the client side and the motivation behind it. Then, we will learn how to implement this requirement in a reactive way and explore the useful operators in RxJS. After that, we will describe a better way to do it using RxJS 7. Finally, we will highlight the use cases of caching streams.

In this chapter, we're going to cover the following main topics:

  • Defining the requirement
  • Learning about using the reactive pattern to cache streams
  • Exploring the RxJS 7 recommended pattern to cache streams
  • Highlighting the use cases of caching streams

Technical requirements

This chapter assumes that you have a basic understanding of RxJS.

The source code of this chapter is available at https://github.com/PacktPublishing/Reactive-Patterns-with-RxJS-for-Angular/tree/main/Chapter09.

Defining the requirement

As you have learned throughout the previous chapters, Angular's HttpClient is Observable-based, which means that methods such as get, post, put, and delete return an Observable.

So, subscribing multiple times to this Observable will cause the underlying request to be executed again on every subscription; it is a cold Observable, as we learned in Chapter 8, Multicasting Essentials. This behavior results in an overhead of HTTP requests, which may degrade the performance of your web application, especially if the server takes some time to respond.
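
As a minimal sketch (assuming an injected HttpClient and a Recipe model), every subscribe call on the same cold Observable triggers its own request:

// Each subscription below issues a separate GET request,
// because the Observable returned by HttpClient is cold.
const recipes$ = this.http.get<Recipe[]>('/api/recipes');

recipes$.subscribe((recipes) => console.log('first subscriber', recipes));  // GET request #1
recipes$.subscribe((recipes) => console.log('second subscriber', recipes)); // GET request #2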

Reducing HTTP requests by caching the result on the client side is one of the most commonly used techniques to optimize web applications. But when should we cache data? When data doesn't change frequently and it is used by more than one component, it makes a lot of sense to cache it and share it in multiple places. The user's profile data is a good example.

In the case of our RecipesApp, the /api/recipes GET request is called every time the RecipesList component is rendered in order to load the list of recipes. In other words, every time the user clicks on the recipe book logo or navigates between HomeComponent and RecipeCreationComponent, a GET request will be issued even if the list has not changed. This is what the Network tab in the Chrome console will look like:

Figure 9.1 – The GET HTTP requests overhead

As you may have noticed, all those outgoing requests are the result of the navigation between HomeComponent and the other components.

We will assume that the list of recipes does not change frequently. In this case, it is useless to hit the server on every component load; it is better to cache the result and read the data from the cache to enhance the performance and the user experience. But what about updates? We certainly won't force the user to reload the entire page just to get the most recent data from the server. Instead, we will configure an update interval to refresh the cache in the background so that the UI gets updated accordingly.

Now, in a real-world application, we wouldn't refresh the data by requesting the server at every interval (a technique called polling). Instead, we would put a server push notification mechanism in place. But in this chapter, in order to understand the caching behavior in RxJS through basic examples, we will not use server push notifications to update the cache, as that is a more advanced technique; we will instead implement two simple types of cache:

  • A static cache for static data that does not refresh
  • A cache with refresh capacity

So, without further ado, let's look in the next section at how we can implement this.

Learning about using the reactive pattern to cache streams

You will be glad to find that RxJS ships with a very useful operator for putting a caching mechanism in place for streams: the shareReplay operator.

The shareReplay operator shares an Observable execution with multiple subscribers. It has an optional parameter, which is bufferSize. Now, bufferSize represents the number of emissions that are cached and replayed for every subscriber. In our case, we only need to replay the most recent value; hence, we need to set the bufferSize parameter to 1.

In a nutshell, the shareReplay operator does the following:

  • Shares an Observable execution with multiple subscribers
  • Offers the possibility to replay a specified number of emissions to the subscribers

Our recipes$ stream defined in RecipesService is initially a cold stream:

export class RecipesService {
  recipes$ = this.http.get<Recipe[]>(`${BASE_PATH}/recipes`);
}

This means that the HTTP request is re-executed for every subscriber, resulting in an overhead of HTTP requests. This is not what we want. We want to share the stream's last emission with all the subscribers; in other words, we want to transform the cold stream into a hot stream using the shareReplay operator as follows:

export class RecipesService {
  recipes$ = this.http.get<Recipe[]>(`${BASE_PATH}/recipes`)
    .pipe(shareReplay(1));
}

By passing 1 as an argument, shareReplay caches the last emission of recipes$. But we can optimize further. The previous code will create a new cache instance for every subscriber. What about sharing a single instance with all the subscribers? That would be much more performant. The following code illustrates the implementation:

export class RecipesService {
  recipes$ = this.getRecipesList();

  getRecipesList(): Observable<Recipe[]> {
    if (!this.recipes$) {
      return this.http.get<Recipe[]>(`${BASE_PATH}/recipes`)
        .pipe(shareReplay(1));
    }
    return this.recipes$;
  }
}

Let's break down what is going on in this code. We created a new method called getRecipesList() that issues the HTTP request only if recipes$ has not been defined yet; otherwise, it returns the existing recipes$ stream. Then, we initialize recipes$ with the result of getRecipesList(). That's all!

Now let's explain the complete workflow:

  1. HomeComponent is initialized for the first time.
  2. HomeComponent triggers the rendering of the child component: RecipesListComponent.
  3. RecipesListComponent subscribes to the recipes$ observable available in RecipesService, which performs the GET HTTP request to retrieve the list of recipes, since this is the first time we ask for the data (see the component sketch after this list).
  4. Then, the cache will be initialized by the data coming back from the server.
  5. The next time the data is requested, it will be retrieved from the cache.
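
As a rough illustration of steps 3 to 5, a consuming component only needs to expose the shared stream to its template. The selector, import path, and template below are assumptions for the sake of the example rather than the exact code of the chapter's repository:

import { Component } from '@angular/core';
import { RecipesService } from '../services/recipes.service';

@Component({
  selector: 'app-recipes-list',
  // The async pipe subscribes to recipes$; thanks to the cache,
  // re-rendering this component replays the cached list instead of
  // issuing a new GET request.
  template: `
    <ul>
      <li *ngFor="let recipe of recipes$ | async">{{ recipe.title }}</li>
    </ul>
  `
})
export class RecipesListComponent {
  recipes$ = this.recipesService.recipes$;

  constructor(private recipesService: RecipesService) {}
}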

Under the hood, the shareReplay operator creates a ReplaySubject instance that will replay the emissions of the source Observable to all future subscribers. After the first subscription, it connects the subject to the source Observable and broadcasts all its values. This is the multicasting concept explained in Chapter 8, Multicasting Essentials.

The next time we request the recipes list, our cache will replay the most recent value and send it to the subscriber, with no additional HTTP call involved. So, when the user leaves the page and comes back later, the values are replayed from the cache rather than fetched from the server again.
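
To make this multicasting behavior more tangible, here is a deliberately simplified sketch of what shareReplay(1) does internally; it ignores reference counting, error handling, and resets:

import { Observable, ReplaySubject } from 'rxjs';

function naiveShareReplay<T>(source: Observable<T>, bufferSize = 1): Observable<T> {
  const subject = new ReplaySubject<T>(bufferSize);
  let connected = false;

  return new Observable<T>((subscriber) => {
    // Connect the subject to the source on the very first subscription only.
    if (!connected) {
      connected = true;
      source.subscribe(subject);
    }
    // Every subscriber receives the replayed buffer plus all future emissions.
    return subject.subscribe(subscriber);
  });
}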

This works perfectly fine when the data does not need to be refreshed at all. But as described in the requirement, we need to refresh the recipes list at a regular interval. If the polling technique is used, then we can update the cache this way:

import { timer } from 'rxjs';
import { switchMap, shareReplay } from 'rxjs/operators';

const REFRESH_INTERVAL = 50000;
const timer$ = timer(0, REFRESH_INTERVAL);

export class RecipesService {
  recipes$ = this.getRecipesList();

  getRecipesList(): Observable<Recipe[]> {
    if (!this.recipes$) {
      return timer$.pipe(
        switchMap(_ => this.http.get<Recipe[]>(`${BASE_PATH}/recipes`)),
        shareReplay(1)
      );
    }
    return this.recipes$;
  }
}

Let's break down what is happening at the level of this code. We created a timer$ observable that emits immediately and then every 50 seconds; this interval is configured in the REFRESH_INTERVAL constant. We used the timer function available in RxJS to create the timer$ observable. For more details about the timer function, please refer to this link: https://rxjs.dev/api/index/function/timer#examples.

Then, for every emission, we use the switchMap operator to map the value to the Observable returned by the HTTP client. This issues an HTTP GET request every 50 seconds and consequently updates the cache. This is a well-known RxJS pattern for executing a task every x seconds.

Note

If we use a push-based solution, then we can intercept the event of receiving messages to update the data.

Now let's see how we can customize the shareReplay operator.

With RxJS 6.4.0, a new signature was introduced that allows you to configure the behavior when the operator's internal reference counter drops to zero. This is the contract of the ShareReplayConfig interface, followed by the new signature of the shareReplay operator:

interface ShareReplayConfig {
  bufferSize?: number;
  windowTime?: number;
  refCount: boolean;
  scheduler?: SchedulerLike;
}

function shareReplay<T>(config: ShareReplayConfig): MonoTypeOperatorFunction<T>;

When refCount is enabled (set to true), the source is unsubscribed from when the reference count drops to zero, and the stream stops emitting. If refCount is disabled (set to false), the source is not unsubscribed from, meaning that the inner ReplaySubject stays subscribed to the source and may potentially run forever.
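
The following small sketch illustrates the difference. With refCount set to true, the timer below stops emitting (and logging) as soon as its only subscriber unsubscribes; switch refCount to false and it keeps running in the background:

import { timer } from 'rxjs';
import { shareReplay, tap } from 'rxjs/operators';

const ticks$ = timer(0, 1000).pipe(
  tap((i) => console.log('source emitted', i)),
  // With refCount: true, the underlying timer is unsubscribed when the
  // last subscriber leaves; with refCount: false, it would keep emitting.
  shareReplay({ bufferSize: 1, refCount: true })
);

const subscription = ticks$.subscribe((i) => console.log('subscriber received', i));

// Drop the only subscriber after three seconds.
setTimeout(() => subscription.unsubscribe(), 3000);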

To prevent surprises, it is highly recommended to use the signature that takes the config parameter this way:

  shareReplay({ bufferSize: 1, refCount: true })

So, the final code of RecipesService will look like this:

import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { BehaviorSubject, Observable, ReplaySubject, timer } from 'rxjs';
import { Recipe } from '../model/recipe.model';
import { environment } from 'src/environments/environment';
import { shareReplay, switchMap } from 'rxjs/operators';

const BASE_PATH = environment.basePath;
const REFRESH_INTERVAL = 50000;
const timer$ = timer(0, REFRESH_INTERVAL);

@Injectable({
  providedIn: 'root'
})
export class RecipesService {
  recipes$ = this.getRecipesList();

  private filterRecipeSubject = new BehaviorSubject<Recipe>({ title: '' });
  filterRecipesAction$ = this.filterRecipeSubject.asObservable();

  constructor(private http: HttpClient) { }

  updateFilter(criteria: Recipe) {
    this.filterRecipeSubject.next(criteria);
  }

  getRecipesList(): Observable<Recipe[]> {
    if (!this.recipes$) {
      return timer$.pipe(
        switchMap(_ => this.http.get<Recipe[]>(`${BASE_PATH}/recipes`)),
        shareReplay({ bufferSize: 1, refCount: true })
      );
    }
    return this.recipes$;
  }

  saveRecipe(formValue: Recipe): Observable<Recipe> {
    return this.http.post<Recipe>(`${BASE_PATH}/recipes/save`, formValue);
  }
}

Simple, right? Now that we have a solid grasp of the behavior of shareReplay and its useful options, let's explore a better way to do it using RxJS 7 in the next section.

Exploring the RxJS 7 recommended pattern to cache streams

A lot of work was done in version 7 to consolidate multicasting operators. The multicast, publish, publishReplay, publishLast, and refCount operators were deprecated and will be removed in RxJS 8.

The only operators remaining are shareReplay, share, and connectable. The share operator rules them all, meaning that it is highly recommended to use share instead of connectable and shareReplay in most cases. The shareReplay operator is too popular to deprecate, but it may be deprecated in future versions because there is now an alternative to it, and because shareReplay, when not used carefully, can cause memory leaks, particularly with infinite streams.

The share operator is enhanced in version 7 with an optional configuration object as an argument, which makes it more flexible and ready to do the job of other operators.

In this share configuration object, you can choose the subject you're connecting through and define your reset behavior. And this is how we can achieve the behavior of the shareReplay operator using the share operator:

getRecipesList(): Observable<Recipe[]> {
  if (!this.recipes$) {
    return this.http.get<Recipe[]>(`${BASE_PATH}/recipes`).pipe(
      share({
        connector: () => new ReplaySubject<Recipe[]>(1),
        resetOnRefCountZero: true,
        resetOnComplete: false,
        resetOnError: true
      })
    );
  }
  return this.recipes$;
}

If you're using version 7 of RxJS, it is highly recommended to call the share operator instead of shareReplay. The preceding code has the same behavior as the shareReplay operator used earlier: the ReplaySubject(1) connector replays the last emission to new subscribers, and resetOnComplete is left as false so that the cached response survives the completion of the HTTP request, just as shareReplay never resets its buffer on completion.

Highlighting the use cases of caching streams

The first use case is optimizing the HTTP requests in order to enhance the performance of our web applications. All that you have to do is put the result in a cache that is a shared place for all the consumers. This is what we did in the previous section.

There is another use case where caching streams makes a lot of sense: when we have expensive side effects on our streams. In general, we call the actions performed after a value is emitted side effects; this could be logging, displaying messages, doing some kind of mapping, and so on. Here's an example of a side effect using the tap operator:

import { from } from 'rxjs';
import { map, tap } from 'rxjs/operators';

const stream$ = from([1, 2, 'Hello', 5]);

stream$
  .pipe(
    tap((value) => console.log(value)),
    map((element) => {
      if (isNaN(element as number)) {
        throw new Error(element + ' is not a number');
      }
      return (element as number) * 2;
    })
  )
  .subscribe({
    next: (message) => console.log(message),
    error: (error) => console.log(error),
    complete: () => console.log('Stream Completed'),
  });

// console output
1
2
2
4
Hello
Error: Hello is not a number

In this code, we perform a transformation for every number emitted: we multiply it by 2 and return the multiplied value. If the value is not a number, an error is thrown. But we need to log the initial value before the transformation; that's why we call the tap operator before the map operator to log the original value. This is a basic example of a side effect, but it could also be handling errors or displaying messages.

Note

For further details about the tap operator, please refer to the official documentation: https://rxjs.dev/api/operators/tap.

In some situations, side effects can perform actions more complex than logging, displaying messages, and handling errors. Some side effects perform computations that are expensive in terms of performance. Unfortunately, those computations will be executed for every subscriber, even though executing them once would be enough, and this will harm the performance of your application.

If you have this use case in your application, then it is highly recommended to use the share operator to cache the result and execute the heavy computation only once, as illustrated in the following sketch.
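
The following is a minimal sketch of this idea; expensiveTransform is a hypothetical placeholder for any costly computation, and the stream is shared through a ReplaySubject so that the computation runs once, no matter how many subscribers consume the result:

import { from, ReplaySubject } from 'rxjs';
import { map, share } from 'rxjs/operators';

// Hypothetical placeholder for an expensive computation.
function expensiveTransform(value: number): number {
  console.log('expensive computation for', value);
  return value * value;
}

const computed$ = from([1, 2, 3]).pipe(
  map(expensiveTransform),
  // Cache the computed values so that late subscribers replay them
  // instead of triggering the computation again.
  share({
    connector: () => new ReplaySubject<number>(),
    resetOnComplete: false,
    resetOnError: false,
    resetOnRefCountZero: false
  })
);

// The computation is logged three times for the first subscriber only;
// the second subscriber receives the cached values.
computed$.subscribe((value) => console.log('first subscriber', value));
computed$.subscribe((value) => console.log('second subscriber', value));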

Summary

In this chapter, we explained the concepts of caching in web applications, including its benefits and use cases. We focused on a concrete example in our recipes app, detailed the requirement, and implemented it in a reactive way. We learned about the behavior of the shareReplay operator and the alternative implementation using the share operator in RxJS 7. We also learned how to implement a static cache and a cache with refresh capacity. Finally, we highlighted the most common use cases of caching.

In the next chapter, we will explore another useful reactive pattern that allows you to share data between your components. As usual, we will demystify the concepts and then learn the reactive way to do it.
