Introducing user chatting

What social media platform doesn't provide a means for users to communicate with each other? In this section, we'll enhance our application to allow chatting between users. This is another way to use asynchronous WebSocket messaging between clients and servers.

To start, let's add a new HTML element at the bottom of our template like this:

    <div id="chatBox"> 
        Greetings! 
        <br/> 
        <textarea id="chatDisplay" rows="10" cols="80" 
              disabled="true"></textarea> 
        <br/> 
        <input id="chatInput" type="text" style="width: 500px" 
              value="" /> 
        <br/> 
        <button id="chatButton">Send</button> 
        <br/> 
    </div> 

This preceding HTML code is placed right underneath the Upload widget for sending new pictures. It contains:

  • A simple greeting.
  • An HTML textarea for displaying messages, 80 columns wide and 10 rows tall. It is disabled to make it a read-only message output.
  • A text input for entering new messages.
  • A button to submit new messages.

It's true that any and all styling should be done through CSS, but we are trying to keep things simple, and not turn this into a UX-based book.

To post new messages from the text input box, we need to add another bit of code inside our piece of JavaScript:

    var outboundChatMessages =
      new WebSocket('ws://localhost:8200/app/chatMessage.new');
    // Post new chat messages
    outboundChatMessages.onopen = function(event) {
      document.getElementById('chatButton')
        .addEventListener('click', function () {
          var chatInput = document.getElementById('chatInput');
          console.log('Publishing "' + chatInput.value + '"');
          outboundChatMessages.send(chatInput.value);
          chatInput.value = '';
          chatInput.focus();
        });
    }

This last bit of code does the following:

  • It creates another WebSocket connection, this time to ws://localhost:8200/app/chatMessage.new (which we'll code further down).
  • Registers a handler function to be invoked when the onopen event of the WebSocket is triggered.
  • Finds the chatButton, and registers an event handler for the click events.
  • When clicked, fetches the chatInput text input.
  • Using the WebSocket variable, it sends the value of the chatInput text input. NOTE: This is pure text. No JSON encoding needed.
  • Clears out chatInput, and switches focus back to it.

This will transport raw strings to the server. How these messages are received will be defined shortly, but while we're here, why not go ahead and code up the other side, that is, when these messages are transmitted from server to client?

Are you getting nervous about seeing ws://localhost:8200 hard-coded? It's appeared in a couple of places so far (and will again as we write more code). It's a bit arbitrary, and it doesn't lend itself to scaling in production, right? We could stuff this value into the Config Server Git repo, and then write some JavaScript to scarf it out, but that sounds a little complicated. And it still wouldn't solve the scaling issue. The truth is that there is a much simpler solution in Chapter 9, Securing Your App with Spring Boot. So we'll stick with hard-coded URLs for now.

To display chat messages as they arrive, add the following:

    var inboundChatMessages = 
      new WebSocket('ws://localhost:8200/topic/chatMessage.new'); 
    // Listen for new chat messages 
    inboundChatMessages.onmessage = function (event) { 
      console.log('Received ' + event.data); 
      var chatDisplay = document.getElementById('chatDisplay'); 
      chatDisplay.value = chatDisplay.value + event.data + '\n';
    };

The preceding code does the following:

  • Creates a third WebSocket connection to ws://localhost:8200/topic/chatMessage.new
  • On the WebSocket's onmessage handler, registers a function handler to be invoked with every new message
  • When an event arrives, grabs hold of the chatDisplay
  • Appends the message's data to the chatDisplay, and adds a newline character

Confused by the paths /app/chatMessage.new and /topic/chatMessage.new? The first is for sending messages from the client to our server-side application, while the latter is for sending messages from server to client. There is no requirement that they be prefixed by /app or /topic. It's just a convention to help denote where the messages are traveling.

We just defined a route to send user messages to the server as well as a route to receive messages from the server. The next step is to register these routes in our server-side code. We do so by updating our WebSocketConfig class's webSocketMapping like this:

    @Bean 
    HandlerMapping webSocketMapping(CommentService commentService, 
      InboundChatService inboundChatService, 
      OutboundChatService outboundChatService) { 
        Map<String, WebSocketHandler> urlMap = new HashMap<>(); 
        urlMap.put("/topic/comments.new", commentService); 
        urlMap.put("/app/chatMessage.new", inboundChatService); 
        urlMap.put("/topic/chatMessage.new", outboundChatService); 
 
        Map<String, CorsConfiguration> corsConfigurationMap = 
         new HashMap<>(); 
        CorsConfiguration corsConfiguration = new CorsConfiguration(); 
        corsConfiguration.addAllowedOrigin("http://localhost:8080"); 
        corsConfigurationMap.put( 
          "/topic/comments.new", corsConfiguration); 
        corsConfigurationMap.put( 
          "/app/chatMessage.new", corsConfiguration); 
        corsConfigurationMap.put( 
          "/topic/chatMessage.new", corsConfiguration); 
 
        SimpleUrlHandlerMapping mapping =
          new SimpleUrlHandlerMapping();
        mapping.setOrder(10);
        mapping.setUrlMap(urlMap);
        mapping.setCorsConfigurations(corsConfigurationMap);

        return mapping;
    }

This last code contains many changes, so let's take them apart one by one:

  • Previously, this method only injected CommentService. Now we also inject InboundChatService as well as OutboundChatService. These are two services we must define based on the need to broker WebSocket messages between sessions. (Don't panic! We'll get to that real soon).
  • We have two new routes added to the urlMap--/app/chatMessage.new and /topic/chatMessage.new--which we just saw used in the web layer.
  • These same routes must also be added to our CORS policy.

Are you a little nervous about the CORS policy? Worried about managing hard-coded ports in your code when we just showed how that's not necessary in the previous chapter? Concerned about what this means when it comes time to secure everything? Don't worry, we'll show how this can be handled in Chapter 9, Securing Your App with Spring Boot.

With this adjustment to our chat microservice's WebSocketConfig, we must now configure how incoming WebSocket messages are handled. It's important to realize that if we receive the Flux of messages, and turn around and broadcast them on the same WebSocketSession, the only person receiving the messages will be the person who sent them: an echo server, if you will.

This is why we need a broker if we want to broadcast such messages. Incoming messages must be received, relayed to a broker, and then picked up on the other side by all clients.
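
To make the echo-versus-broadcast distinction concrete, here is a minimal plain-Java sketch. It is not Spring code: Session and Broker are hypothetical stand-ins for a WebSocketSession and for RabbitMQ, and the session IDs are just sample strings. It only shows that replying on the sender's own session reaches one party, while relaying through a shared broker reaches everyone.

```java
import java.util.ArrayList;
import java.util.List;

public class BroadcastSketch {

    /** Hypothetical stand-in for one connected WebSocket session (not a Spring type). */
    static class Session {
        final String id;
        final List<String> received = new ArrayList<>();

        Session(String id) {
            this.id = id;
        }

        void deliver(String message) {
            received.add(message);
        }
    }

    /** Echo: the message goes straight back on the session it arrived on. */
    static void echo(Session sender, String text) {
        sender.deliver(sender.id + ": " + text);
    }

    /** Hypothetical stand-in for the broker: it relays each message to every registered session. */
    static class Broker {
        private final List<Session> sessions = new ArrayList<>();

        void register(Session session) {
            sessions.add(session);
        }

        void publish(Session sender, String text) {
            String message = sender.id + ": " + text;
            for (Session session : sessions) {
                session.deliver(message);
            }
        }
    }

    public static void main(String[] args) {
        Session first = new Session("2f05fa8e");
        Session second = new Session("298b3bcf");

        // Without a broker, only the sender sees the message.
        echo(first, "Do you like the new cover?");
        System.out.println("echo:   first=" + first.received.size()
            + ", second=" + second.received.size());

        // With a broker in the middle, both sessions receive a copy.
        Broker broker = new Broker();
        broker.register(first);
        broker.register(second);
        broker.publish(second, "You bet!");
        System.out.println("broker: first=" + first.received.size()
            + ", second=" + second.received.size());
    }
}
```

After the echo step only the first session holds a message; after the broker step both sessions have gained one.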

Now, where can we find a broker? We already have one! We've been using Spring Cloud Stream to transport messages over RabbitMQ on our behalf. We can do the same for these messages as well.

It's important to remember that Spring Cloud Stream operates on the channel paradigm. Everything is sent and received over channels. Up until now, we've gotten by using Source, Sink, and Processor, three interfaces that work with output and input. To handle new comment-based messages, client-to-server user messages, and server-to-client user messages, those two channels aren't enough.

So, we need to define a new set of streams. We can do that by creating our own interface, ChatServiceStreams in the chat microservice, as shown here:

    public interface ChatServiceStreams { 
 
      String NEW_COMMENTS = "newComments"; 
      String CLIENT_TO_BROKER = "clientToBroker"; 
      String BROKER_TO_CLIENT = "brokerToClient"; 
 
      @Input(NEW_COMMENTS) 
      SubscribableChannel newComments(); 
 
      @Output(CLIENT_TO_BROKER) 
      MessageChannel clientToBroker(); 
 
      @Input(BROKER_TO_CLIENT) 
      SubscribableChannel brokerToClient(); 
    } 

This preceding declarative cornerstone of our chat service can be described as follows:

  • Three channel names are defined at the top--NEW_COMMENTS, CLIENT_TO_BROKER, and BROKER_TO_CLIENT. They each map onto a channel name of newComments, clientToBroker, and brokerToClient.
  • newComments() is defined as an input linked to the NEW_COMMENTS channel via the @Input annotation, and has a return type of SubscribableChannel, meaning, it can be used to consume messages.
  • clientToBroker() is defined as an output linked to the CLIENT_TO_BROKER channel via the @Output annotation, and has a return type of MessageChannel, which means that it can be used to transmit messages.
  • brokerToClient() is defined as an input linked to the BROKER_TO_CLIENT channel via the @Input annotation, and also has a return type of SubscribableChannel, which means it, too, can be used to consume messages.

We need this interface in place so we can then dive into creating that InboundChatService we promised to build earlier:

    @Service 
    @EnableBinding(ChatServiceStreams.class) 
    public class InboundChatService implements WebSocketHandler { 
 
      private final ChatServiceStreams chatServiceStreams; 
 
      public InboundChatService(ChatServiceStreams chatServiceStreams) {
        this.chatServiceStreams = chatServiceStreams;
      }

      @Override
      public Mono<Void> handle(WebSocketSession session) {
        return session
          .receive()
          .log("inbound-incoming-chat-message")
          .map(WebSocketMessage::getPayloadAsText)
          .log("inbound-convert-to-text")
          .map(s -> session.getId() + ": " + s)
          .log("inbound-mark-with-session-id")
          .flatMap(this::broadcast)
          .log("inbound-broadcast-to-broker")
          .then();
      }

      public Mono<?> broadcast(String message) {
        return Mono.fromRunnable(() -> {
          chatServiceStreams.clientToBroker().send(
            MessageBuilder
              .withPayload(message)
              .build());
        });
      }
    }

This preceding service code, registered to handle messages coming in on /app/chatMessage.new can be described as follows:

  • @Service marks it as a Spring service that should launch automatically thanks to Spring Boot's component scanning.
  • @EnableBinding(ChatServiceStreams.class) signals Spring Cloud Stream to connect this component to its broker-handling machinery.
  • It implements the WebSocketHandler interface--when a client connects, the handle(WebSocketSession) method will be invoked.
  • Instead of using the @StreamListener annotation as in the previous code, this class injects a ChatServiceStreams bean (same as the binding annotation) via constructor injection.
  • To handle a new WebSocketSession, we grab it and invoke its receive() method. This hands us a Flux of potentially endless WebSocketMessage objects. These would be the incoming messages sent in by the client that just connected. NOTE: Every client that connects will invoke this method independently.
  • We map the Flux<WebSocketMessage> object's stream of payload data into a Flux<String> via getPayloadAsText().
  • From there, we transform each raw message into a formatted message with the WebSocket's session ID prefixing each message.
  • Satisfied with our formatting of the message, we flatMap it onto our broadcast() method in order to broadcast it to RabbitMQ.
  • To hand control to the framework, we put a then() on the tail of this Reactor flow so Spring can subscribe to this Flux.
  • The broadcast method, invoked as every message is pulled down, marshals and transmits the message by first building a Spring Message<String> object with MessageBuilder. It is pushed out over the ChatServiceStreams.clientToBroker() object's MessageChannel via the send() API. To reactorize it, we wrap it with Mono.fromRunnable.
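
The shape of that flow (convert, mark with the session ID, hand off to the channel) can be sketched with nothing but the JDK. This is only an analogy under stated assumptions: java.util.stream.Stream is synchronous and finite, unlike a Flux, and the Consumer named clientToBroker is a hypothetical stand-in for the real MessageChannel. The class and method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class InboundFlowSketch {

    /** Mirrors the .map(s -> session.getId() + ": " + s) step of the handler. */
    static String markWithSessionId(String sessionId, String payload) {
        return sessionId + ": " + payload;
    }

    /**
     * Pushes every incoming payload through the same chain:
     * format it, then hand it to the outbound channel.
     */
    static void handle(String sessionId, Stream<String> incoming,
                       Consumer<String> clientToBroker) {
        incoming
            .map(payload -> markWithSessionId(sessionId, payload))
            .forEach(clientToBroker);
    }

    public static void main(String[] args) {
        // A list collects what would otherwise be sent to the broker.
        List<String> sentToBroker = new ArrayList<>();

        handle("2f05fa8e",
            Stream.of("Do you like the new cover?", "Anyone there?"),
            sentToBroker::add);

        sentToBroker.forEach(System.out::println);
    }
}
```

As in the reactive version, no intermediate results are stored along the way; each step feeds the next, and only the final hand-off has a side effect.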

Whew! That's a lot of code! Such is the effect of functional reactive programming (FRP). Not a lot of effort is spent on imperative constructs and intermediate results. Instead, each step is chained to the next step, forming a transforming flow, pulling data from one input (the WebSocketSession in this case), and steering it into a channel for the broker (ChatServiceStreams.clientToBroker()).

Remember earlier when we created a chat.yml file in our Config Server's Git repo? Here's the key fragment:

    spring: 
      cloud: 
        stream: 
          bindings: 
            clientToBroker: 
              destination: learning-spring-boot-chat-user-messages 
              group: app-chatMessages 

It contains an entry for spring.cloud.stream.bindings.clientToBroker, where clientToBroker matches the channel name we set in ChatServiceStreams. It indicates that messages transmitted over the clientToBroker channel will be put on RabbitMQ's learning-spring-boot-chat-user-messages exchange, and grouped with other messages marked app-chatMessages.

This sets things up to broadcast any user-based chat message to everyone. We just need to have every user listen for them!

To do so, we need to create that other service we promised to build earlier, OutboundChatService:

    @Service 
    @EnableBinding(ChatServiceStreams.class) 
    public class OutboundChatService implements WebSocketHandler { 
 
      private static final Logger log =
        LoggerFactory.getLogger(OutboundChatService.class);
 
      private Flux<String> flux; 
      private FluxSink<String> chatMessageSink; 
 
      public OutboundChatService() { 
        this.flux = Flux.<String>create( 
          emitter -> this.chatMessageSink = emitter, 
           FluxSink.OverflowStrategy.IGNORE) 
           .publish() 
           .autoConnect(); 
      } 
 
      @StreamListener(ChatServiceStreams.BROKER_TO_CLIENT) 
      public void listen(String message) { 
        if (chatMessageSink != null) { 
          log.info("Publishing " + message + 
            " to websocket..."); 
            chatMessageSink.next(message); 
        } 
      } 
 
      @Override 
      public Mono<Void> handle(WebSocketSession session) { 
        return session 
         .send(this.flux 
         .map(session::textMessage) 
         .log("outbound-wrap-as-websocket-message")) 
         .log("outbound-publish-to-websocket"); 
 
      } 
    }

The code can be described as follows:

  • Again, the @Service annotation marks this as an automatically wired Spring service.
  • It has the same @EnableBinding(ChatServiceStreams.class) as the inbound service, indicating that this, too, will participate with Spring Cloud Stream.
  • The constructor call wires up another one of those FluxSink objects, this time for a Flux of strings.
  • @StreamListener(ChatServiceStreams.BROKER_TO_CLIENT) indicates that this service will be listening for incoming messages on the brokerToClient channel. When it receives one, it will forward it to chatMessageSink.
  • This class also implements WebSocketHandler, and each client attaches via the handle(WebSocketSession) method. It is there that we connect the flux of incoming messages to the WebSocketSession via its send() method.
  • Because WebSocketSession.send() requires Flux<WebSocketMessage>, we map the Flux<String> into it using session::textMessage. Nothing to serialize.
  • There are two custom log categories: one on the Flux after each message is wrapped as a WebSocketMessage, and another on the Mono returned by send(), which covers the handling of the entire Flux.

That's it!

With InboundChatService routing individual messages from client to server to broker, we are able to take individual messages and broadcast them to ALL users. Then, with OutboundChatService pulling down copies of the message for each WebSocket session, each user is able to receive a copy.

Don't forget, we also added a binding for OutboundChatService to chat.yml on the Config Server:

    spring: 
      cloud: 
        stream: 
          bindings: 
            brokerToClient: 
              destination: learning-spring-boot-chat-user-messages 
              group: topic-chatMessages 

And remember that little bit of JavaScript we wrote to subscribe to ws://localhost:8200/topic/chatMessage.new? It will receive the broadcast messages.

Flux and FluxSink--if you haven't caught on, linking async operations with pre-established Flux objects is easily handled by this pattern. We've seen it several times now. If both sides of an async service use a Flux, it's not necessary. But if something bars hooking them directly, this mechanism easily bridges the gap.
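
For readers who want to see the bridging idea in isolation, here is a rough JDK-only analogy. It assumes Java 9 or later and uses java.util.concurrent.SubmissionPublisher in place of the Flux/FluxSink pair; it is not the Reactor API, and the class and method names (SinkBridgeSketch, listen, attach) are invented for illustration. A callback-style method pushes messages in by hand, and each attached consumer receives whatever is published after it attaches.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

public class SinkBridgeSketch {

    // Stands in for the Flux/FluxSink pair: a publisher we can push into by hand.
    private final SubmissionPublisher<String> sink = new SubmissionPublisher<>();

    /** Callback side: plays the role of the listener method forwarding into the sink. */
    void listen(String message) {
        sink.submit(message);
    }

    /** Stream side: each "session" attaches its own consumer of published messages. */
    CompletableFuture<Void> attach(List<String> sessionOutput) {
        return sink.consume(sessionOutput::add);
    }

    /** Signals completion so the futures returned by attach() can finish. */
    void close() {
        sink.close();
    }

    public static void main(String[] args) {
        SinkBridgeSketch bridge = new SinkBridgeSketch();
        List<String> firstTab = new CopyOnWriteArrayList<>();
        List<String> secondTab = new CopyOnWriteArrayList<>();

        // Nobody is attached yet, so this message reaches no one.
        bridge.listen("sent before anyone attached");

        CompletableFuture<Void> first = bridge.attach(firstTab);
        CompletableFuture<Void> second = bridge.attach(secondTab);
        bridge.listen("2f05fa8e: Do you like the new cover?");

        // Close the publisher and wait until both consumers have drained their items.
        bridge.close();
        first.join();
        second.join();

        System.out.println(firstTab);
        System.out.println(secondTab);
    }
}
```

The design point is the same as in the chapter's services: the push side and the stream side never call each other directly; they meet only at the sink.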

The names InboundChatService and OutboundChatService are somewhat arbitrary. The important point to note is that one is responsible for transporting WebSocket messages from the client to the broker through the server. Those are incoming. After crossing the broker, we describe them at this stage as being outgoing. The naming convention is meant to help remember what does what. Neither Spring Boot nor Spring Cloud Stream care about what these classes are named.

With this enhancement, we can fire things up and see what it looks like.

In the following screenshot of our new chat box there is a conversation involving two users:

The prefix values (2f05fa8e and 298b3bcf) are pure WebSocket session IDs. Kind of tricky to connect with a human user, ehh? Nevertheless, this interchange is what is seen by all parties. (Since both sides see the same exchange, no need to show both browser tabs.)

However, if we peek inside the browser's JavaScript console, we get a new insight. The following is a screenshot from the user with 2f05fa8e as their session ID:

We can immediately see the first message (Do you like the new cover?) being published, and received right back. Following that, the other user sends a separate message (You bet! Wish I could get a t-shirt).

If we inspect the other user's JavaScript console, we can see the other side of the conversation:

The first message was from the first user (Do you like the new cover?) followed by the second user's response (You bet!...), and so forth.

Simple. Elegant. Asynchronous. That's what WebSockets are for. And here we have a simple usage.
