Chapter 11: Bulk Operations

Bulk operations are tasks performed on a large scale, such as uploading many files, deleting many items in one shot, and assigning recipes to the same category on a mass scale. We will start by explaining the requirement and the type of bulk operation that we will consider, and then we will walk you through the different steps to implement the reactive pattern for implementing bulk operations. Finally, we will learn about the reactive pattern for tracking progress.

In this chapter, we're going to cover the following main topics:

  • Defining the requirement
  • Learning about the reactive pattern for bulk operations
  • Learning about the reactive pattern for tracking progress

Technical requirements

This chapter assumes that you have a basic understanding of RxJS.

The source code of this chapter is available at https://github.com/PacktPublishing/Reactive-Patterns-with-RxJS-for-Angular/tree/main/Chapter11.

Defining the requirement

In the UI, the bulk operation is represented by one action or event. In the background, there are two possible behaviors, whether running one network request for all the tasks or running parallel network requests for every task.

In this chapter, we want to allow the user to upload the recipe images in one shot, track the progress of the upload operation, and display this progress to the user:

Figure 11.1 – Upload the recipe's images

Figure 11.1 – Upload the recipe's images

In the RecipeCreation interface, we changed the layout of the ImageUrl field to the File Upload layout available in our library of components, PrimeNG. The File Upload layout allows the user to choose multiple files, clear the selection, and upload the files.

The upload is done on the server, and we have a specific service for the upload that takes the file to be uploaded as input and the identifier of the associated recipe to this file. Since the backend upload API supports only one file at a time, we will be running N network requests in parallel to uploading N files. This is the bulk change use case that we will consider in this chapter.

In the UI, we have one event that will trigger multiple requests at the same time. The following diagram provides a graphical representation of the bulk operation:

Figure 11.2 – A bulk operation visualization

Figure 11.2 – A bulk operation visualization

To sum up, we want to do the following:

  • Allow the user to upload many files after clicking only one time on the upload button.
  • Display the progress of this bulk operation.

Now that we have defined the requirement, let's learn in the next section how we can implement it in a reactive way.

Learning about the reactive pattern for bulk operations

We have to consider as usual our tasks as streams. The task that we are going to perform is uploading the recipe image in the backend. So, let's imagine a stream called uploadRecipeImage$ that will take the file and the recipe identifier as input and perform an HTTP request. If we have N files to be uploaded, then we will create N streams.

We want to subscribe to all those streams together, but we are not interested in the values emitted from each stream through the process; we only care about the final result (the last emission), whether the file is uploaded successfully, or whether something wrong happens and the upload fails.

Is there an RxJS operator that gathers a list of Observables together to get a cumulative result? Yes, thankfully, we have forkJoin. Let's understand the role and behavior of this operator.

The forkJoin operator

The forkJoin operator falls under the category of combination operators. If we look at the official documentation, we find this definition:

Accepts an Array of ObservableInput or a dictionary Object of ObservableInput and returns an Observable that emits either an array of values in the exact same order as the passed array, or a dictionary of values in the same shape as the passed dictionary.

In other words, forkJoin takes a list of streams as input, waits for the streams to complete, and then combines the last values they emitted in one array and returns it. The order of the values in the array is the same as the order of the input Observables.

Let's consider the following marble diagram, as usual, to better understand:

Figure 11.3 – A forkJoin marble diagram

Figure 11.3 – A forkJoin marble diagram

forkJoin has three input Observables (consider the three timelines before the operator box).

The first Observable emitted the a value; forkJoin does not emit anything (look at the last timeline after the operator box, which represents the returned result by forkJoin).

Then, the third Observable emitted 1 and, again, nothing was emitted by forkJoin. Why? Because, as we said in the definition, forkJoin will emit only once when all the Observables are complete.

So, as illustrated in the marble diagram, forkJoin emitted only once at the same time the last Observable (the second one) was completed. The first Observable (represented by the first timeline) was completed first, and the last value emitted was e. Then, the third Observable (represented by the third timeline) was completed, and the last value emitted was 4. At that time, forkJoin did not emit any value because there was still an Observable running.

Finally, the last Observable (represented by the second timeline) was completed, and the last value emitted was j. Therefore, forkJoin returns an array containing the results of each stream in the order of the input Observables (e, j, and 4).

The order of completion is not considered; otherwise, we would have had [e,4,j]. Even though the third Observable was completed before the second one, forkJoin respected the order of the input Observables and returned the j value before the 4 value.

So, keep in mind that forkJoin emits once when all the input Observables are complete and preserves the order of the input Observables.

This fits our requirements so well! forkJoin is best used when you have a list of Observables and only care about the final emitted value of each. In our case, we will issue multiple upload requests, and we only want to take action when a response is received from all the input streams.

Let's see the pattern in action.

The pattern in action

First, we need to create UploadRecipesPreviewService under src/app/core/services, which is responsible for uploading the files. Here is the code of the service:

import { HttpClient } from '@angular/common/http';

import { Injectable } from '@angular/core';

import { Observable } from 'rxjs';

import { environment } from 'src/environments/environment';

import { UploadStatus } from '../model/upload.status.model';

const BASE_PATH = environment.basePath

@Injectable({

  providedIn: 'root'

})

export class UploadRecipesPreviewService {

  constructor(private http: HttpClient) { }

  /**

   * Uploads the file

   * @param code

   * @param fileToUpload

   * @returns

   */

  upload(code: string, fileToUpload?: File):   Observable<UploadStatus> {

    const formData = new FormData()

    formData.append('fileToUpload', fileToUpload as File)

    return this.http.post<UploadStatus>(

      `${BASE_PATH}/recipes/upload/${code}`,

      formData

    )

  }

}

The upload method issues the upload HTTP request and returns the upload status. It takes as input two parameters:

  • code: The identifier of the recipe
  • fileToUpload: The file to be uploaded

Then, we need to implement the behavior of the Upload button. For this purpose, in the RecipeCreationComponent template, we need to specify the method that will be called when clicking on the Upload button. We named the onUpload method in our case and put it as a value to the callback (uploadHandler), provided by the component library we are using.

Here's the HTML template snippet:

     <div class="form-row">

      <div class="col-12">

        <label for="ImageUrl">ImageUrl</label>

        <p-fileUpload name="imageUrl" [multiple]=true

         [customUpload]="true" (uploadHandler)=

         "onUpload($event?.files)">

        </p-fileUpload>

      </div>

    </div>

The link to the full code of the template is mentioned in the Technical requirements section.

The following step is implementing the onUpload(event: any) method that is called when clicking on the Upload button.

We will create BehaviorSubject, which will always emit the last value of the uploaded files called uploadedFilesSubject$. We initialize it with an empty array:

  uploadedFilesSubject$ = new BehaviorSubject<File[]>([]);  

In the onUpload method, we send the last array of the uploaded files to uploadedFilesSubject$, as follows:

onUpload(files: File[]) {

    this.uploadedFilesSubject$.next(files);

  }

Now, we should create the stream responsible for doing the bulk upload that we called uploadRecipeImages$, as follows:

  uploadRecipeImages$ = this.uploadedFilesSubject$.pipe(

    switchMap(uploadedFiles=>forkJoin(uploadedFiles.map((

     file: File) =>

    this.uploadService.upload(this.recipeForm.value.id,

     file))))

  )

Let's break down what's going on at the level of this code, piece by piece.

Every time we click on the Upload button, uploadedFilesSubject$ will emit the files to be uploaded. We need to subscribe to uploadedFilesSubject$ and use the transformation technique using switchMap, which we learned about in Chapter 7, Transforming Streams.

We use switchMap to transform every value emitted by uploadedFilesSubject$ to the Observable that we will build using forkJoin. The forkJoin operator should take the array of streams that will be combined as input. That's what we did inside forkJoin.

We map every file in the uploadedFiles array to the stream responsible for uploading the file, and the stream responsible for uploading the file is nothing but the following instruction:

    this.uploadService.upload(this.recipeForm.value.id,

     file))))

This is simply the call to the upload method available in UploadRecipesPreviewService that takes id (which we retrieved from recipeForm) and the file as input.

So, we will have an array of Observables that we will pass as input to forkJoin, and that's it!

Finally, we need to inject UploadRecipesPreviewService in the constructor and subscribe to uploadRecipeImages$ in the template, as follows:

<ng-container *ngIf="uploadRecipeImages$ | async"></ng-

  container>

Now, let's suppose one of the inner streams errors out. The forkJoin operator will no longer emit any values for us. This is another important thing to be aware of when using this operator. You will lose the value of any other stream that would have already been completed if you do not catch the error correctly on the inner Observable. Therefore, catching the error in this case is crucial!

This is how we handle it:

  uploadRecipeImages$ = this.uploadedFilesSubject$.pipe(

    switchMap(uploadedFiles=>forkJoin(uploadedFiles.map((

     file: File) =>

    this.uploadService.upload(this.recipeForm.value.id,

     file).pipe(

      catchError(errors => of(errors)),

    ))))

We called catchError on the inner stream returned by the upload method. Then, we wrapped the error inside another Observable and returned it. This way, the forkJoin stream will stay alive.

If you are only interested in all inner Observables completing successfully, then you can catch the error on the outside.

It makes a lot of sense to catch the errors in order to display something significant to the user – for example, in our case, if one of the uploads fails because maxFileSize was reached or the extension of the image is not allowed, then the system should display those exceptions to the user to help them fix the file.

To sum up, forkJoin has the following benefits:

  • It is very useful when you are interested in combining results and getting a value only once.
  • It only emits once when all the Observables complete.
  • It preserves the order of the input Observables in the emission.
  • It will complete when one of the streams errors out, so, you should handle the error.

So far, so good. This works nicely, but what if we need to know some information during the process, such as how many files were already uploaded? What is the progress of the operation? How much time do we still need to wait? With the current forkJoin implementation, it is not possible, but let's see in the next section how we can do it.

Learning about the reactive pattern for tracking progress

In order to track the progress of the bulk upload, we can call a very useful operator called finalize. The finalize operator allows you to call a function when the Observable completes or errors out.

The idea is to call the finalize operator and execute a function that will calculate the progress. This way, every time an Observable completes, the progress will get updated. This is what the code will look like:

  counter: number = 0;

  uploadProgress: number=0;  

uploadRecipeImages$ = this.uploadedFilesSubject$.pipe(

    switchMap(uploadedFiles =>

     forkJoin(uploadedFiles.map((file: File) =>

      this.uploadService.upload(this.recipeForm.value.id,

       file).pipe(

        catchError(errors => of(errors)),

        finalize(() => this.calculateProgressPercentage(

         ++this.counter, uploadedFiles.length))

      ))))

  )

  private calculateProgressPercentage(completedRequests:

   number, totalRequests: number) {

    this.uploadProgress =

     (completedRequests/totalRequests)*100;

  }

The finalize operator calls the calculateProgressPercentage private function, which takes the following parameters:

  • The number of completed requests: We just declare a property counter that we will increment every time the Observable completes.
  • The total number of requests: This number is retrieved from the array of uploadedFiles.

Inside the calculateProgressPercentage function, we just perform a simple computation and store the result in an uploadProgress property.

Then, you can map the value of this property to any ProgressBar component in the UI. This way, you will be able to display the progress to the user.

Summary

In this chapter, we explained the concept of bulk operation and learned how to implement a real-world example of a bulk task in a reactive way. We learned the behavior and a use case of the forkJoin operator and went through the different steps to implement a bulk upload. Finally, we went through the tracking progress functionality and how we can implement it using a reactive technique.

In the next chapter, we will explore the pattern of real-time updates and the different techniques available in RxJS to implement them at the lowest cost.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.191.208.181