WebSockets is a protocol for bidirectional communication between a web browser and a server. Its most important feature is that the server can push messages to the client on its own initiative, which is probably the main reason you would choose WebSockets. There used to be fallbacks for browsers that don't support WebSockets, such as long polling, but we won't use any of them and will assume that our browsers are up to date with proper WebSockets support.
Our chat WebSockets server spawns an HTTP server that looks for WebSockets headers in incoming requests and implements a handshake between the two endpoints to switch to the WebSockets protocol (the so-called upgrade request).
Later, we'll log all chat messages in a MySQL database.
So first, we'll create an HTTP server that implements "upgrade requests" and a class that holds all our active WebSockets connections. All we need to write HTTP servers is part of the dart:io
package:
// bin/server.dart
import 'package:args/args.dart';
import 'dart:io';
import 'dart:async';
import 'dart:convert';
import 'dart:collection';

// Class representing a client connected to the chat server.
class WebSocketsClient {
  String name;
  WebSocket ws;
  WebSocketsClient(this.ws);
  Future close() => ws.close();
}

class ChatWebSocketsServer {
  // Hold all connected clients in this list.
  List<WebSocketsClient> _clients = [];

  // Add a new client based on their WebSocket connection.
  void handleWebSocket(WebSocket ws) {
    WebSocketsClient client = new WebSocketsClient(ws);
    _clients.add(client);
    print('Client connected');
    // In a real app we would probably wrap JSON.decode() with
    // try & catch to filter out malformed inputs.
    // Stream.map() returns a new Stream and processes each item
    // with a callback function. In our case it decodes all JSONs.
    // onDone is called when the connection is closed by the client.
    ws.map((string) => JSON.decode(string)).listen((Map json) {
      handleMessage(client, json);
    }, onDone: () => close(client));
  }

  // Handle incoming messages.
  void handleMessage(WebSocketsClient client, Map json) { /* ... */ }

  // Client closed their connection.
  void close(WebSocketsClient client) { /* ... */ }
}

// main() has to be async because it uses await.
main(List<String> args) async {
  // Args parser configuration as above.
  HttpServer httpServer;
  if (argResults['help']) {
    /* ... */
  } else if (argResults['cmd'] == 'stop') {
    /* ... */
  } else if (argResults['cmd'] == 'start') {
    ChatWebSocketsServer wsServer = new ChatWebSocketsServer();
    print('My PID: $pid');
    int port = int.parse(argResults['port']);
    print('Starting WebSocket server');
    var address = InternetAddress.LOOPBACK_IP_V4;
    httpServer = await HttpServer.bind(address, port);

    StreamController sc = new StreamController();
    sc.stream.transform(new WebSocketTransformer()).listen((WebSocket ws) {
      wsServer.handleWebSocket(ws);
    });

    // Listen to HTTP requests.
    httpServer.listen((HttpRequest request) {
      // You can also handle different URLs with request.uri.
      // if (request.uri.path == '/ws') { }
      sc.add(request);
    });
  }
}
This is our app in a nutshell. We use StreamController
and WebSocketTransformer
classes to upgrade the client's HTTP request to the WebSockets connection.
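The StreamController is one way to feed requests into the transformer. As a minimal sketch of the same handshake without it, dart:io also exposes the static helpers WebSocketTransformer.isUpgradeRequest() and WebSocketTransformer.upgrade(). The example below is not part of the chat server: the function names startEchoServer() and roundTrip() are made up for illustration, and the code is written for a current Dart SDK (hence InternetAddress.loopbackIPv4 rather than the LOOPBACK_IP_V4 constant used in the listing above).

```dart
import 'dart:async';
import 'dart:io';

/// Starts an echo server on a loopback port chosen by the OS.
/// (Hypothetical helper, not part of the chat server.)
Future<HttpServer> startEchoServer() async {
  final server = await HttpServer.bind(InternetAddress.loopbackIPv4, 0);
  server.listen((HttpRequest request) async {
    if (WebSocketTransformer.isUpgradeRequest(request)) {
      // Perform the handshake and switch protocols.
      final WebSocket ws = await WebSocketTransformer.upgrade(request);
      ws.listen((data) => ws.add('echo: $data'));
    } else {
      // A plain HTTP request: refuse it.
      request.response.statusCode = HttpStatus.badRequest;
      await request.response.close();
    }
  });
  return server;
}

/// Connects to a fresh echo server, sends [message] and returns the reply.
Future<String> roundTrip(String message) async {
  final server = await startEchoServer();
  final ws = await WebSocket.connect('ws://127.0.0.1:${server.port}');
  final reply = Completer<String>();
  ws.listen((data) {
    if (!reply.isCompleted) reply.complete(data as String);
  });
  ws.add(message);
  final text = await reply.future;
  await ws.close();
  await server.close(force: true);
  return text;
}

Future<void> main() async {
  print(await roundTrip('hello'));
}
```

Checking isUpgradeRequest() first also gives us a natural place to answer ordinary HTTP requests, which the StreamController variant passes to the transformer unconditionally.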
What's interesting here is the handleWebSocket()
method. Notice in particular how we're using closures with the client
variable. Both handleMessage()
and close()
are in a closure and have their own reference to client
, so even with multiple calls to handleWebSocket()
, we can still easily identify which client invoked each event.
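The closure behaviour described above can be tried in isolation. The following sketch replaces WebSocket connections with plain StreamController objects; handleClient() and runDemo() are illustrative names, not part of the server. Each call to handleClient() creates callbacks that remember their own clientName.

```dart
import 'dart:async';

/// Subscribes to [messages]; both callbacks close over [clientName].
void handleClient(
    String clientName, Stream<String> messages, List<String> log) {
  messages.listen(
    (msg) => log.add('$clientName: $msg'),
    onDone: () => log.add('$clientName left'),
  );
}

/// Simulates two clients and returns everything that was logged.
Future<List<String>> runDemo() async {
  final log = <String>[];
  final alice = StreamController<String>();
  final bob = StreamController<String>();
  handleClient('alice', alice.stream, log);
  handleClient('bob', bob.stream, log);

  alice.add('hi');
  bob.add('hello');
  // close() completes once the listener has received the done event.
  await alice.close();
  await bob.close();
  return log;
}

Future<void> main() async {
  (await runDemo()).forEach(print);
}
```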
The handleMessage()
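method is the core of the entire server app and it's called for every incoming message from all clients. Our app will handle three types of messages: change_name, which sets the client's name; post, which adds a message to the chat and broadcasts it to everyone; and init, which asks the server for the chat history.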
We can finish the server part with all we need for now:
// bin/server.dart
// A single item in the chat history.
class ChatMessage {
  String name;
  String text;
  ChatMessage(this.name, this.text);
  String get json => JSON.encode({'name': name, 'text': text});
}

class ChatWebSocketsServer {
  List<WebSocketsClient> _clients = [];
  // Chat history.
  ListQueue<ChatMessage> _chat;
  static const int maxHistory = 100;

  ChatWebSocketsServer() {
    _chat = new ListQueue<ChatMessage>();
  }

  void handleWebSocket(WebSocket ws) { /* same as above */ }

  // Handle incoming messages.
  void handleMessage(WebSocketsClient client, Map json) {
    if (json['type'] == 'change_name') {
      client.name = json['name'];
    } else if (json['type'] == 'post' &&
        client.name != null && client.name.isNotEmpty) {
      // The null check covers clients that haven't sent a name yet.
      ChatMessage record = new ChatMessage(client.name, json['text']);
      // Keep only last maxHistory messages.
      _chat.addLast(record);
      if (_chat.length > maxHistory) _chat.removeFirst();
      broadcast(record);
    } else if (json['type'] == 'init') {
      // Send chat history to the client.
      // Let's not bother with performance for simplicity reasons.
      _chat.forEach((ChatMessage m) => client.ws.add(m.json));
    }
  }

  // Client closed their connection.
  void close(WebSocketsClient client) {
    print('Client disconnected');
    // Remove the reference from the list of clients.
    client.close().then((_) {
      int index = _indexByWebSocket(client.ws);
      if (index != -1) _clients.removeAt(index);
    });
  }

  // Close connection to all clients.
  // This is used only on server shutdown.
  Future closeAll() {
    // Make a list of all Future objects returned by close().
    List<Future> futures = [];
    _clients.forEach((client) => futures.add(client.ws.close()));
    // ... and wait until all of them complete.
    return Future.wait(futures);
  }

  // Send a message to all connected clients.
  void broadcast(ChatMessage message) {
    // Method add() sends a string to the client.
    _clients.forEach((client) => client.ws.add(message.json));
  }

  // Get index for this connection from the list of clients,
  // or -1 if the connection isn't in the list.
  int _indexByWebSocket(WebSocket ws) {
    for (int i = 0; i < _clients.length; i++) {
      if (_clients[i].ws == ws) return i;
    }
    return -1;
  }
}
We're using ListQueue<T> here to store chat messages because it can add and remove items at both ends cheaply: removeFirst() runs in constant time, and addFirst() and addLast() run in amortized constant time.
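The bounded history can be tried on its own. This is a minimal sketch of the same addLast()/removeFirst() pattern; lastMessages() is a hypothetical helper and the limit of 3 is chosen only to keep the example small (the server uses 100).

```dart
import 'dart:collection';

/// Feeds [all] through a queue that never holds more than [limit] items
/// and returns what is left, oldest first.
List<String> lastMessages(Iterable<String> all, int limit) {
  final history = ListQueue<String>();
  for (final message in all) {
    history.addLast(message);
    // Drop the oldest entry as soon as we exceed the limit.
    if (history.length > limit) history.removeFirst();
  }
  return history.toList();
}

void main() {
  final messages = [for (var i = 1; i <= 5; i++) 'message $i'];
  print(lastMessages(messages, 3)); // [message 3, message 4, message 5]
}
```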
Although the code is quite long, it's not hard to understand. Note the broadcast()
method that sends a message to all connected clients because we have the WebSocket
connections for all of them. When a client disconnects, we call its close()
method and remove it from the list of clients.
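A comment in handleWebSocket() notes that a real app would guard JSON decoding against malformed input. One possible way to do that is sketched below; decodeMessage() is a hypothetical helper, not part of the server, and the code targets a current Dart SDK, so it calls jsonDecode() rather than JSON.decode().

```dart
import 'dart:convert';

/// Returns the decoded message, or null when [raw] is not a JSON object
/// with a string 'type' field.
Map<String, dynamic>? decodeMessage(String raw) {
  try {
    final decoded = jsonDecode(raw);
    if (decoded is Map<String, dynamic> && decoded['type'] is String) {
      return decoded;
    }
  } on FormatException {
    // Malformed JSON: fall through and reject the message.
  }
  return null;
}

void main() {
  print(decodeMessage('{"type": "post", "text": "hi"}'));
  print(decodeMessage('not json'));
  print(decodeMessage('[1, 2, 3]'));
}
```

A server using such a helper could simply ignore messages for which it returns null instead of letting the exception reach the stream's error handler.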
The HTML and Dart code for a web browser using WebSocket is simple as well:
<!-- web/index.html -->
<body>
  <p>Name: <input type="text" id="name"></p>
  <div id="chat"></div>
  <p>Message: <input type="text" id="msg">
    <button id="btn-send">Send</button></p>
  <script type="application/dart" src="main.dart"></script>
</body>
The page has just two input fields and a button. The Dart code that drives it is as follows:
// web/main.dart
import 'dart:html';
import 'dart:convert';

main() {
  WebSocket ws = new WebSocket('ws://127.0.0.1:8888');

  ws.onOpen.listen((e) {
    print('Connected');
    ws.sendString(JSON.encode({'type': 'init'}));
  });

  // Automatically decode all incoming messages.
  ws.onMessage.map((MessageEvent e) => JSON.decode(e.data)).listen((Map j) {
    var html = '<p><b>${j['name']}</b>: ${j['text']}</p>';
    querySelector('#chat').appendHtml(html);
  });

  querySelector('#name').onKeyUp.listen((e) {
    ws.sendString(JSON.encode({
      'type': 'change_name',
      'name': (e.target as InputElement).value
    }));
  });

  querySelector('#btn-send').onClick.listen((e) {
    InputElement input = querySelector('#msg');
    ws.sendString(JSON.encode({'type': 'post', 'text': input.value}));
    input.value = '';
  });
}
Both server and browser are using the WebSocket
class, but they aren't the same. The browser's WebSocket
comes from the dart:html
package, while the WebSocket
used in the server is from dart:io
. This is why we're sending data with both sendString()
and add()
methods.
The WebSocket
class in the browser has a couple of streams. The most interesting are onOpen
and onMessage
. We used onOpen
to request the entire chat history from the server. Then, onMessage
decoded all the data sent from the server and because we're expecting just chat messages, we appended a simple HTML to the main container.
To test the server, we can run it without any parameters:
$ dart bin/server.dart
My PID: 15725
Starting WebSocket server
Then, open multiple tabs, with index.html
, and try to set different names and write a few messages. The messages should immediately appear in all tabs. In Developer Tools, we can also see the messages sent to and received from the server via WebSockets.
If we wanted to run this server in a real-life application, we would ideally run it in the background, detached from the terminal, with no output, and so on. We can do this with the nohup utility, which makes a process ignore the SIGHUP signal, so it keeps running even after its parent process (typically the terminal session) terminates. Note that this isn't the same as running a daemon process. Daemons have to follow a predefined behavior, such as closing all file descriptors, changing the current directory, redirecting output, and more.
Before we do this, we need to be able to stop the server when it's running in the background. We could use ps | grep server.dart
, but there could be more processes with this name running at the same time and we may not know which one is the one we want to terminate.
We should create a well-behaved process that listens to termination signals, such as SIGTERM
and SIGINT
, to properly close all WebSocket connections and the HTTP server.
For this reason, we'll save our server's process ID (PID) into a file on launch and listen to the two signals. Then, running dart bin/server.dart -c stop will read the PID and use Process.killPid() to send a signal to the currently running server process:
main(List<String> args) async {
  /* the rest is unchanged */
  } else if (argResults['cmd'] == 'stop') {
    // Stop the process by sending it a SIGTERM signal.
    String pidFile = argResults['pid-file'];
    String pid = await (new File(pidFile).readAsString());
    Process.killPid(int.parse(pid), ProcessSignal.SIGTERM);
  } else if (argResults['cmd'] == 'start') {
    /* Create HTTP server and so on... */
    // Save process PID to a file.
    String path = argResults['pid-file'];
    File file = await (new File(path)).create(recursive: true);
    await file.writeAsString(pid.toString());

    // The handler has to be async because it uses await.
    Future shutdown(ProcessSignal signal) async {
      print('Received signal $signal');
      // Close the MySQL connection pool (introduced later in this chapter).
      pool.close();
      // Don't await each task one by one; start them all
      // and wait until all of them are finished.
      await Future.wait([
        // Remove the file with PID.
        new File(argResults['pid-file']).delete(),
        httpServer.close(),
        wsServer.closeAll(),
      ]);
      // End this process now with code 0.
      exit(0);
    }

    ProcessSignal.SIGTERM.watch().listen(shutdown);
    ProcessSignal.SIGINT.watch().listen(shutdown);
  }
}
We used the File
class to create both a new file and a directory with create(recursive: true)
. Both File
and Directory
classes are based heavily on Future
objects. However, some methods have synchronous and asynchronous implementations (for example, there are delete()
and deleteSync()
methods).
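The PID file handling can be exercised separately from the server. The sketch below assumes a temporary directory instead of the pid-file option, and writePidFile() and readPidFile() are hypothetical helper names; it uses both the asynchronous File methods and one synchronous counterpart (deleteSync()).

```dart
import 'dart:io';

/// Writes [processId] to [path], creating missing parent directories.
Future<File> writePidFile(String path, int processId) async {
  final file = await File(path).create(recursive: true);
  return file.writeAsString('$processId');
}

/// Reads the PID back; throws a FormatException if the file
/// doesn't contain a number.
Future<int> readPidFile(String path) async =>
    int.parse((await File(path).readAsString()).trim());

Future<void> main() async {
  final dir = await Directory.systemTemp.createTemp('chat_pid_');
  final path = '${dir.path}/run/server.pid';

  // pid is the top-level getter from dart:io for the current process.
  await writePidFile(path, pid);
  print(await readPidFile(path) == pid);

  // Synchronous counterpart of delete(); fine for one-off cleanup.
  dir.deleteSync(recursive: true);
}
```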
To stop the server, we run server.dart
again, which sends a signal to the PID of the process that's currently running the server.
Now, we can run server.dart
again:
$ nohup dart bin/server.dart > /dev/null 2>&1 &
Using > /dev/null
redirects all stdout
to /dev/null
(discarding all output), 2>&1
redirects stderr
to stdout
, and finally, &
runs the process in the background.
We don't even care what a server's PID is because it's saved in a file and is automatically read and killed with:
$ dart bin/server.dart -c stop
The server process gently closes all connections and terminates.
Dart 1.9 brought another new keyword called enum
to define a fixed set of values. In our example from the preceding section, we could define a fixed set of commands as:
enum Command { start, stop }
Later in the code, we could check which command we want to use without comparing strings, as we did in the preceding example, but rather compare Command
enums like this:
cmd = Command.start;
if (cmd == Command.stop) { /* ... */ }
We can update our main()
method and convert a string argResults['cmd']
command into a Command
enum value:
Command cmd = Command.values.firstWhere((Command c) {
  String cmdString = c.toString().split('.')[1].toLowerCase();
  return cmdString == argResults['cmd'].toLowerCase();
});

// Then follow with ifs to handle each command.
if (cmd == Command.stop) {
  // ...
} else if (cmd == Command.start) {
  // ...
}
The Command.values
property is a constant that contains a List
object with all possible values for this enum. Calling c.toString()
returns a string representation of an enum's value which in our case are Command.start
and Command.stop
. The List.firstWhere() method iterates over the list and returns the first item for which the callback returns true; if no item matches and no orElse callback is supplied, it throws a StateError.
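Because an unknown command would otherwise end in a StateError, it can be convenient to wrap the lookup in a function that reports "no match" explicitly. This is one possible sketch; parseCommand() is a hypothetical helper and it is written for a current Dart SDK with null safety.

```dart
enum Command { start, stop }

/// Maps a command-line string to a [Command], or null if it is unknown.
Command? parseCommand(String input) {
  for (final c in Command.values) {
    // c.toString() yields e.g. "Command.start"; keep the part after the dot.
    final name = c.toString().split('.').last;
    if (name == input.toLowerCase()) return c;
  }
  return null;
}

void main() {
  print(parseCommand('STOP'));
  print(parseCommand('restart'));
}
```

The caller can then print a usage message when the result is null instead of catching an exception.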
Everything in Dart is an object, including functions, which can be passed as arguments. However, this approach has one caveat. If we define a variable to hold, for example, a callback function that is called when a user posts a new message, we could do:
class ChatWebSocketsServer {
  var newMsgCallback;
  /* ... */
  void handleMessage(WebSocketsClient client, Map json) {
    /* ... */
    if (newMsgCallback != null) {
      newMsgCallback(record.name, record.text);
    }
  }
}

main(List<String> args) async {
  /* ... */
  wsServer.newMsgCallback = (String from, String msg) {
    print("$from: $msg");
  };
}
This is all right as long as we know that the callback function takes two String parameters. What could happen is that we accidentally use a callback function that takes a different number or different types of parameters, either in the ChatWebSocketsServer class or in main().
By defining var newMsgCallback, we lost track of the function's signature. We could actually assign any value to it. That's why Dart has typedefs, which look like a function definition prefixed with the typedef keyword and without any body:
typedef void MessageCallback(String from, String msg);

class ChatWebSocketsServer {
  MessageCallback newMsgCallback;
  /* ... */
}
The newMsgCallback
property has to have the same definition as MessageCallback
typedef. Dart's type check can now warn you if you try to assign it anything else.
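To see the typedef at work outside the server, here is a small self-contained sketch. The Notifier class and collectMessages() function are made up for this example, and the nullable field syntax assumes a current Dart SDK.

```dart
typedef void MessageCallback(String from, String msg);

/// Hypothetical stand-in for ChatWebSocketsServer.
class Notifier {
  MessageCallback? onMessage;

  void post(String from, String msg) {
    final callback = onMessage;
    if (callback != null) callback(from, msg);
  }
}

/// Posts two messages and returns what the callback recorded.
List<String> collectMessages() {
  final seen = <String>[];
  final notifier = Notifier();
  // The parameter types are inferred from MessageCallback.
  notifier.onMessage = (from, msg) => seen.add('$from: $msg');
  notifier.post('alice', 'hi');
  notifier.post('bob', 'hello');
  // notifier.onMessage = (int n) {}; // rejected: wrong signature
  return seen;
}

void main() {
  print(collectMessages());
}
```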
As we want to store all chat messages in a MySQL database for further investigation, we'll add a dependency with a MySQL connector called sqljocky
. Right now, it can only connect to MySQL via TCP; using Unix sockets doesn't work, so make sure you don't have the skip-networking
option enabled in the MySQL configuration (it disables all TCP connections completely).
We'll keep it very simple and use just one table with four columns in a database called dart_chat
. You can create the table structure with this SQL query:
CREATE TABLE `chat_log` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(255) NOT NULL DEFAULT '',
  `text` text NOT NULL,
  `created` datetime NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
In Dart, we'll connect to MySQL and pass the connection to ChatWebSocketsServer
:
import 'package:sqljocky/sqljocky.dart';

main() {
  /* same as above */
  } else if (argResults['cmd'] == 'start') {
    ConnectionPool pool = new ConnectionPool(host: '127.0.0.1',
        port: 3306, user: 'root', password: null, db: 'dart_chat');
    wsServer = new ChatWebSocketsServer(pool);
    /* same as above */
  }
}
Then we will update the ChatWebSocketsServer
class:
class ChatWebSocketsServer {
  /* ... */
  Query _preparedInsertQuery;

  ChatWebSocketsServer(ConnectionPool pool) {
    _chat = new ListQueue<ChatMessage>();
    // An immediately invoked async closure; this is similar to
    // calling new Future(() async { ... });
    (() async {
      // Preload chat history asynchronously.
      String selectSql = """
          SELECT * FROM chat_log
          ORDER BY created DESC LIMIT 0,100 """;
      var results = await pool.query(selectSql);
      results.forEach((Row row) {
        var msg = new ChatMessage(
            row.name.toString(), row.text.toString());
        // Rows arrive newest first, so prepend each one.
        _chat.addFirst(msg);
      });

      String insertSql = """
          INSERT INTO chat_log (name,text,created)
          VALUES (?,?,NOW())""";
      _preparedInsertQuery = await pool.prepare(insertSql);
    })();
  }
  /* the rest remains unchanged */
}
In the constructor, we made one query that matches the 100 latest chat records and prepopulates the chat history. Then, we used pool.prepare()
to create a so-called prepared query, which serves as a template that we can reuse multiple times by just executing it with a list of parameters.
You should never insert variables directly into SQL queries and always use prepared queries to avoid SQL injection attacks (http://en.wikipedia.org/wiki/SQL_injection).
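To make the risk concrete, the following sketch builds the INSERT statement by string interpolation and prints what the database would receive for a hostile message. It doesn't talk to any database; unsafeInsert() is a deliberately bad, hypothetical helper, and whether the injected second statement would actually run depends on the server and driver settings.

```dart
/// UNSAFE: splices user input straight into the SQL text.
String unsafeInsert(String name, String text) =>
    "INSERT INTO chat_log (name,text,created) "
    "VALUES ('$name','$text',NOW())";

void main() {
  // A hostile "message" that closes the string literal early.
  const text = "x',NOW()); DROP TABLE chat_log; -- ";
  print(unsafeInsert('mallory', text));

  // With a prepared query the SQL text is fixed and the values
  // travel separately, so they are never parsed as SQL.
  const sql = 'INSERT INTO chat_log (name,text,created) VALUES (?,?,NOW())';
  final params = ['mallory', text];
  print('$sql  <-  $params');
}
```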
The last thing is to execute a prepared statement when a client sends a new message:
void handleMessage(WebSocketsClient client, Map json) {
  /* same as above */
  } else if (json['type'] == 'post' &&
      client.name != null && client.name.isNotEmpty) {
    /* same as above */
    _preparedInsertQuery.execute([record.name, record.text]);
  } else if (json['type'] == 'init') {
    /* ... */
  }
}
We don't even have to work with the database connection outside the constructor of ChatWebSocketsServer
. The _preparedInsertQuery.execute() method makes an asynchronous call, but we don't need to wait until it finishes; we can broadcast the message to all clients in the meantime. A few persisted records might look like this:
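The ordering of a fire-and-forget call can be illustrated without a database. In this sketch, persist() is a hypothetical stand-in for the prepared query that merely waits 20 ms, and the catchError() handler is an addition showing one way to avoid unhandled errors from a Future that nobody awaits.

```dart
import 'dart:async';

/// Stand-in for _preparedInsertQuery.execute(); just simulates latency.
Future<void> persist(String text, List<String> events) async {
  await Future<void>.delayed(const Duration(milliseconds: 20));
  events.add('stored $text');
}

/// Starts the insert, broadcasts right away and returns the event order.
Future<List<String>> post(String text) async {
  final events = <String>[];
  // Not awaited here: the insert runs while we carry on.
  final pending = persist(text, events).catchError((Object e) {
    events.add('insert failed: $e');
  });
  events.add('broadcast $text');
  await pending; // only so that the demo can inspect the final order
  return events;
}

Future<void> main() async {
  print(await post('hi')); // [broadcast hi, stored hi]
}
```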
Apart from what we used here, sqljocky
can also run a prepared query multiple times in one call with Query.executeMulti()
. In our case, it could be:
Query.executeMulti([['name1', 'text1'], ['n2', 't2'], [...]]);
This actually runs a single query for each list. Sometimes it's better to run multiple queries in a single transaction for performance reasons:
var trans = await pool.startTransaction();
await trans.query('...');
await trans.query('...');
await trans.query('...');
await trans.commit();
Basically, every operation with the database is asynchronous, so we'll use the await
keyword a lot (just imagine how awful this would be with nested callbacks).
Finally, when we're done using the database, we should close the connection pool with pool.close(), which, in contrast to most close() methods, doesn't return a Future object.
We're using just MySQL here, but you can find connectors to probably all common database engines available today at https://pub.dartlang.org. It's worth noting that this connector is relatively new and its functionality is very simple. It's incomparably more primitive than projects such as SQLAlchemy for Python or Doctrine 2 for PHP.