Using asynchronous drivers (Cassandra)

We have described how the reactive Mongo repository is built on top of the reactive driver. Now, let's look at how the reactive Cassandra repository adapts the asynchronous driver.

Similarly to ReactiveMongoRepository, the reactive Cassandra connector gives us the ReactiveCassandraRepository interface, which also extends the more generic ReactiveCrudRepository. The ReactiveCassandraRepository interface is implemented by SimpleReactiveCassandraRepository, which, in turn, uses the ReactiveCassandraOperations interface for low-level operations. ReactiveCassandraOperations is implemented by the ReactiveCassandraTemplate class. Of course, ReactiveCassandraTemplate may be used directly in the application, just like ReactiveMongoTemplate.

The ReactiveCassandraTemplate class internally uses ReactiveCqlOperations. ReactiveCassandraTemplate operates with Spring Data entities such as org.springframework.data.cassandra.core.query.Query, while ReactiveCqlOperations operates with CQL statements (represented by String) recognizable by the Cassandra driver. The ReactiveCqlOperations interface is implemented by the ReactiveCqlTemplate class. In turn, ReactiveCqlTemplate uses the ReactiveSession interface for actual database querying. ReactiveSession is implemented by the DefaultBridgedReactiveSession class, which bridges asynchronous Session methods, provided by the driver, to reactive execution patterns.

Let's go deeper and look at how the DefaultBridgedReactiveSession class adapts the asynchronous API into the reactive API. The execute method receives a Statement (for example, a SELECT statement) and reactively returns results. The execute method and its adaptFuture helper method look as follows:

public Mono<ReactiveResultSet> execute(Statement statement) {      // (1)
  return Mono.create(sink -> {                                     // (2)
    try {
      ListenableFuture<ResultSet> future = this.session            // (3)
          .executeAsync(statement);
      ListenableFuture<ReactiveResultSet> resultSetFuture =
          Futures.transform(                                       // (4)
            future, DefaultReactiveResultSet::new);
      adaptFuture(resultSetFuture, sink);                          // (5)
    } catch (Exception cause) {
      sink.error(cause);                                           // (6)
    }
  });
}

<T> void adaptFuture(                                              // (7)
      ListenableFuture<T> future, MonoSink<T> sink
) {
  future.addListener(() -> {                                       // (7.1)
    if (future.isDone()) {
      try {
        sink.success(future.get());                                // (7.2)
      } catch (Exception cause) {
        sink.error(cause);                                         // (7.3)
      }
    }
  }, Runnable::run);
}

Let's go through the preceding code:

  1. First of all, the execute method does not return a Flux with results, but rather a Mono with an instance of ReactiveResultSet. ReactiveResultSet wraps the asynchronous com.datastax.driver.core.ResultSet, which supports pagination: the first page of the result is fetched when a ResultSet instance is returned, and the next page is fetched only after all the results of the first one have been consumed. ReactiveResultSet adapts that behavior with the following method signature: Flux<Row> rows().
  2. We create a new Mono instance with the create method, which defers operations to the moment of subscription.
  3. This is an asynchronous query execution on the driver's asynchronous Session instance. Note that the Cassandra driver uses Guava's ListenableFuture for returning results.
  4. The asynchronous ResultSet is wrapped into a reactive counterpart called ReactiveResultSet.
  5. Here, we're calling the adaptFuture helper method, which maps ListenableFuture to Mono.
  6. If there are any errors, we have to inform our reactive subscriber.
  7. The adaptFuture method simply adds a new listener to the future (7.1), so when a result appears, it emits that result as a reactive onNext signal and completes the Mono (7.2). It also informs the subscriber about execution errors, if any (7.3).
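
The adaptation steps above can be approximated with JDK types alone, which may help to see the pattern without Reactor or Guava on the classpath. The following is a minimal sketch, not the Spring Data implementation: FutureBridge and fromFuture are hypothetical names, CompletableFuture stands in for Guava's ListenableFuture, and java.util.concurrent.Flow stands in for Mono. As a simplification, the asynchronous call starts on the first request(n) rather than at subscription time.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Hypothetical JDK-only analogue of Mono.create(...) combined with adaptFuture(...). */
final class FutureBridge {
    private FutureBridge() { }

    /** Returns a single-value publisher; nothing runs until a subscriber requests data. */
    static <T> Flow.Publisher<T> fromFuture(Supplier<CompletableFuture<T>> asyncCall) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private final AtomicBoolean started = new AtomicBoolean();
            private volatile boolean cancelled;

            @Override
            public void request(long n) {
                // Only the first request triggers the call; later requests are no-ops.
                if (cancelled || !started.compareAndSet(false, true)) {
                    return;
                }
                if (n <= 0) {
                    subscriber.onError(new IllegalArgumentException("demand must be positive"));
                    return;
                }
                CompletableFuture<T> future;
                try {
                    future = asyncCall.get();          // plays the role of session.executeAsync(statement)
                } catch (Exception cause) {
                    subscriber.onError(cause);         // synchronous failure, as in step (6)
                    return;
                }
                future.whenComplete((value, cause) -> { // listener, as in step (7.1)
                    if (cancelled) {
                        return;
                    }
                    if (cause != null) {
                        subscriber.onError(cause);     // as in step (7.3)
                    } else {
                        subscriber.onNext(value);      // as in step (7.2): value, then completion
                        subscriber.onComplete();
                    }
                });
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }
}
```

Calling FutureBridge.fromFuture(() -> someAsyncCall()) does not invoke the supplier by itself; the call happens only once a subscriber is attached and signals demand, which mirrors the deferred behavior described in step 2.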

It is important to note that the multi-page ResultSet allows calling the fetchMoreResults method to get the subsequent page of data asynchronously. ReactiveResultSet does that internally inside the Flux<Row> rows() method. Even though this approach works, it is considered an intermediate solution until Cassandra receives an entirely reactive driver.
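
To make the page-by-page behavior more concrete, here is a rough JDK-only sketch of consuming a paged result where the next page is requested only after the current one has been fully consumed. It is an illustration under stated assumptions, not the code of ReactiveResultSet: Page, PagedRows, forEachRow, and inMemory are hypothetical names, CompletableFuture replaces the driver's ListenableFuture, and rows are pushed to a callback instead of being exposed as a demand-driven Flux<Row>.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Hypothetical stand-in for a paged driver result; this is not the DataStax API. */
interface Page<T> {
    List<T> rows();                          // rows already fetched for this page
    boolean hasNext();                       // whether another page exists
    CompletableFuture<Page<T>> fetchNext();  // asynchronous fetch of the following page
}

final class PagedRows {
    private PagedRows() { }

    /** Delivers every row in order; page N+1 is requested only after page N is consumed. */
    static <T> CompletableFuture<Void> forEachRow(Page<T> page, Consumer<? super T> onRow) {
        page.rows().forEach(onRow);
        if (!page.hasNext()) {
            return CompletableFuture.completedFuture(null);
        }
        // Chain the next fetch asynchronously instead of blocking for it.
        return page.fetchNext().thenCompose(next -> forEachRow(next, onRow));
    }

    /** In-memory page source for demonstration: serves 'all' in pages of 'size' rows (size > 0). */
    static <T> Page<T> inMemory(List<T> all, int from, int size) {
        int to = Math.min(from + size, all.size());
        return new Page<T>() {
            @Override
            public List<T> rows() {
                return all.subList(from, to);
            }

            @Override
            public boolean hasNext() {
                return to < all.size();
            }

            @Override
            public CompletableFuture<Page<T>> fetchNext() {
                // Simulates an asynchronous round trip to the database.
                return CompletableFuture.supplyAsync(() -> inMemory(all, to, size));
            }
        };
    }
}
```

For example, PagedRows.forEachRow(PagedRows.inMemory(List.of(1, 2, 3, 4, 5), 0, 2), System.out::println).join() walks three pages and hands the five rows to the callback in order. A real reactive adapter would additionally respect subscriber demand and cancellation, which this sketch leaves out.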

The following diagram shows the internal architecture of the reactive Spring Data Cassandra module:

Diagram 7.17 Reactive Cassandra stack with Spring Data