Server-sent events

Server-sent events (SSE) are an example of an asynchronous, HTTP-based, publish-subscribe technology. SSE offers an easy-to-use, one-way streaming communication protocol. Clients register for a topic by requesting an HTTP resource that keeps the connection open. The server sends messages to connected clients over these active HTTP connections. Clients cannot send messages back over the stream; they can only open and close connections to the streaming endpoint. This lightweight solution fits use cases with broadcast updates, such as social media updates, stock prices, or news feeds.

The server pushes UTF-8 text-based data with the content type text/event-stream to clients that previously registered for the topics. The following shows the format of the events:

data: This is a message

event: namedmessage
data: This message has an event name

id: 10
data: This message has an id which will be sent as 'last event ID' if the client reconnects
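
To make the wire format more concrete, the following minimal sketch renders a single event as text/event-stream fields using only the JDK. The class SseWireFormat and its format method are hypothetical names chosen for illustration; they are not part of JAX-RS, which performs this serialization internally.

```java
// Hypothetical helper, not part of JAX-RS: renders one event in the
// text/event-stream wire format.
final class SseWireFormat {

    // id and name may be null; data may span several lines.
    static String format(String id, String name, String data) {
        StringBuilder sb = new StringBuilder();
        if (name != null) sb.append("event: ").append(name).append('\n');
        if (id != null) sb.append("id: ").append(id).append('\n');
        // Each line of the payload becomes its own "data:" field.
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        // A blank line terminates the event.
        return sb.append('\n').toString();
    }
}
```

A payload that contains line breaks is sent as several consecutive data fields, and the receiver joins them again with newlines.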

Because server-sent events are based on HTTP, they are easy to integrate into existing networks and developer tools. SSE natively supports event IDs and reconnects. Clients that reconnect to a streaming endpoint provide the last received event ID to continue subscribing where they left off.

JAX-RS supports server-sent events on both the server-side and client-side. SSE streaming endpoints are defined using JAX-RS resources as follows:

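import java.util.ArrayList;
import java.util.List;

import javax.annotation.PostConstruct;
import javax.ejb.Lock;
import javax.ejb.Singleton;
import javax.enterprise.event.Observes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.InternalServerErrorException;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.*;

import static javax.ejb.LockType.READ;
import static javax.ejb.LockType.WRITE;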

@Path("events-examples")
@Singleton
public class EventsResource {

    @Context
    Sse sse;

    private SseBroadcaster sseBroadcaster;
    private int lastEventId;
    private List<String> messages = new ArrayList<>();

    @PostConstruct
    public void initSse() {
        sseBroadcaster = sse.newBroadcaster();

        sseBroadcaster.onError((o, e) -> {
            ...
        });
    }

    @GET
    @Lock(READ)
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void itemEvents(@HeaderParam(HttpHeaders.LAST_EVENT_ID_HEADER)
                           @DefaultValue("-1") int lastEventId,
                           @Context SseEventSink eventSink) {

        if (lastEventId >= 0)
            replayLastMessages(lastEventId, eventSink);

        sseBroadcaster.register(eventSink);
    }

    private void replayLastMessages(int lastEventId, SseEventSink eventSink) {
        try {
            for (int i = lastEventId; i < messages.size(); i++) {
                eventSink.send(createEvent(messages.get(i), i + 1));
            }
        } catch (Exception e) {
            throw new InternalServerErrorException("Could not replay messages ", e);
        }
    }

    private OutboundSseEvent createEvent(String message, int id) {
        return sse.newEventBuilder().id(String.valueOf(id)).data(message).build();
    }

    @Lock(WRITE)
    public void onEvent(@Observes DomainEvent domainEvent) {
        String message = domainEvent.getContents();
        messages.add(message);

        OutboundSseEvent event = createEvent(message, ++lastEventId);

        sseBroadcaster.broadcast(event);
    }
}

The text/event-stream content type is used for server-sent events. The registered SseEventSink instructs JAX-RS to keep the client connection open for future events sent through the broadcaster. The SSE standard defines that the Last-Event-ID header controls where the event stream will continue. In this example, the server resends the messages that were published while the client was disconnected.

The itemEvents() method implements the streaming registration and, if required, immediately resends missing events to that client. After the event sink is registered, the client, together with all other active clients, will receive future messages, which are created using Sse and sent through the broadcaster.

The asynchronous integration into our enterprise application happens via the observed DomainEvent. Every time a CDI event of this type is fired somewhere in the application, active SSE clients will receive a message.
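
The interplay of replay and broadcast in the resource can be sketched without a container. The following plain-Java stand-in is an assumption-laden simplification: ReplayingBroadcaster is a hypothetical class, a BiConsumer of event ID and message takes the place of SseEventSink, and synchronized methods replace the container-managed locks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical stand-in for the resource's replay and broadcast logic;
// a sink receives (eventId, message) pairs instead of OutboundSseEvents.
final class ReplayingBroadcaster {

    private final List<String> messages = new ArrayList<>();
    private final List<BiConsumer<Integer, String>> sinks = new ArrayList<>();

    // lastEventId < 0 means the client sent no Last-Event-ID header.
    synchronized void register(int lastEventId, BiConsumer<Integer, String> sink) {
        if (lastEventId >= 0) {
            // Event IDs start at 1, so ID n lives at list index n - 1 and
            // index lastEventId holds the first message the client missed.
            for (int i = lastEventId; i < messages.size(); i++) {
                sink.accept(i + 1, messages.get(i));
            }
        }
        sinks.add(sink);
    }

    // Plays the role of onEvent(): store the message, then fan it out.
    synchronized void publish(String message) {
        messages.add(message);
        int id = messages.size();
        for (BiConsumer<Integer, String> sink : sinks) {
            sink.accept(id, message);
        }
    }
}
```

A client that registers with a last event ID of 1 after three messages have been published first receives the events with IDs 2 and 3, and from then on every newly published message.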

JAX-RS also supports consuming SSE. SseEventSource opens a connection to an SSE endpoint. An event listener registered on it is called as soon as a message arrives:

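import java.util.function.Consumer;

import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.sse.SseEventSource;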

public class SseClient {

    private final WebTarget target = ClientBuilder.newClient().target("...");
    private SseEventSource eventSource;

    public void connect(Consumer<String> dataConsumer) {
        eventSource = SseEventSource.target(target).build();

        eventSource.register(
                item -> dataConsumer.accept(item.readData()),
                Throwable::printStackTrace,
                () -> System.out.println("completed"));

        eventSource.open();
    }

    public void disconnect() {
        if (eventSource != null)
            eventSource.close();
    }
}

After the SseEventSource successfully opens the connection, the current thread continues. The listener, in this case dataConsumer#accept, will be called as soon as events arrive. SseEventSource takes care of the client behavior defined by the SSE standard. This includes, for example, reconnecting after connection loss and sending a Last-Event-ID header.
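
To give an impression of the bookkeeping a client has to do, the following simplified sketch reads an event stream line by line and remembers the last event ID. SseStreamReader is a hypothetical class, not the implementation behind SseEventSource. It only handles the data and id fields; event names, retry hints, and comment lines are ignored.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Consumer;

// Hypothetical, simplified reader for text/event-stream input.
final class SseStreamReader {

    // Would be sent as the Last-Event-ID header when reconnecting.
    private String lastEventId;

    String lastEventId() {
        return lastEventId;
    }

    void read(BufferedReader in, Consumer<String> listener) {
        StringBuilder data = new StringBuilder();
        String idBuffer = lastEventId;
        boolean hasData = false;
        try {
            String line;
            while ((line = in.readLine()) != null) {
                if (line.isEmpty()) {
                    // A blank line ends the event: remember its ID and dispatch.
                    lastEventId = idBuffer;
                    if (hasData) listener.accept(data.toString());
                    data.setLength(0);
                    hasData = false;
                } else if (line.startsWith("data:")) {
                    if (hasData) data.append('\n'); // multi-line payload
                    data.append(value(line, 5));
                    hasData = true;
                } else if (line.startsWith("id:")) {
                    idBuffer = value(line, 3);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns the field value, dropping one optional space after the colon.
    private static String value(String line, int afterColon) {
        String v = line.substring(afterColon);
        return v.startsWith(" ") ? v.substring(1) : v;
    }
}
```

After a connection loss, a client built this way would pass the value of lastEventId() in the Last-Event-ID header of its next request.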

Clients can also implement more sophisticated solutions that manually control headers and reconnects. To do so, the SseEventInput type is requested with the text/event-stream content type from a conventional web target. For more information, please refer to the JAX-RS specification.
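
What manual control over the headers amounts to can be illustrated without JAX-RS. The following sketch swaps in the JDK's own java.net.http API (Java 11 and later) rather than SseEventInput; ManualSseRequest is a hypothetical helper, and the endpoint URI is supplied by the caller.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper: builds the GET request a client would send when it
// manages the SSE headers itself. Requires Java 11+ for java.net.http.
final class ManualSseRequest {

    // lastEventId may be null on the first connection.
    static HttpRequest build(URI endpoint, String lastEventId) {
        HttpRequest.Builder builder = HttpRequest.newBuilder(endpoint)
                .header("Accept", "text/event-stream")
                .GET();
        if (lastEventId != null) {
            // Tells the server where the stream should continue.
            builder.header("Last-Event-ID", lastEventId);
        }
        return builder.build();
    }
}
```

Such a request could then be sent with an HttpClient and HttpResponse.BodyHandlers.ofLines() to process the stream line by line; deciding when and how often to reconnect is left to the application.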

Server-sent events offer an easy-to-use, one-way streaming solution over HTTP that integrates well with Java EE technology.
