Reactive transactions with MongoDB 4

Starting with version 4.0, MongoDB supports multi-document transactions. This allows us to experiment with reactive transactions in the new version of MongoDB. Previously, Spring Data only had reactive connectors to databases that did not support transactions. Now, the situation has changed. As reactive transactions are a novelty in the reactive persistence area, which itself is pretty new, all the following claims and code examples should be treated as some of the possibilities of how reactive transactions could be implemented in the future. At the time of writing, Spring Data does not have any features that apply reactive transactions at the service or repository levels. However, we may operate with transactions at the level of ReactiveMongoOperations implemented by ReactiveMongoTemplate.

First of all, the multi-document transaction is a new feature for MongoDB. It works only for a non-sharded replica set with the WiredTiger storage engine. No other configuration supports multi-document transactions as of MongoDB 4.0.

Also, some MongoDB features are not available inside a transaction. It is not allowed to issue meta commands, or to create collections or indices. The implicit creation of collections does not work inside a transaction either, so all the required database structures have to be created in advance to prevent errors. Besides, some commands may behave differently, so please check the documentation regarding multi-document transactions.

Previously, MongoDB allowed atomic updates only for one document, even if that document contained embedded documents. With multi-document transactions, it is possible to get all-or-nothing semantics across many operations, documents, and collections. Multi-document transactions guarantee a globally consistent view of data. When a transaction is committed, all the changes made within the transaction are saved. However, when any action within the transaction fails, the whole transaction aborts and all the changes are discarded. Also, no data updates are visible outside the transaction until the transaction is committed.

Now, let's demonstrate that reactive transactions may be used for storing documents to MongoDB. For that purpose, we may use a well-known classical example. Let's say we have to implement a wallet service that transfers money between the users' accounts. Each user has their own account with a non-negative balance. A user may transfer any arbitrary sum to another user only when they have enough funds. Transfers may happen in parallel, but the system is not allowed to lose or gain any money while doing transfers. Consequently, a withdraw operation for the sender's wallet and a deposit operation for the receiver's wallet have to happen simultaneously and atomically. Multi-document transactions should help here.

Without transactions we may face the following issues:

  • A client makes a few transfers at the same time with more funds than they have on their account. Hence, there is a chance that simultaneous transfers may impact the consistency of the system and illegally create money.
  • A user receives a few deposits simultaneously. Some updates may rewrite the wallet state, and the user may lose money forever.
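The second issue, a lost update, can be reproduced deterministically without any database. The following is a minimal sketch in plain Java. The LostUpdateDemo class, the in-memory map, and the owner name are illustrative assumptions and are not part of the wallet service described in this section:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the wallet collection (illustration only).
final class LostUpdateDemo {

   // Simulates two deposits that both read the balance before either one writes it back.
   static int interleavedDeposits(int initial, int firstDeposit, int secondDeposit) {
      Map<String, Integer> store = new HashMap<>();
      store.put("alice", initial);

      int readByFirst = store.get("alice");              // deposit 1 loads the wallet
      int readBySecond = store.get("alice");             // deposit 2 loads the same state

      store.put("alice", readByFirst + firstDeposit);    // deposit 1 saves its result
      store.put("alice", readBySecond + secondDeposit);  // deposit 2 overwrites deposit 1

      return store.get("alice");
   }

   public static void main(String[] args) {
      // The owner should end up with 100 + 30 + 50 = 180, but the first deposit is lost.
      System.out.println(interleavedDeposits(100, 30, 50)); // prints 150
   }
}
```

Both deposits are individually correct. The money disappears only because the read and the write of each deposit are not executed as one atomic unit.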

There are a few approaches that may describe the algorithm of money transfer, but here we stick with the simplest one. To transfer an amount of money from account A to account B, we should do the following:

  1. Open a new transaction.
  2. Load the wallet of account A.
  3. Load the wallet of account B.
  4. Check that the wallet of account A has sufficient funds.
  5. Calculate the new balance of account A by withdrawing the amount.
  6. Calculate the new balance of account B by depositing the amount.
  7. Save the wallet of account A.
  8. Save the wallet of account B.
  9. Commit the transaction.

As a result of that algorithm, we would either get the new consistent state of the wallets or see no changes at all.
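Before moving to MongoDB, the steps above can be sketched against an in-memory map. This is only an illustration of the all-or-nothing idea: the TransferSketch class and its methods are hypothetical, and the "transaction" is imitated by working on a private copy of the data that is published only when every step has succeeded:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the nine steps; not the MongoDB implementation.
final class TransferSketch {

   enum Outcome { SUCCESS, NOT_ENOUGH_FUNDS }

   private Map<String, Integer> balances = new HashMap<>();

   synchronized void open(String owner, int balance) {
      balances.put(owner, balance);
   }

   synchronized int balanceOf(String owner) {
      return balances.get(owner);
   }

   // Assumes that both owners exist; a missing owner is not handled in this sketch.
   synchronized Outcome transfer(String from, String to, int amount) {
      Map<String, Integer> tx = new HashMap<>(balances);  // 1. open a "transaction"
      int fromBalance = tx.get(from);                     // 2. load the wallet of A
      int toBalance = tx.get(to);                         // 3. load the wallet of B
      if (fromBalance < amount) {                         // 4. check the funds of A
         return Outcome.NOT_ENOUGH_FUNDS;                 //    abort: the copy is discarded
      }
      tx.put(from, fromBalance - amount);                 // 5, 7. withdraw and save A
      tx.put(to, toBalance + amount);                     // 6, 8. deposit and save B
      balances = tx;                                      // 9. commit by publishing the copy
      return Outcome.SUCCESS;
   }

   public static void main(String[] args) {
      TransferSketch wallets = new TransferSketch();
      wallets.open("A", 100);
      wallets.open("B", 20);
      System.out.println(wallets.transfer("A", "B", 70)); // prints SUCCESS
      System.out.println(wallets.transfer("A", "B", 70)); // prints NOT_ENOUGH_FUNDS
      System.out.println(wallets.balanceOf("A") + wallets.balanceOf("B")); // prints 120
   }
}
```

The sketch serializes all transfers with a single lock, which a real database does not do. MongoDB instead lets transactions run concurrently and aborts the conflicting ones, which is why the service shown later needs retry logic.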

Let's describe the Wallet entity class that is mapped to a MongoDB document and has some handy utility methods:

@Document(collection = "wallet")                                     // (1)
public class Wallet {
   @Id private ObjectId id;                                          // (2)
   private String owner;
   private int balance;

   // Constructors, getters, and setters are omitted...

   public boolean hasEnoughFunds(int amount) {                       // (3)
      return balance >= amount;
   }

   public void withdraw(int amount) {                                // (4)
      if (!hasEnoughFunds(amount)) {
         throw new IllegalStateException("Not enough funds!");
      }
      this.balance = this.balance - amount;
   }

   public void deposit(int amount) {                                 // (5)
      this.balance = this.balance + amount;
   }
}

Let's describe the preceding code:

  1. The Wallet entity class is mapped to the wallet collection in MongoDB.
  2. The org.bson.types.ObjectId class is used as an entity identifier. The ObjectId class has great integration with MongoDB and is often used for entity identification.
  3. The hasEnoughFunds method checks whether the wallet has enough funds for an operation.
  4. The withdraw method decreases the wallet's balance by the requested amount.
  5. The deposit method increases the wallet's balance by the requested amount.

To store and load wallets from a database, we need a repository:

public interface WalletRepository
       extends ReactiveMongoRepository<Wallet, ObjectId> {          // (1)
   Mono<Wallet> findByOwner(Mono<String> owner);                    // (2)
}

Let's describe the WalletRepository interface in more detail:

  1. Our WalletRepository interface extends the ReactiveMongoRepository interface.
  2. Also, we define an additional method called findByOwner to retrieve a wallet if we have the owner's name. The generated implementation of the interface knows how to execute an actual query as the findByOwner method follows the Spring Data naming convention.

Now, let's define an interface for WalletService:

public interface WalletService {

   Mono<TxResult> transferMoney(                                     // (1)
      Mono<String> fromOwner,
      Mono<String> toOwner,
      Mono<Integer> amount);

   Mono<Statistics> reportAllWallets();                              // (2)

   enum TxResult {                                                   // (3)
      SUCCESS,
      NOT_ENOUGH_FUNDS,
      TX_CONFLICT
   }

   class Statistics {                                                // (4)
      // Implementation is omitted ...
   }
}

Here, the numbered points mean the following:

  1. The transferMoney method transfers the amount of money from the wallet of fromOwner to the wallet of toOwner. Note that the method consumes reactive types, so at the moment of the method call, the actual transaction participants may still be unknown. Of course, the method could equally accept primitives or Mono<MoneyTransferRequest>. However, here, we intentionally use three distinct Mono instances to exercise the zip operator and TupleUtils.
  2. The reportAllWallets method sums up the data of all registered wallets and checks the total balance.
  3. The transferMoney method returns a result of the TxResult type. The TxResult enum describes three potential outcomes of a transfer operation: SUCCESS, NOT_ENOUGH_FUNDS, and TX_CONFLICT. The SUCCESS and NOT_ENOUGH_FUNDS outcomes are self-explanatory. TX_CONFLICT describes a situation in which the transaction has failed because some other concurrent transaction succeeded first and updated one or both of the involved wallets.
  4. The Statistics class represents the aggregated state of all wallets in the system, which is useful for integrity checks. The implementation details are omitted for the sake of simplicity.

Now that we have the WalletService interface defined, we can write a unit test with a simulation. With the desired parallelism, the simulation repeatedly chooses two random owners and tries to transfer a random amount of money between them. With some insignificant parts omitted, the simulation may look as follows:

public Mono<OperationStats> runSimulation() {
   return Flux.range(0, iterations)                                  // (1)
      .flatMap(i -> Mono
         .delay(Duration.ofMillis(rnd.nextInt(10)))                  // (2)
         .publishOn(simulationScheduler)                             // (3)
         .flatMap(_i -> {
            String fromOwner = randomOwner();                        // (4)
            String toOwner = randomOwnerExcept(fromOwner);           //
            int amount = randomTransferAmount();                     //
            return walletService.transferMoney(                      // (5)
               Mono.just(fromOwner),
               Mono.just(toOwner),
               Mono.just(amount));
         }))
      .reduce(                                                       // (6)
         OperationStats.start(),
         OperationStats::countTxResult);
}

The preceding code consists of the following steps:

  1. Use the Flux.range method to simulate the desired number of transfer iterations.
  2. Apply a small random delay in order to provoke random transaction contention.
  3. Transactions are running on simulationScheduler. Its parallelism defines how many concurrent transactions may happen. We may create such a scheduler with the following code: Schedulers.newParallel("name", parallelism).
  4. Select random wallet owners and the amount of money to be transferred.
  5. Make the transferMoney service request.
  6. As the transferMoney call may result in one of TxResult states, the reduce method helps to track the simulation statistics. Note that the OperationStats class tracks how many operations were successful, how many were rejected due to insufficient funds, and how many failed due to transaction conflicts. On the other hand, the WalletService.Statistics class tracks the total amount of funds.
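The OperationStats class itself is not shown in this section. A minimal sketch of how such an accumulator could look is given below. Its fields, the merge method, and the nested copy of the TxResult enum are assumptions made for illustration. To stay free of external dependencies, the example reduces a java.util.stream.Stream instead of a Flux, which additionally requires a combiner function:

```java
import java.util.stream.Stream;

// Hypothetical sketch of the statistics accumulator used by the simulation.
final class OperationStats {

   // Local copy of WalletService.TxResult so that the example is self-contained.
   enum TxResult { SUCCESS, NOT_ENOUGH_FUNDS, TX_CONFLICT }

   final int successful;
   final int notEnoughFunds;
   final int conflicts;

   private OperationStats(int successful, int notEnoughFunds, int conflicts) {
      this.successful = successful;
      this.notEnoughFunds = notEnoughFunds;
      this.conflicts = conflicts;
   }

   // Initial value for the reduce operation.
   static OperationStats start() {
      return new OperationStats(0, 0, 0);
   }

   // Accumulator: returns a new instance with the matching counter incremented.
   OperationStats countTxResult(TxResult result) {
      switch (result) {
         case SUCCESS:
            return new OperationStats(successful + 1, notEnoughFunds, conflicts);
         case NOT_ENOUGH_FUNDS:
            return new OperationStats(successful, notEnoughFunds + 1, conflicts);
         default:
            return new OperationStats(successful, notEnoughFunds, conflicts + 1);
      }
   }

   // Combiner required by Stream.reduce; Flux.reduce does not need it.
   OperationStats merge(OperationStats other) {
      return new OperationStats(
         successful + other.successful,
         notEnoughFunds + other.notEnoughFunds,
         conflicts + other.conflicts);
   }

   public static void main(String[] args) {
      OperationStats stats = Stream
         .of(TxResult.SUCCESS, TxResult.NOT_ENOUGH_FUNDS, TxResult.SUCCESS)
         .reduce(OperationStats.start(), OperationStats::countTxResult, OperationStats::merge);
      System.out.println(stats.successful + " successful, "
         + stats.notEnoughFunds + " rejected, " + stats.conflicts + " conflicts");
   }
}
```

Keeping the accumulator immutable means that each reduction step produces a fresh value, so no state is shared between the threads of simulationScheduler.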

With a correct implementation of WalletService, we expect that a test simulation leads to a system state where the total amount of money in the system does not change. At the same time, we expect that money transfer requests are successfully executed when a sender has enough funds for a transaction. Otherwise, we may face system integrity issues that may lead to an actual financial loss.

Now, let's implement the WalletService service using reactive transaction support provided by MongoDB 4 and Spring Data. An implementation represented by the TransactionalWalletService class may look like this:

public class TransactionalWalletService implements WalletService {
   private final ReactiveMongoTemplate mongoTemplate;             // (1)

   // Constructor is omitted...

   @Override
   public Mono<TxResult> transferMoney(                           // (2)
      Mono<String> fromOwner,                                     //
      Mono<String> toOwner,                                       //
      Mono<Integer> requestAmount                                 //
   ) {                                                            //
      return Mono.zip(fromOwner, toOwner, requestAmount)          // (2.1)
         .flatMap(function((from, to, amount) -> {                // (2.2)
            return doTransferMoney(from, to, amount)              // (2.3)
               .retryBackoff(                                     // (2.4)
                  20, Duration.ofMillis(1),                       //
                  Duration.ofMillis(500), 0.1                     //
               )                                                  //
               .onErrorReturn(TxResult.TX_CONFLICT);              // (2.5)
         }));
   }

   private Mono<TxResult> doTransferMoney(                        // (3)
      String from, String to, Integer amount                      // (3.1)
   ) {                                                            //
      return mongoTemplate.inTransaction().execute(session ->     // (3.2)
         session                                                  //
            .findOne(queryForOwner(from), Wallet.class)           // (3.3)
            .flatMap(fromWallet -> session                        //
               .findOne(queryForOwner(to), Wallet.class)          // (3.4)
               .flatMap(toWallet -> {                             //
                  if (fromWallet.hasEnoughFunds(amount)) {        // (3.5)
                     fromWallet.withdraw(amount);                 // (3.6)
                     toWallet.deposit(amount);                    // (3.7)
                     return session.save(fromWallet)              // (3.8)
                        .then(session.save(toWallet))             // (3.9)
                        .then(Mono.just(TxResult.SUCCESS));       // (3.10)
                  } else {                                        //
                     return Mono.just(TxResult.NOT_ENOUGH_FUNDS); // (3.11)
                  }                                               //
               })))                                               //
         .onErrorResume(e ->                                      // (3.12)
             Mono.error(new RuntimeException("Conflict")))        //
         .last();                                                 // (3.13)
   }

   private Query queryForOwner(String owner) {                    // (4)
      return Query.query(new Criteria("owner").is(owner));        // (4.1)
   }
}

As the preceding code may seem to be non-trivial, let's describe it piece by piece:

  1. First of all, we have to use the ReactiveMongoTemplate class, because, at the time of writing, the Reactive MongoDB connector does not support transactions at the level of repositories, only at the level of the MongoDB template.
  2. The implementation of the transferMoney method is defined here. With the zip operation, it subscribes to all method arguments (2.1), and when all the arguments are resolved, it uses the TupleUtils.function static helper function in order to decompose Tuple3<String, String, Integer> into its constituents (2.2) for code fluency. At point (2.3) we call the doTransferMoney method, which does the actual money transfer. However, the doTransferMoney method may return the onError signal that indicates a transaction conflict. In that case, we can retry the operation with the convenient retryBackoff method (2.4). The retryBackoff method needs to know the number of retries (20), the initial retry delay (1 millisecond), the maximum retry delay (500 milliseconds), and the jitter factor (0.1), which adds a random deviation to each retry delay so that competing transactions do not retry at exactly the same moment. In cases when we fail to process the transaction even after all retries, we should return the TX_CONFLICT status to the client (2.5).
  3. The doTransferMoney method tries to make an actual money transfer. It is called with the already resolved arguments: from, to, and amount (3.1). By calling the mongoTemplate.inTransaction().execute(...) method, we define the boundaries of a new transaction (3.2). Inside the execute method, we are given a session instance of the ReactiveMongoOperations type. The session object is bound to a MongoDB transaction. Now, within the transaction, we search for the wallet of the sender (3.3) and then search for the wallet of the receiver (3.4). With both wallets resolved, we check whether the sender has enough funds (3.5). Then we withdraw the correct amount of money from the sender's wallet (3.6) and deposit the same amount of money in the receiver's wallet (3.7). At this point, the changes are not saved to the database yet. Now, we save the updated wallet of the sender (3.8) and then the updated wallet of the receiver (3.9). If the database does not reject the changes, we return the SUCCESS status and automatically commit the transaction (3.10). If the sender does not have enough funds, we return the NOT_ENOUGH_FUNDS status (3.11). If there are any errors while communicating with the database, we propagate the onError signal (3.12), which in turn should trigger the retry logic described at point (2.4). Finally, because the execute method returns a Flux, we apply the last operator to convert the result into a Mono (3.13).
  4. At points (3.3) and (3.4) we used the queryForOwner method, which uses the Criteria API to build MongoDB queries.
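To get a feeling for the retry parameters used at point (2.4), the following sketch computes retry delays for an exponential backoff with jitter. It is not Reactor's actual implementation: the BackoffSketch class, the doubling rule, and the way the jitter is applied are simplifying assumptions chosen only to illustrate the roles of the initial delay, the maximum delay, and the jitter factor:

```java
import java.time.Duration;
import java.util.Random;

// Hypothetical illustration of exponential backoff with jitter (not Reactor's code).
final class BackoffSketch {

   // Delay before the given retry attempt (0-based) without jitter:
   // it doubles on every attempt and is capped by the maximum delay.
   static Duration baseDelay(int attempt, Duration first, Duration max) {
      long doubled = first.toMillis() << Math.min(attempt, 30);
      return Duration.ofMillis(Math.min(doubled, max.toMillis()));
   }

   // Shifts the base delay by a random offset of at most (jitter * base) in either direction.
   static Duration withJitter(Duration base, double jitter, Random rnd) {
      long spread = Math.round(base.toMillis() * jitter);
      long offset = Math.round((rnd.nextDouble() * 2 - 1) * spread);
      return Duration.ofMillis(Math.max(0, base.toMillis() + offset));
   }

   public static void main(String[] args) {
      Random rnd = new Random();
      Duration first = Duration.ofMillis(1);
      Duration max = Duration.ofMillis(500);
      // Prints the base delay and one jittered delay for each of the 20 retries.
      for (int attempt = 0; attempt < 20; attempt++) {
         Duration base = baseDelay(attempt, first, max);
         System.out.println(attempt + ": base=" + base.toMillis()
            + " ms, jittered=" + withJitter(base, 0.1, rnd).toMillis() + " ms");
      }
   }
}
```

Under these assumptions, the base delay grows as 1, 2, 4, ... milliseconds and reaches the 500 millisecond cap at the tenth retry, while the jitter keeps concurrent clients from retrying in lockstep.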

Referencing the correct session with the transaction is implemented using the Reactor Context. The ReactiveMongoTemplate.inTransaction method starts a new transaction and puts it into the context. Consequently, it is possible to get the session with the transaction represented by the com.mongodb.reactivestreams.client.ClientSession interface anywhere within the reactive flow. The ReactiveMongoContext.getSession() helper method allows us to get the session instance.

Of course, we may improve the TransactionalWalletService class by loading both wallets in one query as well as updating both wallets in one query. Changes such as these should decrease the number of database requests, speed up money transfers, and also decrease the rate of transaction conflicts. However, these improvements are left as an exercise for the reader.

Now, we may run the previously described test scenario with a different number of wallets, money transfer iterations, and parallelism. If we implemented all the business logic in the TransactionalWalletService class correctly, we should receive the output of the test as follows:

The number of accounts in the system: 500
The total balance of the system before the simulation: 500,000$
Running the money transferring simulation (10,000 iterations)
...
The simulation is finished
Transfer operations statistic:
  - successful transfer operations: 6,238
  - not enough funds: 3,762
  - conflicts: 0
All wallet operations:
  - total withdraw operations: 6,238
  - total deposit operations: 6,238
The total balance of the system after the simulation: 500,000$

So, in the preceding simulation, we performed 10,000 transfer operations, 6,238 of which succeeded, and 3,762 of which failed due to insufficient funds. Also, our retry strategy allowed us to resolve all transaction conflicts, as none of the transactions finished with the TX_CONFLICT status. As is evident from the logs, the system preserved the invariant of the total balance: the total amount of money in the system before and after the simulation is the same. Consequently, we have achieved system integrity while doing concurrent money transfers by applying reactive transactions with MongoDB.

The support for multi-document transactions for replica sets now allows a whole new set of applications to be implemented with MongoDB as the primary data storage. Of course, future versions of MongoDB may allow transactions across sharded deployments and provide various isolation levels for dealing with transactions. However, we should note that multi-document transactions incur greater performance costs and longer response latencies compared with simple document writes.

Even though reactive transactions are not a widely used technique yet, these examples clearly show that it is possible to apply transactions in a reactive fashion. Reactive transactions will be in high demand when applying reactive persistence to relational databases such as PostgreSQL. However, that topic requires a reactive language-level API for database access, which, at the time of writing, is not present yet.