Pulling data through a Mono/Flux and chain of operations

We have wired up a repository to interface with MongoDB through Spring Data. Now we can start hooking it into our ImageService.

The first thing we need to do is inject our repository into the service, like this:

    @Service 
    public class ImageService { 
      ... 
      private final ResourceLoader resourceLoader; 
 
      private final ImageRepository imageRepository; 
 
      public ImageService(ResourceLoader resourceLoader, 
       ImageRepository imageRepository) { 
         this.resourceLoader = resourceLoader; 
         this.imageRepository = imageRepository; 
      } 
      ... 
    } 

In the previous chapter, we loaded Spring's ResourceLoader. In this chapter, we are adding ImageRepository to our constructor.

Previously, we looked up the names of the existing uploaded files, and constructed a Flux of Image objects. That required coming up with a contrived id value.

Now that we have a real data store, we can simply fetch them all, and return them to the client, like this:

    public Flux<Image> findAllImages() { 
      return imageRepository.findAll(); 
    } 

In this last bit of code, we leverage imageRepository to do all the work with its findAll() method. Remember--findAll was defined inside ReactiveCrudRepository. We didn't have to write it ourselves. And since it already gives us a Flux<Image>, there is no need to do anything else.

It's good to remember that the Flux of images being returned is lazy. That means that only the number of images requested by the client is pulled from the database into memory and through the rest of the system at any given time. In essence, the client can ask for one or as many as possible, and the database, thanks to reactive drivers, will comply.

Let's move on to something a little more complex--storing a Flux of images as follows:

    public Mono<Void> createImage(Flux<FilePart> files) { 
      return files 
       .flatMap(file -> { 
         Mono<Image> saveDatabaseImage = imageRepository.save( 
           new Image( 
             UUID.randomUUID().toString(), 
              file.filename())); 
 
             Mono<Void> copyFile = Mono.just( 
               Paths.get(UPLOAD_ROOT, file.filename()) 
                .toFile()) 
                .log("createImage-picktarget") 
                .map(destFile -> { 
                  try { 
                    destFile.createNewFile(); 
                    return destFile; 
                  } catch (IOException e) { 
                      throw new RuntimeException(e); 
                  } 
                }) 
                .log("createImage-newfile") 
                .flatMap(file::transferTo) 
                .log("createImage-copy"); 
 
            return Mono.when(saveDatabaseImage, copyFile); 
       }) 
       .then(); 
    } 

The preceding code can be described as follows:

  • With a Flux of multipart files, flatMap each one into two independent actions: saving the image and copying the file to the server.
  • Using imageRepository, put together a Mono that stores the image in MongoDB, using UUID to create a unique key and the filename.
  • Using FilePart, WebFlux's reactive multipart API, build another Mono that copies the file to the server.
  • To ensure both of these operations are completed, join them together using Mono.when(). This means that each file won't be completed until the record is written to MongoDB and the file is copied to the server.
  • The entire flow is terminated with then() so we can signal when all the files have been processed.
Ever worked with promises? They are quite popular in the JavaScript world. Project Reactor's Mono.when() is akin to the A+ Promise spec's promise.all() API, that waits until all sub-promises are completed before moving forward. Project Reactor can be thought of as promises on steroids with many more operations available. In this case, by stringing several operations together using then(), you can avoid callback hell while ensuring the flow of how things unfold.

On a fundamental level, we need creating an image to involve two things--copying the file's contents to the server, and writing a record of it in MongoDB. That is on par with what we've declared in the code by using Mono.when() to combine two separate actions.

imageRepository.save() is already a reactive operation, so we can capture it straight up as a Mono. Because MultipartFile is, inherently, tied to the blocking servlet paradigm, WebFlux has a new interface, FilePart, meant to handle file uploads reactively. Its transferTo() API returns a Mono<Void> letting us signal when to carry out the transfer.

Is this a transaction? Certainly not an ACID-style one (Atomic, Consistent, Isolated, Durable) traditionally found with relational data stores. Those types of transactions have a long history of not scaling well. When more clients try to alter the same rows of data, traditional transactions block with increasing frequency. And blocking in, and of itself, is not congruent with reactive programming.

However, semantically, perhaps we are engaged in a transaction. After all, we are saying that both of these actions must complete from a Reactive Streams perspective before the given FilePart is considered to be processed in the middle of the Flux. Given the long history of assumptions made regarding transactions, it might be best to leave that term behind, and refer to this as a reactive promise.

While it's possible to inline both the saveDatabaseImage operation and the copyFile operation inside the Mono.when(), they were pulled out as separate variables for readability. The more flows you write, the more you may be tempted to streamline things in a single, chained statement. If you're feeling lucky, go for it!

When it comes to order of processing, which goes first? Saving the document in MongoDB, or storing the file on the server? It's actually not specified in the API. All that is declared is that both of these operations must be completed to move on, and Reactor guarantees that if any asynchronous threading is being used, the framework will handle any and all coordination.

This is why Mono.when() is the perfect construct when two or more tasks need to be completed, and the order doesn't matter. The first time the code is run, perhaps, MongoDB is able to store the record first. It's quite possible that the next time this code is exercised, MongoDB may be slightly delayed due to external factors such as responding to another operation, hence allowing the file to be copied first. And the time after that, other factors may cause the order to swap. But the key point of this construct is to ensure that we use resources with maximum efficiency while still having a consistent result--both are completed before moving on.

Notice how we used flatMap to turn each file into a promise to both copy the file and save a MongoDB record? flatMap is kind of like map and then, but on steroids. map has a signature of map(T → V) : V, while flatMap has flatMap(T → Publisher<V>) : V, meaning, it can unwrap the Mono and produce the contained value. If you're writing a reactive flow that isn't clicking, check if one of your map or then calls needs to be replaced with a flatMap.

If we wanted a certain order to happen, the best construct would be Mono.then(). We can chain multiple then calls together, ensuring that a certain uniform state is achieved at each step before moving forward.

Let's wrap up this section by making adjustments to deleteImage as follows:

    public Mono<Void> deleteImage(String filename) { 
      Mono<Void> deleteDatabaseImage = imageRepository 
       .findByName(filename) 
       .flatMap(imageRepository::delete); 
 
      Mono<Void> deleteFile = Mono.fromRunnable(() -> { 
        try { 
          Files.deleteIfExists( 
            Paths.get(UPLOAD_ROOT, filename)); 
        } catch (IOException e) { 
            throw new RuntimeException(e); 
        } 
      }); 
 
      return Mono.when(deleteDatabaseImage, deleteFile) 
       .then(); 
    } 

The previous code can be explained as follows:

  • First we create a Mono to delete the MongoDB image record. It uses imageRepository to first findByName, and then it uses a Java 8 method handle to invoke imageRepository.delete.
  • Next, we create a Mono using Mono.fromRunnable to delete the file using Files.deleteIfExists. This delays deletion until Mono is invoked.
  • To have both of these operations completed together, we join them with Mono.when().
  • Since we're not interested in the results, we append a then(), which will be completed when the combined Mono is done.

We repeat the same coding pattern as createImage() where we collect operations into multiple Mono definitions, and wrap them with a Mono.when(). This is the promise pattern, and when coding reactively, we'll use it often.

Traditionally, Runnable objects are started in some multithreaded fashion, and are meant to run in the background. In this situation, Reactor is in full control of how it gets started through the use of its scheduler. Reactor is also able to ensure that the reactive streams complete signal is issued when the Runnable object is done with its work.

At the end of the day, that is the whole point of these various operations from Project Reactor. We declare the desired state, and offload all the work scheduling and thread management to the framework. We use a toolkit that is designed from the ground up to support asynchronous, non-blocking operations for maximum resource usage. This gives us a consistent, cohesive way to define expected results while getting maximum efficiency.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.149.234.188