Although we've improved the performance of our service, you must have noticed that the code became a lot more complex. We can simplify it by using a framework that provides an alternative API to CompletionStage for chaining asynchronous callbacks and joining results of parallel executions.
In Chapter 3, Connecting Microservices Together, you learned how to use RxJava with a JAX-RS client. We're going to use RxJava now, to replace CompletionStage with Flowable , and simplify our code.
The first part, which calls remote REST services in parallel, will be very similar to what we already have. The difference is mostly in using RxFlowableInvoker to retrieve a stream of Flowable instead of CompletionStage. We don't need to reduce the stream into a single Flowable; because RxJava provides a much more convenient way to join Flowable:
Stream<Flowable<Forecast>> futureForecasts = locations.stream()
.map(location -> {
return temperatureTarget
.register(RxFlowableInvokerProvider.class)
.resolveTemplate("city", location.getName())
.request()
.rx(RxFlowableInvoker.class)
.get(Temperature.class)
.map(temperature -> new Forecast(location, temperature));
});
In the second part, we use the static concat method on the Flowable class to join all flowables into a single one:
Iterable<Flowable<Forecast>> iFutureForecasts = futureForecasts::iterator;
Flowable.concat(iFutureForecasts)
.doOnNext(forecast -> {
response.getForecasts().add(forecast);
})
.doOnComplete(() -> {
asyncResponse.resume(Response.ok(response).build());
})
.doOnError(asyncResponse::resume)
.subscribe();
}
Unlike the final CompletionStage we had before, which worked with the final list of results, the Flowable we get now is a stream that produces multiple values. We handle each of them with the doOnNext method. A similar method wasn't available with CompletionStage, and that's why we had to concatenate all results in a list in a single CompletionStage instance.
Another new thing is that we call subscribe after we build the execution pipeline. This is because a call with RxFlowableInvoker is executed lazily only after we declare interest in the values of the returned Flowable, for example, by calling the subscribe method.
Finally, similarly to using CompletionStage, we complete the response within the doOnComplete method and handle exceptions within the doOnError method.