Broadcasting saved comments

To consume messages sent via Spring Cloud Stream, the chat application needs its own CommentService:

    @Service 
    @EnableBinding(Sink.class) 
    public class CommentService implements WebSocketHandler { 
 
      private static final Logger log = 
        LoggerFactory.getLogger(CommentService.class); 
        ... 
    } 

The preceding code can be described as follows:

  • @Service marks this as a Spring bean, picked up automatically when the chat microservice starts
  • @EnableBinding(Sink.class) shows this to be a receiver for Spring Cloud Stream messages
  • Our service implements WebSocketHandler, a WebFlux interface that comes with a handle(WebSocketSession) method (which we'll use shortly)
  • An SLF4J Logger is used to print out traffic passing through

This service needs to consume the messages sent from Spring Cloud Stream. However, the destination for these messages is not another Spring Cloud Stream destination. Instead, we want to pipe them into a WebSocket session.

To do that, we need to pull messages down from a RabbitMQ-based Flux, and forward them to a Flux connected to a WebSocket session. This is where we need another one of those FluxSink objects:

    private ObjectMapper mapper; 
    private Flux<Comment> flux; 
    private FluxSink<Comment> webSocketCommentSink; 
 
    CommentService(ObjectMapper mapper) { 
      this.mapper = mapper; 
      this.flux = Flux.<Comment>create( 
        emitter -> this.webSocketCommentSink = emitter, 
        FluxSink.OverflowStrategy.IGNORE) 
         .publish() 
         .autoConnect(); 
    } 

This last bit of code can easily be described as follows:

  • We need a Jackson ObjectMapper, and will get it from Spring's container through constructor injection.
  • To create a FluxSink that lets us put comments one by one onto a Flux, we use Flux.create(), and let it initialize our sink, webSocketCommentSink.
  • The backpressure policy is FluxSink.OverflowStrategy.IGNORE, which ignores downstream backpressure signals for simplicity's sake. Other scenarios may call for a different strategy.
  • publish() and autoConnect() turn our Flux into a hot, shared stream that starts transmitting as soon as the first subscriber (a WebSocket session) hooks into it.

The idea we are shooting for is to put events directly onto webSocketCommentSink, and then hitch the corresponding flux into the WebSocket API. Think of webSocketCommentSink as the object we can append comments to, and flux as the stream that consumers pull them from on the other end (once they subscribe).
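The sink-in, flux-out idea can be illustrated without Reactor. The following is a plain-Java analogy, not Reactor's implementation: the Broadcaster class, its next() and subscribe() methods, and the session lists are all hypothetical names made up for this sketch. It shows the behavior of a hot, shared stream, where every current subscriber sees each new comment and a late subscriber misses what was sent before it joined.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Plain-Java analogy of a hot, multicast stream; NOT Reactor's implementation.
class HotBroadcastSketch {

    // Hypothetical stand-in for the sink/flux pair:
    // next() plays the "sink" side, subscribe() plays the "flux" side.
    static final class Broadcaster<T> {
        private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

        // A new WebSocket session would attach here.
        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        // Push one item to whoever is subscribed right now; nothing is replayed later.
        void next(T item) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(item);
            }
        }
    }

    // Runs a small scenario and returns what each of two sessions received.
    static List<List<String>> demo() {
        Broadcaster<String> comments = new Broadcaster<>();
        List<String> sessionA = new ArrayList<>();
        List<String> sessionB = new ArrayList<>();

        comments.subscribe(sessionA::add);
        comments.next("Nice cover!");
        comments.subscribe(sessionB::add);   // joins late, misses "Nice cover!"
        comments.next("Great photo");

        List<List<String>> received = new ArrayList<>();
        received.add(sessionA);
        received.add(sessionB);
        return received;
    }

    public static void main(String[] args) {
        List<List<String>> received = demo();
        System.out.println("session A: " + received.get(0));
        System.out.println("session B: " + received.get(1));
    }
}
```

Running main() prints [Nice cover!, Great photo] for session A and [Great photo] for session B. In the real service, Reactor also handles subscription signals, cancellation, and threading, which this sketch leaves out.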

With our webSocketCommentSink configured, we can now hook it into our Spring Cloud Stream Sink, as follows:

    @StreamListener(Sink.INPUT) 
    public void broadcast(Comment comment) { 
      if (webSocketCommentSink != null) { 
        log.info("Publishing " + comment.toString() + 
         " to websocket..."); 
        webSocketCommentSink.next(comment); 
      } 
    } 

The preceding code can be described as follows:

  • The broadcast() method is marked as a @StreamListener for Sink.INPUT. Messages get deserialized as Comment objects thanks to the application/json setting.
  • The code checks that our webSocketCommentSink is not null. The sink is only created once the first subscriber connects, so comments arriving before then are skipped.
  • A log message is printed.
  • The Comment is dropped into our webSocketCommentSink, which means that it will become available to our corresponding flux automatically.
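The null check matters because the emitter callback handed to Flux.create() only runs when a subscriber connects, so the sink field stays unset until the first WebSocket session shows up. Here is a minimal plain-Java sketch of that guard. It is an analogy only: LazySinkSketch, firstSubscriberArrives(), and the boolean return value are hypothetical and exist purely to make the dropped-versus-delivered outcome visible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java analogy of a lazily created sink; NOT Reactor's implementation.
class LazySinkSketch {

    // Stays null until the first subscriber arrives.
    private Consumer<String> sink;
    private final List<String> delivered = new ArrayList<>();

    // Stands in for the first WebSocket session subscribing, which is the
    // moment the emitter callback would run and assign the sink.
    void firstSubscriberArrives() {
        this.sink = delivered::add;
    }

    // Mirrors the guard in broadcast(): returns false when the comment is dropped.
    boolean broadcast(String comment) {
        if (sink == null) {
            return false;
        }
        sink.accept(comment);
        return true;
    }

    List<String> delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        LazySinkSketch service = new LazySinkSketch();
        System.out.println("before subscriber: " + service.broadcast("Too early"));
        service.firstSubscriberArrives();
        System.out.println("after subscriber: " + service.broadcast("Nice cover!"));
        System.out.println("delivered: " + service.delivered());
    }
}
```

The first call reports false because nobody is listening yet, and only the second comment ends up in the delivered list.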

With this service in place, we can expect to see the following in the chat service's logs when a new comment arrives:

2017-08-05 : Publishing Comment(id=581d6774596aec682ffd07be, 
imageId=581d6669596aec65dc9e6c05, comment=Nice cover!) to websocket...

The last step is to push this Flux of comments out over a WebSocket session. Remember the WebSocketHandler interface at the top of our class? Let's implement it:

    @Override 
    public Mono<Void> handle(WebSocketSession session) { 
      return session.send(this.flux 
        .map(comment -> { 
          try { 
            return mapper.writeValueAsString(comment); 
          } catch (JsonProcessingException e) { 
            throw new RuntimeException(e); 
          } 
        }) 
        .log("encode-as-json") 
        .map(session::textMessage) 
        .log("wrap-as-websocket-message")) 
      .log("publish-to-websocket"); 
    } 

This WebSocketHandler can be described as follows:

  • We are handed a WebSocketSession which has a very simple API
  • The Comment-based Flux is piped into the WebSocket via its send() method
  • This Flux itself is transformed from a series of Comment objects into a series of JSON strings courtesy of Jackson, and then, finally, into a series of WebSocketMessage objects
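The try/catch inside map() is there because the encoding call declares a checked exception, and a lambda passed to map() is not allowed to throw one. The two-step transformation can be sketched with java.util.stream in place of Reactor. Everything here is an assumption made for illustration: the trimmed-down Comment class, the Encoder interface, the naive toJson() method (which stands in for Jackson and does no escaping), and the "TEXT:" prefix (which stands in for session::textMessage).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Sketch of the Comment -> JSON text -> message pipeline using plain Java streams.
class EncodePipelineSketch {

    // Hypothetical, trimmed-down stand-in for the chat service's Comment.
    static final class Comment {
        final String id;
        final String text;

        Comment(String id, String text) {
            this.id = id;
            this.text = text;
        }
    }

    // An encoder that, like the Jackson call in the handler, declares a checked exception.
    interface Encoder<T> {
        String encode(T value) throws IOException;
    }

    // Naive hand-rolled encoder for the demo only; it does no JSON escaping.
    static String toJson(Comment c) throws IOException {
        if (c.text == null) {
            throw new IOException("comment text is missing");
        }
        return "{\"id\":\"" + c.id + "\",\"comment\":\"" + c.text + "\"}";
    }

    // Adapts a throwing encoder to a Function by rethrowing as an unchecked exception.
    static <T> Function<T, String> unchecked(Encoder<T> encoder) {
        return value -> {
            try {
                return encoder.encode(value);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
    }

    // Comment -> JSON text -> framed message: the same two map() steps as the handler.
    static List<String> encodeAll(List<Comment> comments) {
        Function<Comment, String> encode = unchecked(EncodePipelineSketch::toJson);
        return comments.stream()
            .map(encode)
            .map(json -> "TEXT:" + json)   // stand-in for session::textMessage
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Comment> comments = Arrays.asList(new Comment("1", "Nice cover!"));
        encodeAll(comments).forEach(System.out::println);
    }
}
```

Pulling the wrapping into a helper like unchecked() is one way to keep the pipeline readable. The handler in this chapter inlines the same try/catch instead, which is equally valid for a single call site.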

It's important to point out that in Spring Framework 4, much of this was handled by the inner workings of Spring's WebSocket API as well as its Messaging API. There was no need to serialize and deserialize Java POJOs into JSON representations. That was provided out of the box by Spring's converter services.

In Spring Framework 5, in the WebFlux module, the WebSocket API is very simple. Think of it as streams of messages coming and going. So the duty of transforming a chain of Comment objects into one of JSON-encoded text messages falls to us. As we've just seen, with the functional paradigm of Reactor, this is no bother.

Getting bogged down in POJO overload? Seeing Comment domain objects in every microservice? Don't panic! While we could write a common module, used by every microservice, to hold this domain object, that may not be the best idea. By letting each microservice manage its own domain objects, we reduce coupling. For example, only the comments service actually marks the id field with Spring Data Commons' @Id annotation, since it's the only one talking to MongoDB. What may appear identical in code actually carries slight semantic differences that can surface down the road.