Using MongoDB reactive repository

To use the reactive data access for MongoDB instead of its synchronous counterpart, we have to add the following dependency to our Gradle project:

compile 'org.springframework.boot:spring-boot-starter-data-mongodb-reactive'

Let's say we want to refactor our simple MongoDB application from the previous section to become reactive. In this case, we may leave the Book entity as is, without making any modifications. All the annotations associated with MongoDB Object-Document Mapping are the same for both synchronous and reactive MongoDB modules. However, in the repository, we now have to replace ordinary types with reactive types:

public interface ReactiveSpringDataMongoBookRepository
   extends ReactiveMongoRepository<Book, Integer> {                  // (1)

   @Meta(maxScanDocuments = 3)                                       // (2)
   Flux<Book> findByAuthorsOrderByPublishingYearDesc(                // (3)
       Mono<String> authors
   );

   @Query("{ 'authors.1': { $exists: true } }")                      // (4)
   Flux<Book> booksWithFewAuthors();
}

So, our repository now extends the ReactiveMongoRepository interface (1) instead of MongoRepository. In turn, ReactiveMongoRepository extends the ReactiveCrudRepository interface, the common interface for all reactive connectors.

While there is no RxJava2MongoRepository, we can still use all reactive Spring Data repositories with RxJava 2 by extending RxJava2CrudRepository. Spring Data handles the adaptation of Project Reactor types to RxJava 2 types and vice versa, so as to provide a native RxJava 2 experience.

The ReactiveCrudRepository interface is the reactive equivalent of the CrudRepository interface from synchronous Spring Data. Reactive Spring Data repositories use the same annotations and support the majority of the features available in the synchronous ones. So, the reactive Mongo repository supports queries by the method name convention (3), the @Query annotation with hand-written MongoDB queries (4), and the @Meta annotation with some additional query-tuning abilities (2). It also supports constructs for running Query by Example (QBE) requests. However, in contrast to the synchronous MongoRepository, ReactiveMongoRepository extends the ReactiveSortingRepository interface, which provides the ability to request a specific order of results but does not provide pagination support. The question of data pagination is covered in the Pagination support section.

As usual, we may inject a bean of the ReactiveSpringDataMongoBookRepository type into our application, and Spring Data provides the desired bean. The following code shows how to insert a few books into MongoDB using a reactive repository:

@Autowired
private ReactiveSpringDataMongoBookRepository rxBookRepository;      // (1)
...
Flux<Book> books = Flux.just(                                        // (2)
   new Book("The Martian", 2011, "Andy Weir"),
   new Book("Blue Mars", 1996, "Kim Stanley Robinson")
);

rxBookRepository
   .saveAll(books)                                                   // (3)
   .then()                                                           // (4)
   .doOnSuccess(ignore -> log.info("Books saved in DB"))             // (5)
   .subscribe();                                                     // (6)

Let's understand what the preceding code is doing here:

  1. Injecting a bean with the ReactiveSpringDataMongoBookRepository interface.
  2. Preparing a reactive stream of Book entities that have to be inserted into the database.
  3. Saving entities using the saveAll method that consumes a Publisher<Book>. As usual, no saving happens until an actual subscriber subscribes. ReactiveCrudRepository also has a saveAll overload that consumes the Iterable interface. These two methods have different semantics, but we are going to cover this topic later.
  4. The saveAll method returns a Flux<Book> with saved entities, but as we are not interested in that level of detail, we use the then method to transform the stream so that only onComplete or onError events are propagated.
  5. We report a corresponding log message when the reactive stream completes and all books are saved.
  6. As always, with a reactive stream, there should be a subscriber. Here, for the sake of simplicity, we subscribe without any handlers. However, in a real application, there should be real subscribers, such as a subscription from a WebFlux exchange that processes a response.
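The point made in step 3, that nothing is saved until a subscriber subscribes, can be sketched without Spring Data at all. The following is a minimal illustration that uses only the JDK's java.util.concurrent.Flow types so that it runs without Reactor or MongoDB on the classpath. The LazySaveSketch class, its STORE list, and its saveAll and unboundedSubscriber methods are hypothetical stand-ins invented for this sketch, not the repository API. It also assumes that the subscriber requests unbounded demand, so it ignores the backpressure rules that a real publisher has to honor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class LazySaveSketch {

    // Hypothetical in-memory stand-in for the MongoDB collection.
    static final List<String> STORE = new ArrayList<>();

    // Returns a cold publisher: calling this method writes nothing.
    // Titles reach STORE only once a subscriber requests data.
    static Flow.Publisher<String> saveAll(List<String> titles) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                // Simplification: the requested amount n is ignored.
                if (done) {
                    return;
                }
                done = true;
                for (String title : titles) {
                    STORE.add(title);
                    subscriber.onNext(title);
                }
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // A subscriber that requests everything and runs a callback on completion.
    static <T> Flow.Subscriber<T> unboundedSubscriber(Runnable onDone) {
        return new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(T item) {
                // Saved items are not needed here, similar to calling then().
            }

            @Override
            public void onError(Throwable t) {
                throw new IllegalStateException(t);
            }

            @Override
            public void onComplete() {
                onDone.run();
            }
        };
    }

    public static void main(String[] args) {
        Flow.Publisher<String> pending =
            saveAll(List.of("The Martian", "Blue Mars"));
        System.out.println("Before subscribe: " + STORE.size()); // 0
        pending.subscribe(
            unboundedSubscriber(() -> System.out.println("Books saved")));
        System.out.println("After subscribe: " + STORE.size());  // 2
    }
}
```

Building the publisher only describes the work. The write happens as a consequence of the subscription, which is why the earlier repository example ends with subscribe().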

Now, let's query MongoDB using Reactive Streams. To print query results that flow through a reactive stream, we can use the following convenient helper method:

private void reportResults(String message, Flux<Book> books) {     // (1)
   books
      .map(Book::toString)                                         // (2)
      .reduce(                                                     // (3)
         new StringBuilder(),                                      // (3.1)
         (sb, b) -> sb.append(" - ")                               // (3.2)
            .append(b)
            .append("\n"))
      .doOnNext(sb -> log.info(message + "\n{}", sb))              // (4)
      .subscribe();                                                // (5)
}

Let's understand what the preceding code is doing here:

  1. This is a method that prints a human-readable list of books as one log message with the desired message prefix.
  2. For each book in the stream, it calls its toString method and propagates its string representation.
  3. The Flux.reduce method is used to collect all book representations into one message. Note that this approach may not work if the number of books is large, as each new book increases the size of the stored buffer and may cause high memory consumption. To store intermediate results, we use the StringBuilder class (3.1). Keep in mind that StringBuilder is not thread-safe and the onNext method may be called from different threads, but the Reactive Streams Specification guarantees the happens-before relation. So, even if different threads push different entities, it is safe to concatenate them with StringBuilder, as memory barriers guarantee that the latest state of the StringBuilder object is visible while it is updated inside one reactive stream. At point (3.2), a book representation is appended to the single buffer.
  4. As the reduce method emits its onNext event only after processing all incoming onNext events, we are safe to log the final message with all books.
  5. To start processing, we have to subscribe. For the sake of simplicity, we assume that no errors are possible here. However, in production code, there should be some logic for handling errors.
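The accumulate-then-emit-once behavior described in steps 3 and 4 can be reproduced with the JDK alone. The sketch below is not Reactor's Flux.reduce. It is a hand-written subscriber, built on java.util.concurrent.Flow and SubmissionPublisher, that appends each item to one StringBuilder and publishes the result only when onComplete arrives. The ReduceSketch class and its joinTitles method are hypothetical names chosen for illustration. SubmissionPublisher delivers signals through an executor, so onNext may run on pool threads, yet the calls to a given subscriber are made one at a time and in order, which is the property that makes the unsynchronized StringBuilder safe here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class ReduceSketch {

    // Collects every title into one buffer and completes the future
    // with the joined text only after the stream has completed.
    static CompletableFuture<String> joinTitles(Flow.Publisher<String> titles) {
        CompletableFuture<String> result = new CompletableFuture<>();
        titles.subscribe(new Flow.Subscriber<String>() {
            // Not thread-safe, but signals to one subscriber are serialized.
            private final StringBuilder sb = new StringBuilder();

            @Override
            public void onSubscribe(Flow.Subscription s) {
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(String title) {
                sb.append(" - ").append(title).append("\n");
            }

            @Override
            public void onError(Throwable t) {
                result.completeExceptionally(t);
            }

            @Override
            public void onComplete() {
                result.complete(sb.toString());
            }
        });
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> joined;
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            joined = joinTitles(publisher);
            publisher.submit("The Martian");
            publisher.submit("Blue Mars");
        } // close() signals onComplete after the buffered items are delivered
        System.out.print("All books:\n" + joined.join());
    }
}
```

As with the reduce-based helper, the whole result is held in memory until completion, so the same caveat about large result sets applies.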

Now, let's read and report all the books in the database:

Flux<Book> allBooks = rxBookRepository.findAll();
reportResults("All books in DB:", allBooks);

The following code searches for all books by Andy Weir using the method naming convention:

Flux<Book> andyWeirBooks = rxBookRepository
   .findByAuthorsOrderByPublishingYearDesc(Mono.just("Andy Weir"));
reportResults("All books by Andy Weir:", andyWeirBooks);

Note that the preceding code passes the search criteria using the Mono<String> type and starts the actual database query only when that Mono produces an onNext event. So, the reactive repository becomes a natural part of a reactive stream, where both incoming and outgoing streams are reactive.
