Collecting results of multiple parallel calls

We will use a combination of Java streams and CompletionStage to build a pipeline that processes the results as they arrive, and merges them into a single response. The first part of the pipeline will call the Temperature service for each location in parallel and specify what to do with each received result. Using the rx() method on the temperature request builder allows us to call a version of the get method, which immediately returns a CompletionStage result. We can then chain a Lambda expression that converts a received temperature into a forecast when the result of the call arrives, as in the following code snippet:

CompletionStage<Forecast> forecastStage = request.rx()
.get(Temperature.class)
.thenApply(temperature -> new Forecast(location, temperature));

Note that the value in forecastStage is a CompletionStage of Forecast, and not a Forecast itself. It is used to build the asynchronous pipeline, while the value of the forecast will be available to other chained lambda expressions as their argument when it's available.

The complete first part of the pipeline is implemented as follows:

CompletionStage<List<Forecast>> initialStage 
= CompletableFuture.completedFuture(new ArrayList<>());
CompletionStage<List<Forecast>> finalStage = locations.stream()
.map(location -> {
return temperatureTarget
.resolveTemplate("city", location.getName())
.request().rx().get(Temperature.class)
.thenApply(temperature -> new Forecast(location, temperature));
})
.reduce(initialStage, (combinedStage, forecastStage) -> {
return combinedStage.thenCombine(forecastStage, (forecasts, forecast) -> {
forecasts.add(forecast);
return forecasts;
});
},
(stage1, stage2) -> null); // combiner won't be used, return null

Since we need to execute multiple calls in parallel, we execute single asynchronous calls in a stream of locations to process all locations and their temperatures. Running the asynchronous rx().get() method on a stream of locations executes the calls to the Temperature service in parallel. Each call to the Temperature service is followed by building an asynchronous pipeline for each returned CompletionStage. An asynchronous pipeline is a chain of lambdas that is automatically executed on a different thread later, when the result of an asynchronous call is available. In the end, we reduce a stream of CompletionStage results into a single CompletionStage that is completed after all individual asynchronous executions are complete. This is to synchronize the parallel execution and merge the results into a list of forecasts.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.145.17.18