Wrapping a synchronous CrudRepository

Sometimes we may already have a CrudRepository instance with all the required mechanics for data access (no manual queries or entity mappings needed). However, we cannot directly use this in a reactive application. In this case, it is easy to write our own reactive adapter that would behave similarly to the rxjava2-jdbc library but at the repository level. Be cautious with JPA when applying this approach. We can quickly run into proxy issues when using lazy loading. So, let's assume that we have the following Book entity defined by JPA:

@Entity
@Table(name = "book")
public class Book {
   @Id
   @GeneratedValue(strategy = GenerationType.IDENTITY)
   private Integer id;
   private String title;
   private Integer publishingYear;
   // Constructors, getters, setters...
}

Also, we have the following Spring Data JPA repository:

@Repository
public interface BookJpaRepository
   extends CrudRepository<Book, Integer> {                           // (1)

   Iterable<Book> findByIdBetween(int lower, int upper);             // (2)

   @Query("SELECT b FROM Book b WHERE " +
      "LENGTH(b.title)=(SELECT MIN(LENGTH(b2.title)) FROM Book b2)") // (3)
   Iterable<Book> findShortestTitle();
}

The BookJpaRepository has the following characteristics:

  1. It extends the CrudRepository interface and inherits all the methods for data access.
  2. The BookJpaRepository defines a method that generates a query based on the naming convention.
  3. The BookJpaRepository defines a method with a custom SQL.

The BookJpaRepository interface works fine with blocking JPA infrastructure. All the methods of the BookJpaRepository repository return non-reactive types. To wrap the BookJpaRepository interface into a reactive API and receive most of its capabilities, we may define an abstract adapter and extend it with additional methods to map the findByIdBetween and findShortestTitle method. The abstract adapter may be reused for adapting any CrudRepository instance. The adapter may look as follows:

public abstract class
   ReactiveCrudRepositoryAdapter
<T, ID, I extends CrudRepository<T, ID>> // (1) implements ReactiveCrudRepository<T, ID> { // protected final I delegate; // (2) protected final Scheduler scheduler; // // Constructor... @Override public <S extends T> Mono<S> save(S entity) { // (3) return Mono // .fromCallable(() -> delegate.save(entity)) // (3.1) .subscribeOn(scheduler); // (3.2) } @Override public Mono<T> findById(Publisher<ID> id) { // (4) return Mono.from(id) // (4.1) .flatMap(actualId -> // (4.2) delegate.findById(actualId) // (4.3) .map(Mono::just) // (4.4) .orElseGet(Mono::empty)) // (4.5)
.subscribeOn(scheduler); // (4.6) } @Override public Mono<Void> deleteAll(Publisher<? extends T> entities) { // (5) return Flux.from(entities) // (5.1) .flatMap(entity -> Mono // .fromRunnable(() -> delegate.delete(entity)) // (5.2) .subscribeOn(scheduler)) // (5.3) .then(); // (5.4) } // All other methods of ReactiveCrudRepository... }

Let's describe the preceding code:

  1. ReactiveCrudRepositoryAdapter is an abstract class that implements the ReactiveCrudRepository interface and has the same generic types as the delegate repository.
  2. ReactiveCrudRepositoryAdapter uses the underlying delegate of the CrudRepository type. Furthermore, the adapter requires the Scheduler instance to offload requests from the event loop. The parallelism of the scheduler defines the number of concurrent requests, so it is natural to use the same number as we use for connection pool configuration. However, the best mapping is not always one to one. If the connection pool is used for other purposes, the number of available connections may be less than available threads, and some threads may be blocked while waiting for a connection (rxjava2-jdbc handles such a scenario better).
  3. Here is a reactive wrapper method for the blocking save method. The blocking call is wrapped into the Mono.fromCallable operator (3.1) and offloaded to the dedicated scheduler (3.2).
  4. Here is a reactive adapter for the findById method. At first, the method subscribes to the id stream (4.1). If the value arrives (4.2), the delegate instance is called (4.3). The CrudRepository.findById method returns Optional, so it is required to map the value to a Mono instance (4.4). In case of receiving an empty Optional, return empty Mono (4.5). Of course, the execution is offloaded to the dedicated scheduler.
  1. Here is a reactive adapter for the deleteAll method. As the deleteAll(Publisher<T> entities) and deleteAll(Iterator<T> entities) methods have different semantics, we cannot map one reactive call directly into one blocking call. For example, the stream of entities is endless, and consequently no items are ever deleted. So, the deleteAll method subscribes to entities (5.1) and issues a separate delegate.delete(T entity) request (5.2) for each of them. As a delete request may run in parallel, each request has its own subscribeOn call to receive a worker from the scheduler (5.3). The deleteAll method returns an output stream that completes when the incoming stream is terminated and all the delete operations are completed. All methods of the ReactiveCrudRepository interface should be mapped in this way.

Now, let's define the missing custom methods in the concrete reactive repository implementation:

public class RxBookRepository extends
   ReactiveCrudRepositoryAdapter<Book, Integer, BookJpaRepository> {

   public RxBookRepository(
      BookJpaRepository delegate,
      Scheduler scheduler
   ) {
      super(delegate, scheduler);
   }
   public Flux<Book> findByIdBetween(                              // (1)
      Publisher<Integer> lowerPublisher,                           //
      Publisher<Integer> upperPublisher                            //
   ) {                                                             //
      return Mono.zip(                                             // (1.1)
         Mono.from(lowerPublisher),                                //
         Mono.from(upperPublisher)                                 //
      ).flatMapMany(                                               //
         function((low, upp) ->                                    // (1.2)
            Flux                                                   //
               .fromIterable(delegate.findByIdBetween(low, upp))   // (1.3)
               .subscribeOn(scheduler)                             // (1.4)
         ))                                                        //
         .subscribeOn(scheduler);                                  // (1.5)
   }

   public Flux<Book> findShortestTitle() {                         // (2)
      return Mono.fromCallable(delegate::findShortestTitle)        // (2.1)
         .subscribeOn(scheduler)                                   // (2.2)
         .flatMapMany(Flux::fromIterable);                         // (2.3)
   }
}

The RxBookRepository class extends the abstract ReactiveCrudRepositoryAdapter class, references the BookJpaRepository and Scheduler instances, and defines the following methods:

  1. The findByIdBetween method receives two Reactive Streams and subscribes to them with the zip operation (1.1). When both values are ready (1.2), the corresponding method is called on the delegate instance (1.3) and the blocking execution is offloaded to the dedicated scheduler. However, it is also possible to offload the resolution of the lowerPublisher and upperPublisher stream so that the event loop would not spend resources there (1.5). Be careful with such an approach because it may fight for resources with actual database requests and reduce the throughput.
  2. The findShortestTitle method calls the corresponding method (2.1) on the dedicated scheduler (2.2) and maps Iterable to Flux (2.3).

Now, we may finally wrap blocking BookJpaRepository into reactive RxBookRepository with the following code:

Scheduler scheduler = Schedulers.newParallel("JPA", 10);
BookJpaRepository jpaRepository = getBlockingRepository(...);

RxBookRepository rxRepository =
   new RxBookRepository(jpaRepository, scheduler);

Flux<Book> books = rxRepository
   .findByIdBetween(Mono.just(17), Mono.just(22));

books
   .subscribe(b -> log.info("Book: {}", b));

Not all blocking features may be mapped so easily. For example, JPA lazy loading would most likely be broken with the described approach. Also, the support of transactions would require an additional effort similar to that in the rxjava2-jdbc library. Alternatively, we would need to wrap synchronous operations at the granularity where no transaction expands beyond one blocking call.

The approach described here does not magically transform the blocking request into a reactive non-blocking execution. Some threads that form the JPA scheduler will still be blocked. However, detailed monitoring of the scheduler and wise pool management should help to create an acceptable balance between the application's performance and resource usage.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.218.78.102