To consume messages sent via Spring Cloud Stream, the chat application needs its own CommentService:

    @Service
    @EnableBinding(Sink.class)
    public class CommentService implements WebSocketHandler {

        private static final Logger log =
            LoggerFactory.getLogger(CommentService.class);
        ...
    }

The preceding code can be described as follows:
- @Service marks this as a Spring bean, picked up automatically when the chat microservice starts
- @EnableBinding(Sink.class) shows this to be a receiver for Spring Cloud Stream messages
- Our service implements WebSocketHandler, a WebFlux interface that comes with a handle(WebSocketSession) method (which we'll use shortly)
- An Slf4j Logger is used to print out traffic passing through
This service needs to consume the messages sent from Spring Cloud Stream. However, the destination for these messages is not another Spring Cloud Stream destination. Instead, we want to pipe them into a WebSocket session.
To do that, we need to pull messages down from a RabbitMQ-based Flux, and forward them to a Flux connected to a WebSocket session. This is where we need another one of those FluxSink objects:

    private ObjectMapper mapper;
    private Flux<Comment> flux;
    private FluxSink<Comment> webSocketCommentSink;

    CommentService(ObjectMapper mapper) {
        this.mapper = mapper;
        // The callback runs on subscription and captures the sink
        this.flux = Flux.<Comment>create(
                emitter -> this.webSocketCommentSink = emitter,
                FluxSink.OverflowStrategy.IGNORE)
            .publish()       // share one upstream among all subscribers
            .autoConnect();  // connect when the first subscriber arrives
    }

This last bit of code can easily be described as follows:
- We need a Jackson ObjectMapper, and will get it from Spring's container through constructor injection.
- To create a FluxSink that lets us put comments one by one onto a Flux, we use Flux.create(), and let it initialize our sink, webSocketCommentSink.
- The backpressure policy is FluxSink.OverflowStrategy.IGNORE, which disregards downstream backpressure signals. We chose it for simplicity's sake; other scenarios may call for a different strategy.
- publish() turns our Flux into a hot source shared by all subscribers, and autoConnect() connects it to that source as soon as the first subscriber arrives, so that it starts transmitting once hooked into the WebSocket session.
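To give a feel for what a different overflow strategy would change, here is a deliberately simplified, JDK-only model. It is not Reactor's implementation: OverflowModel, its Strategy enum, and the run() helper are hypothetical names invented for this sketch, and Reactor's ERROR strategy is left out. The model only tracks how much the consumer has requested and what happens to items pushed when nothing has been requested.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Simplified, JDK-only model of overflow handling; not Reactor's implementation. */
final class OverflowModel<T> {

    enum Strategy { IGNORE, DROP, LATEST, BUFFER }

    private final Strategy strategy;
    private final Deque<T> pending = new ArrayDeque<>(); // items waiting for demand
    private final List<T> delivered = new ArrayList<>(); // what the consumer saw
    private long demand;                                 // outstanding requests

    OverflowModel(Strategy strategy) {
        this.strategy = strategy;
    }

    /** The consumer signals that it can take n more items. */
    void request(long n) {
        demand += n;
        while (demand > 0 && !pending.isEmpty()) {
            delivered.add(pending.poll());
            demand--;
        }
    }

    /** The producer pushes an item, whether or not the consumer asked for one. */
    void next(T item) {
        if (strategy == Strategy.IGNORE) {
            delivered.add(item);   // demand is never consulted
        } else if (demand > 0) {
            delivered.add(item);
            demand--;
        } else if (strategy == Strategy.LATEST) {
            pending.clear();       // keep only the newest item
            pending.add(item);
        } else if (strategy == Strategy.BUFFER) {
            pending.add(item);     // unbounded in this model
        }
        // DROP with no demand: the item is discarded
    }

    List<T> delivered() {
        return delivered;
    }

    /** Pushes a, b, c with no demand, then requests two items. */
    static List<String> run(Strategy strategy) {
        OverflowModel<String> model = new OverflowModel<>(strategy);
        model.next("a");
        model.next("b");
        model.next("c");
        model.request(2);
        return model.delivered();
    }

    public static void main(String[] args) {
        for (Strategy s : Strategy.values()) {
            System.out.println(s + " -> " + run(s));
        }
    }
}
```

In this model, IGNORE delivers [a, b, c] regardless of demand, DROP delivers nothing, LATEST delivers [c], and BUFFER delivers [a, b]. In Reactor itself, ignoring backpressure means a slow consumer further downstream may fail once its own queues fill up, which is the trade-off accepted here for simplicity.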
The idea we are shooting for is to put events directly onto webSocketCommentSink, and then hitch the corresponding flux into the WebSocket API. Think of webSocketCommentSink as the object we can append comments to, and of flux as the stream that hands them to a consumer on the other end (once that consumer subscribes).
With our webSocketCommentSink configured, we can now hook it into our Spring Cloud Stream Sink, as follows:

    @StreamListener(Sink.INPUT)
    public void broadcast(Comment comment) {
        if (webSocketCommentSink != null) {
            log.info("Publishing " + comment.toString() + " to websocket...");
            webSocketCommentSink.next(comment);
        }
    }

The preceding code can be described as follows:
- The broadcast() method is marked as a @StreamListener for Sink.INPUT. Messages get deserialized as Comment objects thanks to the application/json setting.
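- The code checks that our webSocketCommentSink is not null, that is, that it has already been created. Flux.create() invokes its callback only on subscription, so the sink stays null until the first WebSocket client subscribes.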
- A log message is printed.
- The Comment is dropped into our webSocketCommentSink, which means that it will become available to our corresponding flux automatically.
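The interplay between the sink, the shared flux, and the null check can be mimicked with a small JDK-only analogy. This is only a sketch: HotBroadcaster and its methods are hypothetical names, there is no Reactor involved, and threading and backpressure are left out entirely. What it does show is the "hot" behavior we rely on: comments pushed before anyone is listening are lost, and each subscriber sees only what arrives after it joined.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** JDK-only analogy for the sink/flux pair; all names are hypothetical. */
final class HotBroadcaster<T> {

    // Plays the role of the WebSocket sessions subscribed to the shared flux.
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

    /** Analogous to a WebSocket session subscribing to the flux. */
    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    /** Analogous to webSocketCommentSink.next(comment); returns how many subscribers were reached. */
    int next(T item) {
        int reached = 0;
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(item);
            reached++;
        }
        return reached;
    }

    public static void main(String[] args) {
        HotBroadcaster<String> comments = new HotBroadcaster<>();
        List<String> sessionA = new ArrayList<>();
        List<String> sessionB = new ArrayList<>();

        comments.next("too early");       // nobody is listening yet: lost
        comments.subscribe(sessionA::add);
        comments.next("Nice cover!");     // reaches A only
        comments.subscribe(sessionB::add);
        comments.next("Great shot");      // reaches A and B

        System.out.println(sessionA);     // [Nice cover!, Great shot]
        System.out.println(sessionB);     // [Great shot]
    }
}
```

The "too early" comment corresponds to the case where webSocketCommentSink is still null in broadcast(): with no WebSocket client connected, there is simply nowhere to send it.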
With this service in place, we can expect to see the following in the chat service's logs when a new comment arrives:
2017-08-05 : Publishing Comment(id=581d6774596aec682ffd07be,
imageId=581d6669596aec65dc9e6c05, comment=Nice cover!) to websocket...
The last step is to push this Flux of comments out over a WebSocket session. Remember the WebSocketHandler interface at the top of our class? Let's implement it:

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session.send(this.flux
            .map(comment -> {
                try {
                    // Comment -> JSON text
                    return mapper.writeValueAsString(comment);
                } catch (JsonProcessingException e) {
                    // Checked exceptions cannot escape the lambda
                    throw new RuntimeException(e);
                }
            })
            .log("encode-as-json")
            .map(session::textMessage)  // JSON text -> WebSocketMessage
            .log("wrap-as-websocket-message"))
            .log("publish-to-websocket");
    }

This WebSocketHandler can be described as follows:
- We are handed a WebSocketSession which has a very simple API
- The Comment-based Flux is piped into the WebSocket via its send() method
- This Flux itself is transformed from a series of Comment objects into a series of JSON strings courtesy of Jackson, and then, finally, into a series of WebSocketMessage objects
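The try/catch inside map() is there because Jackson's writeValueAsString() declares a checked exception, which a lambda passed to map() cannot propagate. The same two-stage shape (value to JSON text, then text to message) can be sketched with plain java.util.stream. Everything here is an assumption made for illustration: CheckedEncoder, TextMessage, and unchecked() are hypothetical stand-ins, and the toy encoder is not a real JSON serializer. Because JsonProcessingException is an IOException in Jackson 2.x, the sketch wraps it in UncheckedIOException rather than a bare RuntimeException.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

final class EncodePipeline {

    /** Hypothetical stand-in for an encoder that throws a checked exception. */
    @FunctionalInterface
    interface CheckedEncoder<T> {
        String encode(T value) throws IOException;
    }

    /** Hypothetical stand-in for a WebSocket text message. */
    static final class TextMessage {
        final String payload;

        TextMessage(String payload) {
            this.payload = payload;
        }
    }

    /** Adapts a throwing encoder into a plain Function usable inside map(). */
    static <T> Function<T, String> unchecked(CheckedEncoder<T> encoder) {
        return value -> {
            try {
                return encoder.encode(value);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
    }

    /** Two map() stages: value -> JSON text -> message. */
    static <T> List<TextMessage> toMessages(List<T> values, CheckedEncoder<T> encoder) {
        return values.stream()
            .map(unchecked(encoder))
            .map(TextMessage::new)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Toy encoder for illustration only; real code would delegate to Jackson.
        CheckedEncoder<String> toy = s -> "{\"comment\":\"" + s + "\"}";
        for (TextMessage message : toMessages(List.of("Nice cover!"), toy)) {
            System.out.println(message.payload); // {"comment":"Nice cover!"}
        }
    }
}
```

Pulling the wrapping into a helper like unchecked() keeps the pipeline itself free of try/catch blocks, at the cost of one extra indirection.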
It's important to point out that in Spring Framework 4, much of this was handled by the inner workings of Spring's WebSocket API as well as its Messaging API. There was no need to serialize and deserialize Java POJOs into JSON representations by hand. That was provided out of the box by Spring's converter services.
In Spring Framework 5, in the WebFlux module, the WebSocket API is very simple. Think of it as streams of messages coming and going. So, the duty of transforming a stream of Comment objects into a stream of JSON-encoded text messages falls to us. As we've just seen, with the functional paradigm of Reactor, this is no bother.