Handling exceptions of an asynchronous task via exceptionally()

User problem: Compute the total of an order. If something goes wrong, then throw an IllegalStateException.

The following screenshots exemplify how exceptions are propagated in an asynchronous pipeline; the code in rectangles is not executed when an exception occurs at the point:

The following screenshot shows the exceptions in thenApply() and thenAccept():

So, in supplyAsync() if an exception occurs, then none of the following callbacks will be called. Moreover, the future will be resolved with this exception. The same rule applies for each callback. If the exception occurs in the first thenApply(), then the following thenApply() and thenAccept() will not be called.

If our attempt to computing the total of order ends up in an IllegalStateException, then we can rely on the exceptionally() callback which gives us a chance to recover. This method takes a Function<Throwable,​? extends T> and returns a CompletionStage<T>, therefore, a CompletableFuture. Let's see it at work:

public static void fetchOrderTotalException() {

CompletableFuture<Integer> cfTotalOrder
= CompletableFuture.supplyAsync(() -> {

logger.info(() -> "Compute total: "
+ Thread.currentThread().getName());

int surrogate = new Random().nextInt(1000);
if (surrogate < 500) {
throw new IllegalStateException(
"Invoice service is not responding");
}

return 1000;
}).exceptionally(ex -> {
logger.severe(() -> "Exception: " + ex
+ " Thread: " + Thread.currentThread().getName());

return 0;
});

int result = cfTotalOrder.get();
logger.info(() -> "Total: " + result + " ");
}

In case of exception, the output will be as follows:

Compute total: ForkJoinPool.commonPool-worker-3
Exception: java.lang.IllegalStateException: Invoice service
is not responding Thread: ForkJoinPool.commonPool-worker-3
Total: 0

Let's take a look at another problem.

User problem: Fetch an invoice, compute the total, and sign. If something goes wrong then throw IllegalStateException and stop the process.

If we fetch the invoice using supplyAsync(), compute the total using thenApply() and sign using another thenApply(), then we may think that the right implementation is as follows:

public static void fetchInvoiceTotalSignChainOfException()
throws InterruptedException, ExecutionException {

CompletableFuture<String> cfFetchInvoice
= CompletableFuture.supplyAsync(() -> {

logger.info(() -> "Fetch invoice by: "
+ Thread.currentThread().getName());

int surrogate = new Random().nextInt(1000);
if (surrogate < 500) {
throw new IllegalStateException(
"Invoice service is not responding");
}

return "Invoice #3344";
}).exceptionally(ex -> {
logger.severe(() -> "Exception: " + ex
+ " Thread: " + Thread.currentThread().getName());

return "[Invoice-Exception]";
}).thenApply(o -> {
logger.info(() -> "Compute total by: "
+ Thread.currentThread().getName());

int surrogate = new Random().nextInt(1000);
if (surrogate < 500) {
throw new IllegalStateException(
"Total service is not responding");
}

return o + " Total: $145";
}).exceptionally(ex -> {
logger.severe(() -> "Exception: " + ex
+ " Thread: " + Thread.currentThread().getName());

return "[Total-Exception]";
}).thenApply(o -> {
logger.info(() -> "Sign invoice by: "
+ Thread.currentThread().getName());

int surrogate = new Random().nextInt(1000);
if (surrogate < 500) {
throw new IllegalStateException(
"Signing service is not responding");
}

return o + " Signed";
}).exceptionally(ex -> {
logger.severe(() -> "Exception: " + ex
+ " Thread: " + Thread.currentThread().getName());

return "[Sign-Exception]";
});

String result = cfFetchInvoice.get();
logger.info(() -> "Result: " + result + " ");
}

Well, the issue here is that we may face an output as follows:

[INFO] Fetch invoice by: ForkJoinPool.commonPool-worker-3
[SEVERE] Exception: java.lang.IllegalStateException: Invoice service
is not responding Thread: ForkJoinPool.commonPool-worker-3
[INFO] Compute total by: ForkJoinPool.commonPool-worker-3
[INFO] Sign invoice by: ForkJoinPool.commonPool-worker-3
[SEVERE] Exception: java.lang.IllegalStateException: Signing service
is not responding Thread: ForkJoinPool.commonPool-worker-3
[INFO] Result: [Sign-Exception]

Even if the invoice couldn't be fetched, we would continue to compute the total and sign it. Obviously, this doesn't make sense. If the invoice cannot be fetched, or the total cannot be computed, then we expect to abort the process. While this implementation can be a good fit when we can recover and continue, it is definitely no good for our scenario. For our scenario, the following implementation is needed:

public static void fetchInvoiceTotalSignException()
throws InterruptedException, ExecutionException {

CompletableFuture<String> cfFetchInvoice
= CompletableFuture.supplyAsync(() -> {

logger.info(() -> "Fetch invoice by: "
+ Thread.currentThread().getName());

int surrogate = new Random().nextInt(1000);
if (surrogate < 500) {
throw new IllegalStateException(
"Invoice service is not responding");
}

return "Invoice #3344";
}).thenApply(o -> {
logger.info(() -> "Compute total by: "
+ Thread.currentThread().getName());

int surrogate = new Random().nextInt(1000);
if (surrogate < 500) {
throw new IllegalStateException(
"Total service is not responding");
}

return o + " Total: $145";
}).thenApply(o -> {
logger.info(() -> "Sign invoice by: "
+ Thread.currentThread().getName());

int surrogate = new Random().nextInt(1000);
if (surrogate < 500) {
throw new IllegalStateException(
"Signing service is not responding");
}

return o + " Signed";
}).exceptionally(ex -> {
logger.severe(() -> "Exception: " + ex
+ " Thread: " + Thread.currentThread().getName());

return "[No-Invoice-Exception]";
});

String result = cfFetchInvoice.get();
logger.info(() -> "Result: " + result + " ");
}

This time, an exception occurring in any of the implied CompletableFuture will stop the process. Here is a possible output:

[INFO ] Fetch invoice by: ForkJoinPool.commonPool-worker-3
[SEVERE] Exception: java.lang.IllegalStateException: Invoice service
is not responding Thread: ForkJoinPool.commonPool-worker-3
[INFO ] Result: [No-Invoice-Exception]

Starting with JDK 12, the exceptional cases can be further parallelized via exceptionallyAsync() that can use the same thread as the code that caused the exception or a thread from the given thread pool (Executor). Here is an example:

public static void fetchOrderTotalExceptionAsync() {

ExecutorService executor = Executors.newSingleThreadExecutor();

CompletableFuture<Integer> totalOrder
= CompletableFuture.supplyAsync(() -> {

logger.info(() -> "Compute total by: "
+ Thread.currentThread().getName());

int surrogate = new Random().nextInt(1000);
if (surrogate < 500) {
throw new IllegalStateException(
"Computing service is not responding");
}

return 1000;
}).exceptionallyAsync(ex -> {
logger.severe(() -> "Exception: " + ex
+ " Thread: " + Thread.currentThread().getName());

return 0;
}, executor);

int result = totalOrder.get();
logger.info(() -> "Total: " + result + " ");
executor.shutdownNow();
}

The output reveals that the code that caused the exception was executed by a thread named ForkJoinPool.commonPool-worker-3, while the exceptional code was executed by a thread from the given thread pool named pool-1-thread-1:

Compute total by: ForkJoinPool.commonPool-worker-3
Exception: java.lang.IllegalStateException: Computing service is
not responding Thread: pool-1-thread-1
Total: 0
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.118.2.240