Simplifying the code with a third-party reactive framework

Although we've improved the performance of our service, you must have noticed that the code became a lot more complex. We can simplify it by using a framework that provides an alternative API to CompletionStage for chaining asynchronous callbacks and joining results of parallel executions.

In Chapter 3, Connecting Microservices Together, you learned how to use RxJava with a JAX-RS client. We're going to use RxJava now, to replace CompletionStage with Flowable , and simplify our code.

The first part, which calls remote REST services in parallel, will be very similar to what we already have. The difference is mostly in using RxFlowableInvoker to retrieve a stream of Flowable instead of CompletionStage. We don't need to reduce the stream into a single Flowable; because RxJava provides a much more convenient way to join Flowable:

Stream<Flowable<Forecast>> futureForecasts = locations.stream()
.map(location -> {
return temperatureTarget
.register(RxFlowableInvokerProvider.class)
.resolveTemplate("city", location.getName())
.request()
.rx(RxFlowableInvoker.class)
.get(Temperature.class)
.map(temperature -> new Forecast(location, temperature));
});

In the second part, we use the static concat method on the Flowable class to join all flowables into a single one:

Iterable<Flowable<Forecast>> iFutureForecasts = futureForecasts::iterator;
Flowable.concat(iFutureForecasts)
.doOnNext(forecast -> {
response.getForecasts().add(forecast);
})
.doOnComplete(() -> {
asyncResponse.resume(Response.ok(response).build());
})
.doOnError(asyncResponse::resume)
.subscribe();
}

Unlike the final CompletionStage we had before, which worked with the final list of results, the Flowable we get now is a stream that produces multiple values. We handle each of them with the doOnNext method. A similar method wasn't available with CompletionStage, and that's why we had to concatenate all results in a list in a single CompletionStage instance.

Using RxJava and Flowable enables us to process results of multiple parallel calls as a stream of results, immediately after individual results are available. This makes the code more efficient and is simpler than reducing all results into a CompletionStage.

Another new thing is that we call subscribe after we build the execution pipeline. This is because a call with RxFlowableInvoker is executed lazily only after we declare interest in the values of the returned Flowable, for example, by calling the subscribe method.

Finally, similarly to using CompletionStage, we complete the response within the doOnComplete method and handle exceptions within the doOnError method.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.22.181.47