Handling errors

When we design a reactive application that communicates heavily with external services, we have to deal with all kinds of exceptional situations. Fortunately, the onError signal is an integral part of the Reactive Streams specification, so an exception always has a way to propagate to the actor that can handle it. However, if the final subscriber does not define a handler for the onError signal, the default onError handler throws an UnsupportedOperationException.
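
To make the signal flow concrete, the following minimal sketch uses the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams interfaces, instead of Reactor itself. The names OnErrorSignalSketch, RecordingSubscriber, and failingPublisher are hypothetical, and the publisher ignores demand accounting for brevity, so treat it as an illustration rather than a spec-compliant implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class OnErrorSignalSketch {

    /** Records every signal it receives so that their order can be inspected. */
    static final class RecordingSubscriber implements Flow.Subscriber<String> {
        final List<String> signals = new ArrayList<>();

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(String item) { signals.add("onNext:" + item); }
        @Override public void onError(Throwable t) { signals.add("onError:" + t.getMessage()); }
        @Override public void onComplete() { signals.add("onComplete"); }
    }

    /** Hypothetical publisher: emits one item on the first request, then fails. */
    static Flow.Publisher<String> failingPublisher() {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override public void request(long n) {
                if (done) {
                    return; // already terminated, stay silent
                }
                done = true;
                subscriber.onNext("Blue Mars");
                subscriber.onError(new RuntimeException("Err"));
                // Simplification: no further signal is sent after onError.
            }

            @Override public void cancel() { done = true; }
        });
    }

    /** Subscribes the recording subscriber and returns the signals it saw. */
    static List<String> run() {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        failingPublisher().subscribe(subscriber);
        return subscriber.signals;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Here the exception reaches the subscriber's onError handler as an ordinary signal, and no onComplete is recorded after it.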

Also, the semantics of Reactive Streams define onError as a terminal signal, after which the reactive sequence stops executing. At that point, we may react differently by applying one of the following strategies:

  • Of course, we should define handlers for the onError signal in the subscribe operator.
  • We can catch and replace an error with a default static value or a value calculated from the exception by applying the onErrorReturn operator.
  • We can catch an exception and execute an alternative workflow by applying the onErrorResume operator.
  • We can catch and transform an exception into another exception that better represents the situation by applying the onErrorMap operator.
  • We can define a reactive workflow that, in the event of errors, retries the execution. The retry operator resubscribes to the source reactive sequence if it signals an error. It may do so indefinitely or for a limited number of attempts. The retryBackoff operator gives out-of-the-box support for the exponential backoff algorithm, which retries the operation with increasing delays. (Reactor 3.4 and later express the same strategy as retryWhen(Retry.backoff(...)).)
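
The "increasing delays" of exponential backoff can be sketched in plain Java. This is an idealized model, not Reactor's implementation: the BackoffScheduleSketch class and its delays method are hypothetical names, the delay simply doubles after each retry, and the random jitter that real implementations commonly add is left out:

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

class BackoffScheduleSketch {

    /**
     * Nominal delay before each retry: starts at firstBackoff, doubles every
     * time, and never exceeds maxBackoff. No jitter is applied (assumption).
     */
    static List<Duration> delays(int retries, Duration firstBackoff, Duration maxBackoff) {
        List<Duration> result = new ArrayList<>();
        Duration next = firstBackoff;
        for (int i = 0; i < retries; i++) {
            // Cap the delay before recording it.
            Duration capped = next.compareTo(maxBackoff) > 0 ? maxBackoff : next;
            result.add(capped);
            next = capped.multipliedBy(2);
        }
        return result;
    }

    public static void main(String[] args) {
        delays(5, Duration.ofMillis(100), Duration.ofSeconds(10))
            .forEach(d -> System.out.println(d.toMillis() + " ms"));
    }
}
```

For five retries starting at 100 milliseconds, this model yields delays of 100, 200, 400, 800, and 1600 milliseconds.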

Also, an empty stream is not always what we want. In that case, we may return a default value with the defaultIfEmpty operator or switch to an entirely different reactive stream with the switchIfEmpty operator.

One more handy operator, timeout, limits how long we wait for an operation and signals a TimeoutException when that limit is exceeded. We can, in turn, process that exception with any of the other error handling strategies.

Let's demonstrate how we can apply some of the strategies described. We can assume the following unreliable recommendation service:

public Flux<String> recommendedBooks(String userId) {
    return Flux.defer(() -> {                                          // (1)
        if (random.nextInt(10) < 7) {
            return Flux.<String>error(new RuntimeException("Err"))     // (2)
                .delaySequence(Duration.ofMillis(100));
        } else {
            return Flux.just("Blue Mars", "The Expanse")               // (3)
                .delayElements(Duration.ofMillis(50));
        }
    }).doOnSubscribe(s -> log.info("Request for {}", userId));         // (4)
}

Let's look at the preceding code:

  1. We defer the calculations until a subscriber arrives.
  2. About 70% of the time (random.nextInt(10) < 7), our unreliable service returns an error. The error is not delivered immediately, though: the delaySequence operator shifts all signals by 100 milliseconds.
  3. When a client is lucky, they receive their recommendations, with a 50-millisecond delay before each element.
  4. Also, we log each request to the service.

Now, let's implement a client that handles our unreliable service well:

Flux.just("user-1")                                                // (1)
    .flatMap(user ->                                               // (2)
        recommendedBooks(user)                                     // (2.1)
            .retryBackoff(5, Duration.ofMillis(100))               // (2.2)
            .timeout(Duration.ofSeconds(3))                        // (2.3)
            .onErrorResume(e -> Flux.just("The Martian")))         // (2.4)
    .subscribe(                                                    // (3)
        b -> log.info("onNext: {}", b),
        e -> log.warn("onError: {}", e.getMessage()),
        () -> log.info("onComplete")
    );

The preceding code does the following:

  1. Here, we generate a stream of users who request their book recommendations.
  2. For each user, we call our unreliable recommendedBooks service (2.1). If the call fails, we retry with exponential backoff (no more than 5 retries, starting with a delay of 100 milliseconds) (2.2). However, if our retry strategy does not produce a result within three seconds, the timeout operator signals an error (2.3). Finally, in the event of any error, we return a predefined universal set of recommendations with the onErrorResume operator (2.4).
  3. Of course, we need to create a subscriber.

When running, our application may generate the following output:

[time: 18:49:29.543] Request for user-1
[time: 18:49:29.693] Request for user-1
[time: 18:49:29.881] Request for user-1
[time: 18:49:30.173] Request for user-1
[time: 18:49:30.972] Request for user-1
[time: 18:49:32.529] onNext: The Martian
[time: 18:49:32.529] onComplete

From the logs, we can see that our code tried to get recommendations for user-1 five times. Also, the retry delay increased from ~150 milliseconds to ~1.5 seconds. Finally, our code stopped trying to retrieve a result from the recommendedBooks method, returned the fallback value (The Martian), and completed the stream.
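
As a rough cross-check of the five attempts, the arithmetic behind the retry and timeout settings can be worked through in plain Java. This is an idealized model under stated assumptions: every call fails after the 100-millisecond delaySequence shift, the backoff doubles from 100 milliseconds without jitter, and RetryTimelineSketch and requestStarts are hypothetical names. Real runs, such as the log above, show different gaps because the actual delays are not this regular:

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

class RetryTimelineSketch {

    /**
     * Start offsets of every request that begins before the timeout expires,
     * assuming each call fails after failureDelay and the backoff doubles
     * from firstBackoff with no jitter.
     */
    static List<Duration> requestStarts(int maxRetries, Duration firstBackoff,
                                        Duration failureDelay, Duration timeout) {
        List<Duration> starts = new ArrayList<>();
        Duration now = Duration.ZERO;
        Duration backoff = firstBackoff;
        // One initial call plus at most maxRetries retries.
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (now.compareTo(timeout) >= 0) {
                break; // the timeout fires before this request could start
            }
            starts.add(now);
            now = now.plus(failureDelay).plus(backoff);
            backoff = backoff.multipliedBy(2);
        }
        return starts;
    }

    public static void main(String[] args) {
        requestStarts(5, Duration.ofMillis(100), Duration.ofMillis(100), Duration.ofSeconds(3))
            .forEach(d -> System.out.println("request at +" + d.toMillis() + " ms"));
    }
}
```

With the values from the client code, the model starts requests at 0, 200, 500, 1000, and 1900 milliseconds. A sixth request would start at 3600 milliseconds, after the three-second timeout, which is consistent with seeing five requests followed by the fallback value.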

To summarize, Project Reactor provides a wide range of instruments that help with the handling of exceptional situations and, consequently, improve an application's resilience.
