Building an asynchronous service

SSE is a web standard (https://www.w3.org/TR/eventsource/) that specifies how web servers can send a stream of messages over a potentially unlimited period of time. The connection has to be initiated by a client, and from then on the client only listens for messages sent by the server. It works very similarly to a standard HTTP request, with the exception that the response can be sent in multiple chunks.
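To make the idea of a chunked response more concrete, here is a minimal sketch of how a single event looks on the wire: an optional id line, one data line per line of payload, and a blank line that ends the event. The SseWireFormat class and the city payload are hypothetical and only serve as illustration; in the rest of this section, JAX-RS produces this text for us.

```java
// Sketch only: SseWireFormat is a hypothetical helper, not part of JAX-RS.
final class SseWireFormat {

    // Formats one event: an optional "id:" line, one "data:" line per
    // payload line, and a blank line that terminates the event.
    static String format(String id, String data) {
        StringBuilder sb = new StringBuilder();
        if (id != null) {
            sb.append("id: ").append(id).append('\n');
        }
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        return sb.append('\n').toString();
    }

    public static void main(String[] args) {
        // Two events as they would appear in the response body
        // (the payload is made up for this example).
        System.out.print(format("1", "{\"city\":\"London\"}"));
        System.out.print(format("completed", "{}"));
    }
}
```

Each call to format yields one chunk of the response; the server can write such chunks at any time while the connection stays open.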

In Chapter 3, Connecting Microservices Together, we built a Forecast service, which retrieves data from the Location and Temperature services and sends a list of forecasts in the response. Now, we're going to implement the same Forecast service as an asynchronous service. We will provide forecasts as messages using SSE instead of sending them all in a single response. Individual forecasts will be sent as soon as they are retrieved from the Temperature service, without waiting for other forecasts to be ready.

The same JAX-RS mechanism we used in Chapter 3, Connecting Microservices Together, to build simple REST services also supports building SSE server endpoints. Therefore, we can reuse a large portion of the code we built in that chapter.

Support for SSE is new in Java EE 8. JAX-RS added support for building SSE endpoints in version 2.1, which is part of Java EE 8.

We're going to start with the services we created in Chapter 3, Connecting Microservices Together. We'll be using the same LocationResource, TemperatureResource, and ForecastResource classes. We'll need the same Forecast, Temperature, and Location data objects, which will be mapped to the JSON payload of the SSE messages. And we'll need the same configuration we've already used in the SmartCityServices class.

In order to make the Forecast service asynchronous, we're going to modify the ForecastResource class. Let’s start with a skeleton of this class, which is already modified to send SSE events in the response.

This is the initial skeleton of the ForecastResource class:

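import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseEventSink;
import org.glassfish.jersey.server.Uri; // Jersey-specific injection of WebTarget

@Path("/forecast")
public class ForecastResource {

    @Uri("location")
    private WebTarget locationTarget;

    @Uri("temperature/{city}")
    private WebTarget temperatureTarget;

    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void getLocationsWithTemperature(
            @Context SseEventSink eventSink, // channel for sending events
            @Context Sse sse) {              // factory for event builders
        // ... method body
    }
}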

The method body will do much the same job as the simple synchronous ForecastResource class in Chapter 3, Connecting Microservices Together. The difference is that it will send the response using the injected SseEventSink instead of returning the response as the result of the method. We'll use the send method of the SseEventSink object to send the data. We also need to create a data object that holds the message to be sent. For that, we'll use a builder acquired from the second injected object, which is of the Sse type.

Here's an example of sending a message in JSON format, with the payload in a variable named response and the ID set to "event1":

OutboundSseEvent.Builder sseDataEventBuilder = sse.newEventBuilder()
        .mediaType(MediaType.APPLICATION_JSON_TYPE);
eventSink.send(
        sseDataEventBuilder
                .data(response)
                .id("event1")
                .build()
);

Keep in mind that every method call on the OutboundSseEvent.Builder object modifies its internal state. If you reuse a builder, make sure that you overwrite every previously set value that should not carry over to the new event, or use a new instance of the event builder.
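The note above can be illustrated with a toy builder. The following sketch does not use OutboundSseEvent.Builder at all; ToyEventBuilder is a hypothetical stand-in written only to show how a value set for one event leaks into the next one when a stateful builder is reused.

```java
// Sketch only: ToyEventBuilder is a hypothetical stand-in for a stateful
// builder; it is not the JAX-RS OutboundSseEvent.Builder.
final class ToyEventBuilder {
    private String id;
    private String data;

    ToyEventBuilder id(String id) {
        this.id = id;
        return this;
    }

    ToyEventBuilder data(String data) {
        this.data = data;
        return this;
    }

    String build() {
        return "id=" + id + ", data=" + data;
    }

    public static void main(String[] args) {
        ToyEventBuilder shared = new ToyEventBuilder();
        System.out.println(shared.id("event1").data("first").build());
        // The id was not overwritten, so the second event reuses "event1".
        System.out.println(shared.data("second").build());
        // A fresh builder starts without any leftover values.
        System.out.println(new ToyEventBuilder().data("second").build());
    }
}
```

Reusing one builder is fine as long as every field that differs between events is set again before each build call, which is what the complete method later in this section does with data and id.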

All the steps in the new method body can be summarized into the following points:

  • The list of locations is retrieved from the Location service, as in Chapter 3, Connecting Microservices Together.
  • For every location, the forecast temperature is retrieved using the Temperature service, again as in Chapter 3.
  • Every retrieved temperature is sent as an event, using the send method of SseEventSink.
  • After all the locations are processed, a final event with an ID of "completed" is sent to indicate the end of the event stream.
  • The SSE connection is closed by calling the close method of SseEventSink.

For a finite stream of events, we recommend sending a termination event after all data events are sent. This lets the client detect the end of the event stream reliably. Some SSE clients don't detect the end of the stream correctly and keep trying to reconnect after the connection is closed, assuming that the connection was interrupted prematurely and that new data is still to come.
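To show why a termination event helps on the client side, here is a minimal sketch of a reader that stops cleanly when it sees the event with the ID "completed" and treats any other end of the stream as an interruption. The TerminationEventSketch class is hypothetical and heavily simplified: it takes the already split lines of a response and only understands the id and data fields, which is far less than a real SSE client does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: a hypothetical, simplified reader, not a full SSE client.
final class TerminationEventSketch {

    // Collects data payloads until an event with the ID "completed" arrives.
    static List<String> readUntilCompleted(Iterable<String> lines) {
        List<String> payloads = new ArrayList<>();
        String id = null;
        StringBuilder data = new StringBuilder();
        for (String line : lines) {
            if (line.isEmpty()) {                 // a blank line ends one event
                if ("completed".equals(id)) {
                    return payloads;              // clean end of a finite stream
                }
                if (data.length() > 0) {
                    payloads.add(data.toString());
                }
                data.setLength(0);
            } else if (line.startsWith("id:")) {
                id = line.substring(3).trim();
            } else if (line.startsWith("data:")) {
                if (data.length() > 0) {
                    data.append('\n');
                }
                data.append(line.substring(5).trim());
            }
        }
        // No termination event was seen: the connection was probably cut.
        throw new IllegalStateException("stream ended before the completed event");
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "id: 1", "data: {\"n\":1}", "",
                "id: 2", "data: {\"n\":2}", "",
                "id: completed", "data: {}", "");
        System.out.println(readUntilCompleted(lines));
    }
}
```

With the termination event in place, the client can tell a finished stream from a broken one: the first case returns the collected payloads, the second ends in an exception that could trigger a reconnection.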

And here is the complete source code of the getLocationsWithTemperature method, which executes the preceding points:

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public void getLocationsWithTemperature(
        @HeaderParam(HttpHeaders.LAST_EVENT_ID_HEADER) Integer lastEventId,
        @Context SseEventSink eventSink,
        @Context Sse sse) {
    // one builder reused for all data events; data and id are set for each event
    OutboundSseEvent.Builder sseDataEventBuilder = sse.newEventBuilder()
            .mediaType(MediaType.APPLICATION_JSON_TYPE);
    List<Location> locations = locationTarget.request()
            .get(new GenericType<List<Location>>() {});
    int eventId = 0;
    for (Location location : locations) {
        eventId++;
        if (lastEventId != null && eventId <= lastEventId) {
            continue; // skip to the last ID before reconnection
        }
        Temperature temperature = temperatureTarget
                .resolveTemplate("city", location.getName())
                .request()
                .get(Temperature.class);
        ForecastResponse response = new ForecastResponse()
                .forecast(new Forecast(location, temperature));
        eventSink.send(
                sseDataEventBuilder
                        .data(response)
                        .id(String.valueOf(eventId))
                        .build());
    }
    eventSink.send(
            sse.newEventBuilder()
                    .id("completed") // final event
                    .mediaType(MediaType.APPLICATION_JSON_TYPE)
                    .data("{}") // empty JSON data required by some browsers
                    .build());
    eventSink.close();
}

This code also does a little more in order to support reconnection to an SSE event stream in case of a network failure. This allows clients to resume receiving events from the point at which the connection was interrupted, and is supported by the SSE protocol out of the box. We'll talk more about automatic reconnection with SSE later.
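The resume logic can be studied in isolation. The following sketch reproduces only the event numbering and the skip condition from the method above, with plain strings in place of the Location and Temperature services. The ResumeSketch class and the city names are hypothetical. Note that this approach assumes that the Location service returns the locations in the same order on every request; otherwise, the numeric IDs would point to different locations after a reconnection.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: ResumeSketch is a hypothetical helper that mimics the
// skip condition of getLocationsWithTemperature without any JAX-RS code.
final class ResumeSketch {

    // Returns "id:city" for every event that would be sent, followed by
    // the ID of the final event.
    static List<String> eventsToSend(List<String> cities, Integer lastEventId) {
        List<String> sent = new ArrayList<>();
        int eventId = 0;
        for (String city : cities) {
            eventId++;
            if (lastEventId != null && eventId <= lastEventId) {
                continue; // already delivered before the reconnection
            }
            sent.add(eventId + ":" + city);
        }
        sent.add("completed");
        return sent;
    }

    public static void main(String[] args) {
        List<String> cities = Arrays.asList("London", "Paris", "Prague");
        System.out.println(eventsToSend(cities, null)); // first connection
        System.out.println(eventsToSend(cities, 2));    // reconnect after event 2
    }
}
```

On the first connection, no last event ID is sent and all events are delivered. After a reconnection with a last event ID of 2, only the third event and the final event are sent.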
