Writing a chat app with the WebSockets server

WebSockets is a protocol for bidirectional communication between a web browser and a server. Its key feature is that the server can actively push messages to the client, and this is probably the main reason why you would choose WebSockets. There used to be fallbacks for browsers that don't support WebSockets, such as long polling, but we're not going to use any of them; we'll assume that our browsers are up to date and support WebSockets properly.

Our chat WebSockets server spawns an HTTP server that listens to WebSockets headers and implements a handshake between the two endpoints to switch to the WebSockets protocol (the so-called upgrade request).

Later, we'll log all chat messages in a MySQL database.

So first, we'll create an HTTP server that handles upgrade requests and a class that holds all our active WebSockets connections. Everything we need to write HTTP servers is part of the dart:io library:

// bin/server.dart
import 'package:args/args.dart';
import 'dart:io';
import 'dart:async';
import 'dart:convert';
import 'dart:collection';

// Class representing a client connected to the chat server.
class WebSocketsClient {
  // Empty until the client sends a change_name message.
  String name = '';
  WebSocket ws;
  
  WebSocketsClient(this.ws);
  Future close() => ws.close();
}

class ChatWebSocketsServer {
  // Hold all connected clients in this list.
  List<WebSocketsClient> _clients = [];
  
  // Add a new client based on his WebSocket connection.
  void handleWebSocket(WebSocket ws) {
    WebSocketsClient client = new WebSocketsClient(ws);
    _clients.add(client);
    print('Client connected');
    
    // In a real app we would probably wrap JSON.decode() with
    // try & catch to filter out malformed inputs.
    // Stream.map() returns a new Stream and processes each item
    // with callback function. In our case it decodes all JSONs.
    // onDone is called when the connection is closed by client.
    ws.map((string) => JSON.decode(string))
      .listen((Map json) {
        handleMessage(client, json);
    }, onDone: () => close(client));
  }

  // Handle incoming messages.
  void handleMessage(WebSocketsClient client, Map json) {
    /* ... */
  }
  // Client closed their connection.
  void close(WebSocketsClient client) { /* ... */ }
}

main(List<String> args) async {
  // Args parser configuration as above.
  HttpServer httpServer;

  if (argResults['help']) { /* ... */
  } else if (argResults['cmd'] == 'stop') { /* ... */
  } else if (argResults['cmd'] == 'start') {
    ChatWebSocketsServer wsServer = new ChatWebSocketsServer();

    print('My PID: $pid');
    int port = int.parse(argResults['port']);
    print('Starting WebSocket server');

    var address = InternetAddress.LOOPBACK_IP_V4;
    // Assign to the variable declared at the top of main().
    httpServer = await HttpServer.bind(address, port);
    StreamController sc = new StreamController();
    sc.stream.transform(new WebSocketTransformer())
        .listen((WebSocket ws) {
          wsServer.handleWebSocket(ws);
    });
    
    // Listen to HTTP requests.
    httpServer.listen((HttpRequest request) {
      // You can also handle different URLs with request.uri.
      // if (request.uri.path == '/ws') { }
      sc.add(request);
    });
  }
}

This is our app in a nutshell. We use StreamController and WebSocketTransformer classes to upgrade the client's HTTP request to the WebSockets connection.
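As a point of comparison, dart:io also lets us upgrade requests one at a time with the static methods WebSocketTransformer.isUpgradeRequest() and WebSocketTransformer.upgrade(). The following is only a minimal sketch of that alternative, not the approach used in this chapter; the handleRequest() and echoOnce() names, the echo behavior, and the use of port 0 (any free port) are assumptions made for the example:

```dart
import 'dart:async';
import 'dart:io';

// Upgrades one request to a WebSocket, or answers 400 for plain HTTP.
Future handleRequest(HttpRequest request) async {
  if (!WebSocketTransformer.isUpgradeRequest(request)) {
    request.response.statusCode = 400; // Bad Request
    await request.response.close();
    return;
  }
  WebSocket ws = await WebSocketTransformer.upgrade(request);
  // Echo every incoming message back to its sender.
  ws.listen((data) => ws.add('echo: $data'));
}

// Starts a throwaway server, sends one message and returns the reply.
Future<String> echoOnce(String text) async {
  HttpServer server = await HttpServer.bind('127.0.0.1', 0);
  server.listen(handleRequest);

  WebSocket client =
      await WebSocket.connect('ws://127.0.0.1:${server.port}');
  Completer<String> reply = new Completer<String>();
  client.listen((data) {
    reply.complete(data.toString());
    client.close();
  }, onDone: () => server.close());
  client.add(text);
  return reply.future;
}

main() async {
  print(await echoOnce('hello'));
}
```

Checking isUpgradeRequest() first gives us a natural place to reject or route ordinary HTTP requests before any handshake takes place.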

What's interesting here is the handleWebSocket() method. Notice in particular how we're using closures with the client variable. Both handleMessage() and close() are in a closure and have their own reference to client, so even with multiple calls to handleWebSocket(), we can still easily identify which client invoked each event.
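The closure behavior described above can be tried in isolation. This is a small sketch with hypothetical names (Client, attach(), and the events list are not part of the chat server); plain StreamController objects stand in for WebSocket connections:

```dart
import 'dart:async';

class Client {
  final int id;
  Client(this.id);
}

// Collects everything the callbacks report.
final List<String> events = <String>[];

// Each call creates two closures that remember their own client.
void attach(Client client, Stream<String> messages) {
  messages.listen(
      (String text) => events.add('client ${client.id}: $text'),
      onDone: () => events.add('client ${client.id} left'));
}

main() async {
  StreamController<String> first = new StreamController<String>();
  StreamController<String> second = new StreamController<String>();
  attach(new Client(1), first.stream);
  attach(new Client(2), second.stream);

  second.add('hello');
  first.add('hi');
  // close() completes once the done event has been delivered.
  await Future.wait([first.close(), second.close()]);
  events.forEach(print);
}
```

Even though attach() is called twice, every recorded event names the right client, because each pair of callbacks captured the client variable of its own call.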

The handleMessage() method is the core of the entire server app; it's called for every incoming message from every client. Our app will handle three types of messages:

  • The client changing their chat name
  • Posting a message that is broadcast to all connected clients
  • Requesting the entire available chat history

We can finish the server part with all we need for now:

// bin/server.dart
// A single item in the chat history.
class ChatMessage {
  String name;
  String text;
  
  ChatMessage(this.name, this.text);
  String get json => JSON.encode({'name': name, 'text': text});
}

class ChatWebSocketsServer {
  List<WebSocketsClient> _clients = [];
  // Chat history.
  ListQueue<ChatMessage> _chat;
  static const int maxHistory  = 100;
  
  ChatWebSocketsServer() {
    _chat = new ListQueue<ChatMessage>();
  }
  
  void handleWebSocket(WebSocket ws) { /* same as above */ }
  
  // Handle incoming messages.
  void handleMessage(WebSocketsClient client, Map json) {
    if (json['type'] == 'change_name') {
      client.name = json['name'];
    } else if (json['type'] == 'post' && client.name.isNotEmpty) {
      ChatMessage record =
          new ChatMessage(client.name, json['text']);
      // Keep only last maxHistory messages.
      _chat.addLast(record);
      if (_chat.length > maxHistory) _chat.removeFirst();
      broadcast(record);
      
    } else if (json['type'] == 'init') {
      // Send chat history to the client.
      // Let's not bother with performance for simplicity reasons.
      _chat.forEach((ChatMessage m) => client.ws.add(m.json));
    }
  }
  
  // Client closed their connection.
  void close(WebSocketsClient client) {
    print('Client disconnected');
    // Remove the reference from the list of clients.
    client.close().then((_) =>
        _clients.removeAt(_indexByWebSocket(client.ws)));
  }
  
  // Close connection to all clients.
  // This is used only on server shutdown.
  Future closeAll() {
    // Make a list of all Future objects returned by close().
    List<Future> futures = [];
    _clients.forEach((client) => futures.add(client.ws.close()));
    // ... and wait until all of them complete.
    
    return Future.wait(futures);
  }
  
  // Send a message to all connected clients.
  void broadcast(ChatMessage message) {
    // Method add() sends a string to the client.
    _clients.forEach((client) => client.ws.add(message.json));
  }
  
  // Get index for this connection from the list of clients.
  int _indexByWebSocket(WebSocket ws) {
    for (int i = 0; i < _clients.length; i++) {
      if (_clients[i].ws == ws) return i;
    }
    return null;
  }
}

We're using ListQueue<T> here to store chat messages because it can add and remove items at both ends, with addFirst(), addLast(), and removeFirst(), in amortized constant time.
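The bounded history can be sketched on its own as follows. The History class and its max parameter are hypothetical names used only for this example; the add() logic mirrors what handleMessage() does with _chat:

```dart
import 'dart:collection';

// Keeps only the most recent `max` items.
class History {
  final int max;
  final ListQueue<String> _items = new ListQueue<String>();

  History(this.max);

  void add(String item) {
    _items.addLast(item);
    // Drop the oldest item once the limit is exceeded.
    if (_items.length > max) _items.removeFirst();
  }

  // Oldest item first.
  List<String> get items => _items.toList();
}

main() {
  History history = new History(3);
  for (String message in ['a', 'b', 'c', 'd', 'e']) {
    history.add(message);
  }
  print(history.items); // [c, d, e]
}
```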

Although the code is quite long, it's not hard to understand. Note the broadcast() method, which can send a message to all connected clients because we hold the WebSocket connections for all of them. When a client disconnects, we call its close() method and remove it from the list of clients.
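The closeAll() method relies on Future.wait(), which takes a list of Future objects and completes when all of them have completed. A minimal sketch of that pattern, where closeAfter() is a hypothetical stand-in for a connection that takes some time to close:

```dart
import 'dart:async';

// Pretends to close a connection and completes with its name.
Future<String> closeAfter(String name, int milliseconds) {
  return new Future.delayed(
      new Duration(milliseconds: milliseconds), () => name);
}

main() async {
  // All three futures are started before we wait for any of them.
  List<String> closed = await Future.wait([
    closeAfter('a', 30),
    closeAfter('b', 10),
    closeAfter('c', 20),
  ]);
  // Results keep the order of the input list, not of completion.
  print(closed); // [a, b, c]
}
```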

Client-side WebSockets

The HTML and Dart code for a web browser using WebSocket is simple as well:

<!-- web/index.html -->
<body>
  <p>Name: <input type="text" id="name"></p>   
  <div id="chat"></div>
  <p>Message: <input type="text" id="msg">
    <button id="btn-send">Send</button></p>
  <script type="application/dart" src="main.dart"></script>
</body>

There are just two input fields and a button:

// web/main.dart
import 'dart:html';
import 'dart:convert';

main() {
  WebSocket ws = new WebSocket('ws://127.0.0.1:8888');
  
  ws.onOpen.listen((e) {
    print('Connected');
    ws.sendString(JSON.encode({'type': 'init'}));
  });
  
  // Automatically decode all incoming messages.
  ws.onMessage.map((MessageEvent e) =>
    JSON.decode(e.data)).listen((Map j) {
      var s = '<p><b>${j['name']}</b>: ${j['text']}</p>';
      querySelector('#chat').appendHtml(s);
  });
  
  querySelector('#name').onKeyUp.listen((e) {
    ws.sendString(JSON.encode({
      'type': 'change_name',
      'name': e.target.value
    }));
  });
  
  querySelector('#btn-send').onClick.listen((e) {
    InputElement input = querySelector('#msg');
    ws.sendString(JSON.encode(
        { 'type': 'post', 'text': input.value }));
    input.value = '';
  });
}

Both the server and the browser use a class called WebSocket, but they aren't the same class. The browser's WebSocket comes from the dart:html library, while the WebSocket used in the server is from dart:io. This is why the browser sends data with sendString(), while the server uses add().

The WebSocket class in the browser has a couple of streams. The most interesting are onOpen and onMessage. We used onOpen to request the entire chat history from the server. Then, onMessage decoded all the data sent from the server, and because we're expecting just chat messages, we appended a simple HTML snippet to the main container.

To test the server, we can run it without any parameters:

$ dart bin/server.dart
My PID: 15725
Starting WebSocket server

Then, open index.html in multiple tabs, set a different name in each, and write a few messages. The messages should immediately appear in all tabs. In Developer Tools, we can also see the messages sent to and received from the server via WebSockets:

[Figure: WebSocket messages sent and received, as shown in Developer Tools]

Listening to Unix signals and basic I/O operations

If we wanted to run this server in a real-life application, we would ideally run it in the background, detached from the terminal, with no output, and so on. We can do this with the nohup utility, which makes a process ignore the SIGHUP signal so that it keeps running even when its parent process terminates. Note that this isn't the same as running a daemon process. Daemons have to follow a predefined behavior, such as closing all file descriptors, changing the current directory, redirecting output, and more.

Before we do this, we need to be able to stop the server when it's running in the background. We could use ps | grep server.dart, but there could be more processes with this name running at the same time and we may not know which one is the one we want to terminate.

Running the server as a background process

We should create a well-behaved process that listens to termination signals, such as SIGTERM and SIGINT, to properly close all WebSocket connections and the HTTP server.

For this reason, we'll save our server's process ID (PID) into a file on launch and listen to the two signals. Then, running dart bin/server.dart -c stop will read the PID and use Process.killPid() to send a signal to the currently running server process:

main(List<String> args) async {
  /* the rest is unchanged */
  } else if (argResults['cmd'] == 'stop') {
    // stop process by sending a SIGTERM signal
    String pidFile = argResults['pid-file'];
    String pid = await (new File(pidFile).readAsString());
    Process.killPid(int.parse(pid), ProcessSignal.SIGTERM);
  } else if (argResults['cmd'] == 'start') {
    /* Create HTTP server and so on... */

    // Save process PID to a file.
    String path = argResults['pid-file'];
    File file = await (new File(path)).create(recursive: true);
    await file.writeAsString(pid.toString());
    
    void shutdown(ProcessSignal signal) async {
      print('Received signal $signal');
      // Close the MySQL connection pool (created in the
      // MySQL section of this chapter).
      pool.close();
      
      // Rather than awaiting each task one by one, start all
      // of them at once and wait until all of them are finished.
      await Future.wait([
        // Remove the file with PID.
        new File(argResults['pid-file']).delete(),
        httpServer.close(),
        wsServer.closeAll(),
      ]);
      // End this process now with code 0.
      exit(0);
    }
    
    ProcessSignal.SIGTERM.watch().listen(shutdown);
    ProcessSignal.SIGINT.watch().listen(shutdown);
  }
}

We used the File class with create(recursive: true) to create the new file along with any missing parent directories. Both File and Directory classes are based heavily on Future objects, and many of their methods come in both asynchronous and synchronous versions (for example, there are delete() and deleteSync() methods).
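The round trip through the PID file can be sketched in isolation. The writePidFile() and readPidFile() helpers and the temporary directory are assumptions made for this example; the chat server reads the path from the pid-file argument instead:

```dart
import 'dart:async';
import 'dart:io';

// Writes the given process ID, creating missing parent directories.
Future<File> writePidFile(String path, int processId) async {
  File file = await new File(path).create(recursive: true);
  return file.writeAsString(processId.toString());
}

// Reads the process ID back as an integer.
Future<int> readPidFile(String path) async {
  String contents = await new File(path).readAsString();
  return int.parse(contents.trim());
}

main() async {
  Directory tmp = await Directory.systemTemp.createTemp('chat_pid_');
  // The run/ subdirectory does not exist yet.
  String path = '${tmp.path}/run/server.pid';

  // pid is the top-level getter from dart:io for this process.
  await writePidFile(path, pid);
  print(await readPidFile(path) == pid);

  // Synchronous counterpart of delete().
  tmp.deleteSync(recursive: true);
}
```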

To stop the server, we run server.dart again, which sends a signal to the PID of the process that's currently running the server.

Now, we can run server.dart again:

$ nohup dart bin/server.dart > /dev/null 2>&1 &

Using > /dev/null redirects all stdout to /dev/null (discarding all output), 2>&1 redirects stderr to stdout, and finally, & runs the process in the background.

We don't even need to know the server's PID, because it's saved in a file; the following command reads it automatically and terminates the process:

$ dart bin/server.dart -c stop

The server process gracefully closes all connections and terminates.

Enumerated types

Dart 1.9 brought another new keyword called enum to define a fixed set of values. In our example from the preceding section, we could define a fixed set of commands as:

enum Command { start, stop }

Later in the code, we could check which command we want to use without comparing strings, as we did in the preceding example, but rather compare Command enums like this:

Command cmd = Command.start;
if (cmd == Command.stop) { /* ... */ }

We can update our main() method and convert a string argResults['cmd'] command into a Command enum value:

Command cmd = Command.values.firstWhere((Command c) {
  String cmdString = c.toString().split('.')[1].toLowerCase();
  return cmdString == argResults['cmd'].toLowerCase();
});

// Then follow with ifs to handle each command.
if (cmd == Command.stop) {
  // ...
} else if (cmd == Command.start) {
  // ...
}

The Command.values property is a constant List that contains all possible values of this enum. Calling c.toString() returns the string representation of an enum value, which in our case is either Command.start or Command.stop. The List.firstWhere() method iterates over the items in the list and returns the first one for which the test function returns true; if no item matches, it throws a StateError unless an orElse callback is provided.
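Put together as a standalone sketch, the lookup might be wrapped in a helper. The commandName() and parseCommand() names are hypothetical, and using orElse to throw an ArgumentError for unknown input is one possible choice rather than something the server requires:

```dart
enum Command { start, stop }

// 'Command.stop' -> 'stop'
String commandName(Command c) => c.toString().split('.')[1];

// Converts a string into a Command, ignoring case.
Command parseCommand(String name) {
  return Command.values.firstWhere(
      (Command c) => commandName(c) == name.toLowerCase(),
      orElse: () => throw new ArgumentError('Unknown command: $name'));
}

main() {
  print(parseCommand('STOP')); // Command.stop

  try {
    parseCommand('restart');
  } on ArgumentError catch (e) {
    print(e.message); // Unknown command: restart
  }
}
```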

Typedefs

Everything in Dart is an object, including functions, which can be passed as arguments. However, this approach has one caveat. If we define a variable to hold, for example, a callback function that is called when a user posts a new message, we could do:

class ChatWebSocketsServer {
  var newMsgCallback;
  /* ... */
  void handleMessage(WebSocketsClient client, Map json) {
    /* ... */
    if (newMsgCallback != null) {
      newMsgCallback(record.name, record.text);
    }
  }
}

main(List<String> args) async {
  /* ... */
  wsServer.newMsgCallback = (String from, String msg) {
    print("$from: $msg");
  };
}

This is all right as long as we know that the callback function takes two String parameters. What could happen is that we accidentally use a callback function that takes a different number or different types of parameters, either in the ChatWebSocketsServer class or in main().

By defining var newMsgCallback, we lost track of the function signature. We could actually assign any value to it. That's why Dart has typedefs, which look like a function declaration prefixed with the typedef keyword and without a body:

typedef void MessageCallback(String from, String msg);

class ChatWebSocketsServer {
  MessageCallback newMsgCallback;
  /* ... */
}

The newMsgCallback property has to have the same signature as the MessageCallback typedef. Dart's type checker can now warn you if you try to assign anything else to it.
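A compact, self-contained sketch of the same idea follows. The Notifier class is a hypothetical stand-in for ChatWebSocketsServer, and the callback is passed through the constructor only to keep the example short:

```dart
typedef void MessageCallback(String from, String msg);

class Notifier {
  final MessageCallback onMessage;

  Notifier(this.onMessage);

  void post(String from, String msg) => onMessage(from, msg);
}

main() {
  List<String> seen = <String>[];

  // Matches MessageCallback: two String parameters.
  Notifier notifier = new Notifier((String from, String msg) {
    seen.add('$from: $msg');
  });
  notifier.post('alice', 'hi');
  print(seen); // [alice: hi]

  // The analyzer reports a callback with a different signature:
  // new Notifier((int count) => print(count));
}
```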

Using MySQL as a storage

As we want to store all chat messages in a MySQL database for further investigation, we'll add a dependency with a MySQL connector called sqljocky. Right now, it can only connect to MySQL via TCP; using Unix sockets doesn't work, so make sure you don't have the skip-networking option enabled in the MySQL configuration (it disables all TCP connections completely).

We'll keep it very simple and use just one table with four columns in a database called dart_chat. You can create the table structure with this SQL query:

CREATE TABLE `chat_log` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(255) NOT NULL DEFAULT '',
  `text` text NOT NULL,
  `created` datetime NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

In Dart, we'll connect to MySQL and pass the connection to ChatWebSocketsServer:

import 'package:sqljocky/sqljocky.dart';
main(List<String> args) async {
  /* same as above */
  } else if (argResults['cmd'] == 'start') {
    ConnectionPool pool = new ConnectionPool(host: '127.0.0.1',
      port: 3306, user: 'root', password: null, db: 'dart_chat');
    wsServer = new ChatWebSocketsServer(pool);
    /* same as above */
  }
}

Then we will update the ChatWebSocketsServer class:

class ChatWebSocketsServer {
  /* ... */
  Query _preparedInsertQuery;

  ChatWebSocketsServer(ConnectionPool pool) {
    _chat = new ListQueue<ChatMessage>();
    
    // Similar to calling new Future(() {...}): the setup below
    // runs asynchronously without blocking the constructor.
    (() async {
      // Preload chat history asynchronously.
      String selectSql = """
          SELECT * FROM chat_log ORDER BY created DESC LIMIT 0,100
      """;
      var results = await pool.query(selectSql);
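      // The rows arrive newest first, so addFirst() restores
      // chronological order in the history.
      await results.forEach((Row row) {
        var msg = new ChatMessage(
            row.name.toString(), row.text.toString());
        _chat.addFirst(msg);
      });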
      
      String insertSql = """
      INSERT INTO chat_log (name,text,created) VALUES (?,?,NOW())""";
      _preparedInsertQuery = await pool.prepare(insertSql);
    })();
  }
  /* the rest remains unchanged */
}

In the constructor, we made one query that fetches the 100 latest chat records and prepopulates the chat history. Then, we used pool.prepare() to create a so-called prepared query, which serves as a template that we can reuse multiple times by just executing it with a list of parameters.

Note

You should never insert variables directly into SQL queries; always use prepared queries to avoid SQL injection attacks (http://en.wikipedia.org/wiki/SQL_injection).

The last thing is to execute a prepared statement when a client sends a new message:

void handleMessage(WebSocketsClient client, Map json) {
  /* same as above */
  } else if (json['type'] == 'post' && client.name.isNotEmpty) {
    /* same as above */
    _preparedInsertQuery.execute([record.name, record.text]);
  } else if (json['type'] == 'init') { /* ... */ }
}

We don't even have to work with the database connection outside the constructor of ChatWebSocketsServer. The _preparedInsertQuery.execute() method makes an asynchronous call, but we don't need to wait until it finishes; the message is broadcast to all clients in the meantime. A few persisted records might look like this:

[Figure: sample rows persisted in the chat_log table]

Apart from what we used here, sqljocky can also run a prepared query multiple times in one call with Query.executeMulti(). In our case, it could be:

Query.executeMulti([['name1', 'text1'], ['n2', 't2'], [...]]);

This actually runs one query for each list of parameters. Sometimes it's better to run multiple queries in a single transaction for performance reasons:

var trans = await pool.startTransaction();
await trans.query('...');
await trans.query('...');
await trans.query('...');
await trans.commit();

Basically, every operation with the database is asynchronous, so we'll use the await keyword a lot (just imagine how awful this would be with nested callbacks).

Finally, when we're done using the database, we should close the pool with pool.close(), which, in contrast to most close() methods, doesn't return a Future object.

We're using just MySQL here, but you can find connectors to probably all common database engines available today at https://pub.dartlang.org. It's worth noting that this connector is relatively new and its functionality is very simple. It's incomparably more primitive than projects such as SQLAlchemy for Python or Doctrine 2 for PHP.
