Wrapping reactive transactions with the usingWhen factory

Like the using operator, the usingWhen operator lets us manage resources in a reactive way. The difference is that the using operator retrieves the managed resource synchronously (by calling the Callable instance), whereas the usingWhen operator retrieves it reactively (by subscribing to an instance of Publisher). Additionally, the usingWhen operator accepts separate handlers for the successful and unsuccessful termination of the main processing stream, and those handlers are themselves publishers. Because acquisition, processing, and cleanup are all expressed as publishers, a completely non-blocking reactive transaction can be implemented with a single operator.

Let's assume we have an entirely reactive transaction. For demonstration purposes, the code is oversimplified. A reactive transaction implementation may look like the following:

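import java.time.Duration;
import java.util.Random;

import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class Transaction {
    // Assumption: an SLF4J logger; the snippet uses log without declaring it
    private static final Logger log = LoggerFactory.getLogger(Transaction.class);
    private static final Random random = new Random();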
    private final int id;

    public Transaction(int id) {
        this.id = id;
        log.info("[T: {}] created", id);
    }

    public static Mono<Transaction> beginTransaction() {             // (1)
        return Mono.defer(() ->
            Mono.just(new Transaction(random.nextInt(1000))));
    }

    public Flux<String> insertRows(Publisher<String> rows) {         // (2)
        return Flux.from(rows)
            .delayElements(Duration.ofMillis(100))
            .flatMap(r -> {
                if (random.nextInt(10) < 2) {
                    return Mono.error(new RuntimeException("Error: " + r));
                } else {
                    return Mono.just(r);
                }
            });
    }

    public Mono<Void> commit() {                                     // (3)
        return Mono.defer(() -> {
            log.info("[T: {}] commit", id);
            if (random.nextBoolean()) {
                return Mono.empty();
            } else {
                return Mono.error(new RuntimeException("Conflict"));
            }
        });
    }

    public Mono<Void> rollback() {                                   // (4)
        return Mono.defer(() -> {
            log.info("[T: {}] rollback", id);
            if (random.nextBoolean()) {
                return Mono.empty();
            } else {
                return Mono.error(new RuntimeException("Conn error"));
            }
        });
    }
}

Let's look at the preceding code:

  1. This is a static factory that allows the creation of new transactions.
  2. Each transaction has a method for saving new rows within the transaction. To simulate internal problems, each row fails at random with a probability of about 20%. insertRows consumes a Reactive Streams Publisher and returns a Flux.
  3. This is an asynchronous commit. Sometimes, a transaction may fail to commit.
  4. This is an asynchronous rollback. Sometimes, a transaction may fail to roll back.

Now, with the usingWhen operator, we can implement a transactional update with the following code:

Flux.usingWhen(
    Transaction.beginTransaction(),                                  // (1)
    transaction -> transaction.insertRows(Flux.just("A", "B", "C")), // (2)
    Transaction::commit,                                             // (3)
    Transaction::rollback                                            // (4)
).subscribe(
    d -> log.info("onNext: {}", d),
    e -> log.info("onError: {}", e.getMessage()),
    () -> log.info("onComplete")
);

The preceding code uses the usingWhen operator as follows:

  1. The beginTransaction static method supplies a new transaction asynchronously by returning the Mono<Transaction> type
  2. Given the transaction instance, it tries to insert new rows
  3. It commits the transaction if step (2) finishes successfully
  4. It rolls back the transaction if step (2) fails
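
The ordering of these four steps can be illustrated without Reactor. The following is a minimal sketch that uses only java.util.concurrent.CompletableFuture; the usingWhen helper, the UsingWhenSketch class, and the event names are hypothetical and are not part of Reactor's API. It models a single result rather than a stream and ignores cancellation and cleanup failures, so it should be read as an analogy for the life cycle, not as how the operator is implemented.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import java.util.function.Supplier;

class UsingWhenSketch {

    // Hypothetical helper: acquire -> body -> (onSuccess | onFailure) -> result.
    static <R, T> CompletableFuture<T> usingWhen(
            Supplier<CompletableFuture<R>> acquire,
            Function<R, CompletableFuture<T>> body,
            Function<R, CompletableFuture<Void>> onSuccess,
            Function<R, CompletableFuture<Void>> onFailure) {
        return acquire.get().thenCompose(resource ->
            body.apply(resource)
                .handle((value, error) -> {
                    CompletableFuture<T> next;
                    if (error == null) {
                        // Body succeeded: run the success handler, then emit the value
                        next = onSuccess.apply(resource).thenApply(ignored -> value);
                    } else {
                        // Body failed: run the failure handler, then re-emit the error
                        next = onFailure.apply(resource)
                            .thenCompose(ignored -> CompletableFuture.<T>failedFuture(error));
                    }
                    return next;
                })
                .thenCompose(next -> next));
    }

    // Runs one simulated transaction and returns the life cycle events in order.
    static List<String> run(boolean failInsert) {
        List<String> events = new CopyOnWriteArrayList<>();

        Supplier<CompletableFuture<String>> begin = () ->
            CompletableFuture.supplyAsync(() -> {
                events.add("begin");
                return "tx-1";
            });
        Function<String, CompletableFuture<String>> insert = tx -> failInsert
            ? CompletableFuture.failedFuture(new RuntimeException("Error: B"))
            : CompletableFuture.completedFuture("A,B,C");
        Function<String, CompletableFuture<Void>> commit = tx ->
            CompletableFuture.runAsync(() -> events.add("commit"));
        Function<String, CompletableFuture<Void>> rollback = tx ->
            CompletableFuture.runAsync(() -> events.add("rollback"));

        // Record the terminal signal only after cleanup has finished
        usingWhen(begin, insert, commit, rollback)
            .handle((value, error) -> events.add(error == null ? "onComplete" : "onError"))
            .join();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [begin, commit, onComplete]
        System.out.println(run(true));  // [begin, rollback, onError]
    }
}
```

In both runs, the cleanup step finishes before the terminal signal is delivered. This is the same order in which the commit and rollback log lines appear relative to onComplete and onError in the usingWhen output.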

Because the failures are simulated at random, the output varies from run to run. A successful execution produces output like the following:

[T: 265] created
onNext: A
onNext: B
onNext: C
[T: 265] commit
onComplete

An example of execution with an aborted transaction may look as follows:

[T: 582] created
onNext: A
[T: 582] rollback
onError: Error: B
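
How often each of these outputs appears follows from the thresholds in the Transaction class. As a rough estimate, assuming the random draws are independent and uniform, each row is inserted with probability 0.8 (random.nextInt(10) < 2 fails 2 times in 10), and commit or rollback succeeds with probability 0.5 (random.nextBoolean()). The OutcomeOdds class and its method names below are hypothetical helpers for this calculation.

```java
class OutcomeOdds {
    // A row fails when random.nextInt(10) < 2, so it succeeds 8 times in 10
    static final double ROW_OK = 0.8;
    // commit() and rollback() each succeed when random.nextBoolean() is true
    static final double CLEANUP_OK = 0.5;

    // Probability that every row passes through insertRows
    static double allRowsInserted(int rows) {
        return Math.pow(ROW_OK, rows);
    }

    // Probability of the fully successful run: all rows inserted, then commit succeeds
    static double committed(int rows) {
        return allRowsInserted(rows) * CLEANUP_OK;
    }

    // Probability that an insert fails and the rollback then succeeds
    static double rolledBack(int rows) {
        return (1.0 - allRowsInserted(rows)) * CLEANUP_OK;
    }

    public static void main(String[] args) {
        System.out.println("all rows inserted: " + allRowsInserted(3));
        System.out.println("committed:         " + committed(3));
        System.out.println("rolled back:       " + rolledBack(3));
    }
}
```

For the three rows used here, all inserts succeed with probability 0.8^3 = 0.512, so only about one run in four (0.512 × 0.5 = 0.256) ends with a clean commit and onComplete. The remaining runs end with an error, either from a failed insert or from a failed commit.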

With the usingWhen operator, it is much easier to manage the resource life cycle in an entirely reactive way, and a reactive transaction can be expressed with a single operator. For resources that are acquired and released asynchronously, the usingWhen operator is therefore a significant improvement over the using operator.
