Building an SSE Java client

To consume our asynchronous SSE forecast service from Java code, we're going to create a standalone Java application that will connect to it and print the data received as events. We're again going to use the JAX-RS client API we used in the previous chapter, Chapter 3, Connecting Microservices Together, to access a standard REST service.

Remember that SSE is just an extension of the standard HTTP protocol, and the JAX-RS client API for SSE also works as an extension to the standard client API. We're going to use the SseEventSource class from the JAX-RS API to extend the standard WebTarget object and to add listeners to handle SSE events.

To build our SSE client, we need to create a Java project and add some required dependencies that provide the Java EE functionality to a plain Java application. We'll need the following dependencies:

  • jersey-client
  • jersey-hk2
  • jersey-media-json-binding
  • jersey-media-sse

The first three are the same dependencies that were needed in the Building a Client section of Chapter 3, Connecting Microservices Together. The only new dependency is jersey-media-sse, which provides the SSE portion of the standard JAX-RS client API.

If you want to invoke an SSE service from a web application deployed to a Java EE container, all of the required dependencies are provided by the container, and none of the Jersey dependencies are needed in your application.

The Maven dependency for jersey-media-sse can be added as follows:

<dependency>
    <groupId>org.glassfish.jersey.media</groupId>
    <artifactId>jersey-media-sse</artifactId>
    <version>2.26</version>
</dependency>
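
For reference, the other three dependencies might be declared along the same lines. This is only a sketch: it assumes the same 2.26 version for every Jersey artifact, so check the versions against the ones you used in Chapter 3.

```xml
<!-- Assumption: all Jersey artifacts share version 2.26 -->
<dependency>
    <groupId>org.glassfish.jersey.core</groupId>
    <artifactId>jersey-client</artifactId>
    <version>2.26</version>
</dependency>
<dependency>
    <groupId>org.glassfish.jersey.inject</groupId>
    <artifactId>jersey-hk2</artifactId>
    <version>2.26</version>
</dependency>
<dependency>
    <groupId>org.glassfish.jersey.media</groupId>
    <artifactId>jersey-media-json-binding</artifactId>
    <version>2.26</version>
</dependency>
```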

In the Java client, we'll create an instance of SseEventSource. We'll do this by first creating a builder from an instance of WebTarget using the static SseEventSource.target method. The builder lets us configure the event source before its build method creates the SseEventSource instance.

This is the simplest example of creating SseEventSource from an instance of WebTarget, which is stored in the webTarget variable:

SseEventSource sseEventSource = SseEventSource.target(webTarget).build();

Unlike the JavaScript EventSource object, the creation of an SseEventSource instance won't open the connection automatically. We should first call the register method to register an event callback and, optionally, error and completion callbacks. Only then will we open the connection with the open method. 

Use the register method to register the callbacks and then the open method to initiate the connection, as follows:

sseEventSource.register(onEvent, onError, onComplete);
sseEventSource.open();

The callbacks passed to the register method can be simple lambda expressions that implement particular functional interfaces, as follows:

  • Consumer<InboundSseEvent> onEvent
  • Consumer<Throwable> onError
  • Runnable onComplete 

Finally, we need to make sure that both the SseEventSource instance and the original JAX-RS Client instance are closed once the client stops consuming SSE events, in order to close the connection and release unused resources. To synchronize the main thread of our Java client with the callbacks, which are executed in different threads, we'll use CompletableFuture, which provides a simple thread-safe API to complete a computation in one thread and wait for its completion in another. When the main thread calls join() on the future, it waits until SSE processing is finished.
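
The synchronization idea can be tried out without any SSE code at all. The following is a minimal sketch that uses only the JDK; the FutureSyncSketch class, the waitForCallback method, and the callback-thread name are made up for illustration, and a plain thread stands in for the thread that would run an SSE callback.

```java
import java.util.concurrent.CompletableFuture;

public class FutureSyncSketch {

    // Starts a separate thread that completes the future, then blocks
    // the calling thread until that completion happens.
    static String waitForCallback() {
        CompletableFuture<String> done = new CompletableFuture<>();

        Thread callbackThread = new Thread(() -> {
            // Stand-in for an SSE callback running outside the main thread
            done.complete("finished on " + Thread.currentThread().getName());
        }, "callback-thread");
        callbackThread.start();

        // join() returns only after complete() has been called elsewhere
        return done.join();
    }

    public static void main(String[] args) {
        System.out.println(waitForCallback());
    }
}
```

The main thread never polls or sleeps; join() simply returns the value that the other thread passed to complete().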

SSE processing is finished when any of the following happens:

  • The onComplete callback is triggered, meaning the client detected the end of the response.
  • The onError callback is triggered, meaning the client detected an unrecoverable error.
  • The onEvent callback is triggered by a termination event, meaning that the server sent the last event.

The last condition doesn't apply to every SSE service. The Forecast service is designed to send a termination event with a special event ID so that the client can easily detect the end of the event stream. This is because SSE clients can't always reliably detect the end of an SSE event stream on their own and sometimes hang, waiting for more events to come.
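
Because all three conditions end up completing the same future, whichever one happens first decides the result and the later ones are ignored. Here is a minimal JDK-only sketch of that behavior. The Event class is a hypothetical stand-in for InboundSseEvent, and the TerminationSketch class with its onEvent and describe helpers exists only for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

public class TerminationSketch {

    // Hypothetical stand-in for InboundSseEvent: just an ID and a payload
    static final class Event {
        final String id;
        final String data;

        Event(String id, String data) {
            this.id = id;
            this.data = data;
        }
    }

    // Event callback that completes the future on the termination ID
    static Consumer<Event> onEvent(CompletableFuture<String> done) {
        return event -> {
            if ("completed".equals(event.id)) {
                done.complete("termination event");
            } else {
                System.out.println("Event received: " + event.data);
            }
        };
    }

    // Waits for the future and reports how processing ended
    static String describe(CompletableFuture<String> done) {
        try {
            return done.join();
        } catch (CompletionException e) {
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> done = new CompletableFuture<>();
        Consumer<Event> onEvent = onEvent(done);
        Consumer<Throwable> onError = done::completeExceptionally;
        Runnable onComplete = () -> done.complete("end of response");

        onEvent.accept(new Event("1", "sunny"));
        onEvent.accept(new Event("completed", null));
        onComplete.run(); // ignored: the future is already completed
        onError.accept(new IllegalStateException("too late")); // also ignored
        System.out.println("Result: " + describe(done));
    }
}
```

Running main prints the received event and then "Result: termination event", because the termination event completed the future before the other two callbacks ran.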

Here is the complete code for ForecastResourceSseClient.java, which is a standalone Java program that connects to the Forecast service and prints the received data, as follows:

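import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.InboundSseEvent;
import javax.ws.rs.sse.SseEventSource;

// ForecastResponse is the application's own response class; it is
// assumed here to be in the same package as this client.
public class ForecastResourceSseClient {

    public static void main(String... args) throws Exception {
        Client restClient = ClientBuilder.newClient();
        WebTarget target = restClient.target("http://localhost:8080/"
                + "forecast-service-async/smartcity/forecast");
        SseEventSource sseEventSource = SseEventSource.target(target)
                .build();
        // Completed by whichever callback first detects the end of the stream
        CompletableFuture<String> asyncProcessing = new CompletableFuture<>();

        Consumer<InboundSseEvent> onEvent = (InboundSseEvent event) -> {
            if ("completed".equals(event.getId())) {
                // Termination event sent by the Forecast service
                asyncProcessing.complete("complete event received");
            } else {
                ForecastResponse forecastResponse = event.readData(
                        ForecastResponse.class, MediaType.APPLICATION_JSON_TYPE);
                System.out.println("Event received: " + forecastResponse);
            }
        };
        Consumer<Throwable> onError = (Throwable error) -> {
            asyncProcessing.completeExceptionally(error);
        };
        Runnable onComplete = () -> {
            asyncProcessing.complete("OnComplete");
        };

        sseEventSource.register(onEvent, onError, onComplete);
        sseEventSource.open();
        try {
            // Blocks the main thread until one of the callbacks completes the future
            asyncProcessing.join();
        } finally {
            // Release resources even if join() rethrows an error from onError
            sseEventSource.close();
            restClient.close();
        }
    }
}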

The SSE client can be summarized into the following steps:

  1. A standard JAX-RS WebTarget is created for the URL of an SSE service.
  2. An SseEventSource is created from WebTarget using SseEventSource.target.
  3. An instance of CompletableFuture is created for later synchronization.
  4. Callbacks are attached using the register method; callbacks complete the future appropriately.
  5. The SSE service is invoked by calling the open method.
  6. The main thread waits until the SSE event stream is processed by waiting for the future to complete.
  7. The SseEventSource and Client resources are closed.

When using CompletableFuture, as in our example, we can alternatively chain the closing of the SseEventSource and Client resources to the future itself, with a method such as thenRun of CompletableFuture. The resources are then closed as soon as the future completes, without the main thread having to wait for its completion, like this:

asyncProcessing.thenRun(() -> {
    sseEventSource.close();
    restClient.close();
});
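
One thing to keep in mind is that an action chained with thenRun runs only when the future completes normally. If the future can also be completed exceptionally, as our onError callback does, whenComplete may be a better fit because it runs in both cases. The following JDK-only sketch compares the two; the CleanupSketch class and the runCleanup method are hypothetical, and adding a string to a list stands in for closing real resources.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CleanupSketch {

    // Returns the names of the chained actions that actually ran
    static List<String> runCleanup(boolean fail) {
        List<String> ran = new ArrayList<>();
        CompletableFuture<String> asyncProcessing = new CompletableFuture<>();

        // Runs only if the future completes normally
        asyncProcessing.thenRun(() -> ran.add("thenRun"));
        // Runs on normal and on exceptional completion
        asyncProcessing.whenComplete((result, error) -> ran.add("whenComplete"));

        if (fail) {
            asyncProcessing.completeExceptionally(
                    new IllegalStateException("stream failed"));
        } else {
            asyncProcessing.complete("done");
        }
        return ran;
    }

    public static void main(String[] args) {
        System.out.println("Normal completion ran: " + runCleanup(false));
        System.out.println("Exceptional completion ran: " + runCleanup(true));
    }
}
```

With normal completion both actions run; with exceptional completion only the whenComplete action does, so resources closed in a thenRun action would stay open after an error.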