WebSockets in Rust

If you work in web development, you know that WebSockets are one of the most useful protocols for speeding up communication with the client. A WebSocket keeps a two-way connection open, so your server can push information to the client without waiting for the client to request it, which saves a round trip. Rust has a great crate for implementing WebSockets, named websocket.

We will analyze a small, asynchronous WebSocket echo server example to see how it works. We will need to add websocket, futures, and tokio-core to the [dependencies] section of our Cargo.toml file. The following example has been retrieved and adapted from the asynchronous server example in the websocket crate. It uses the Tokio reactor core, which means that it requires a core object and its handle. A WebSocket connection is more than a simple I/O operation: it starts as an HTTP request that has to be upgraded to the WebSocket protocol, so the crate provides wrappers that handle that upgrade. Let's see how it works:

extern crate futures;
extern crate tokio_core;
extern crate websocket;

// Note: `async` is a reserved keyword from the 2018 edition onward, so this
// listing assumes the 2015 edition.
use websocket::message::OwnedMessage;
use websocket::server::InvalidConnection;
use websocket::async::Server;

use tokio_core::reactor::Core;
use futures::{Future, Sink, Stream};

fn main() {
    // The reactor core drives every future; the handle spawns tasks onto it.
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let server = Server::bind("127.0.0.1:2794", &handle).unwrap();

    let task = server
        .incoming()
        .map_err(|InvalidConnection { error, .. }| error)
        .for_each(|(upgrade, addr)| {
            println!("Got a connection from: {}", addr);

            // Reject clients that did not ask for the rust-websocket protocol.
            if !upgrade.protocols().iter().any(|s| s == "rust-websocket") {
                handle.spawn(
                    upgrade
                        .reject()
                        .map_err(|e| println!("Error: '{:?}'", e))
                        .map(|_| {}),
                );
                return Ok(());
            }

            // Accept the upgrade, then echo messages until the client closes.
            let fut = upgrade
                .use_protocol("rust-websocket")
                .accept()
                .and_then(|(client, _)| {
                    let (sink, stream) = client.split();

                    stream
                        .take_while(|m| Ok(!m.is_close()))
                        .filter_map(|m| match m {
                            // Answer pings, drop pongs, echo everything else.
                            OwnedMessage::Ping(p) => {
                                Some(OwnedMessage::Pong(p))
                            }
                            OwnedMessage::Pong(_) => None,
                            _ => Some(m),
                        })
                        .forward(sink)
                        .and_then(|(_, sink)| {
                            sink.send(OwnedMessage::Close(None))
                        })
                });

            handle.spawn(
                fut.map_err(|e| {
                    println!("Error: {:?}", e)
                }).map(|_| {}));
            Ok(())
        });

    // Run the server task until it finishes or fails.
    core.run(task).unwrap();
}

Most of the code, as you can see, is really similar to the code used in the previous examples. The first change that we see is that, before actually accepting a connection, the server checks whether the client asked for the rust-websocket protocol. If it did not, the upgrade is rejected and the server moves on to the next connection. Otherwise, the server selects that protocol for the connection and accepts the upgrade. For each accepted connection, it receives a handle to the client and some headers. All this is done asynchronously, of course.
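The accept-or-reject decision described above can be sketched in isolation with nothing but the standard library. This is only an illustration of the check, not the crate's API: the Decision enum and the negotiate function are hypothetical names introduced here, and the list of offered protocols stands in for what upgrade.protocols() returns.

```rust
/// Outcome of inspecting the protocols a client offered during the handshake.
/// (Hypothetical type, used only for this illustration.)
#[derive(Debug, PartialEq)]
enum Decision {
    /// The client offered the protocol we speak, so the upgrade can proceed.
    Accept(String),
    /// The client did not offer it, so the connection should be rejected.
    Reject,
}

/// Hypothetical helper that mirrors the `protocols().iter().any(..)` check
/// from the server listing.
fn negotiate(offered: &[&str], supported: &str) -> Decision {
    if offered.iter().any(|p| *p == supported) {
        Decision::Accept(supported.to_string())
    } else {
        Decision::Reject
    }
}
```

Calling negotiate(&["chat", "rust-websocket"], "rust-websocket") yields the Accept variant, while a client that only offers "chat" gets Reject, which corresponds to the branch where the server calls reject() on the upgrade.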

We discard the headers and split the client into a sink and a stream. In futures terminology, a sink is the asynchronous equivalent of a synchronous writer, and a stream is the asynchronous equivalent of an iterator. The server takes messages from the stream until a close message arrives. Pings are answered with a pong that carries the same payload, incoming pongs are dropped, and every other message is passed through unchanged. The forward() method then sends each of those messages into the sink, which echoes them back to the client, and once the stream is exhausted the server sends a close message. The future we just created is spawned using the handle we took from the core, so one such future runs for each connection. Finally, the Tokio core runs the whole server task.
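To make the message handling easier to follow, here is a synchronous sketch of the same pipeline using ordinary iterators from the standard library. It is a simplified model under stated assumptions: the Message enum is a hypothetical stand-in for the crate's OwnedMessage, a Vec plays the role of both the stream and the sink, and echo_replies is a name introduced only for this example.

```rust
/// Simplified, hypothetical stand-in for the crate's `OwnedMessage`.
#[derive(Debug, Clone, PartialEq)]
enum Message {
    Text(String),
    Ping(Vec<u8>),
    Pong(Vec<u8>),
    Close,
}

/// Computes what the echo server would write back for a sequence of
/// incoming messages.
fn echo_replies(incoming: Vec<Message>) -> Vec<Message> {
    let mut replies: Vec<Message> = incoming
        .into_iter()
        // Stop reading as soon as the client sends a close message.
        .take_while(|m| *m != Message::Close)
        // Answer pings with a pong, drop pongs, echo everything else.
        .filter_map(|m| match m {
            Message::Ping(payload) => Some(Message::Pong(payload)),
            Message::Pong(_) => None,
            other => Some(other),
        })
        .collect();
    // After forwarding everything, the server sends its own close message.
    replies.push(Message::Close);
    replies
}
```

Feeding it a text message, a ping, a pong, a close, and one more text message returns the echoed text, a pong with the ping's payload, and a final close. Anything that arrives after the close is never read, just as take_while ends the stream in the asynchronous version.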

If you get the example client implementation from the crate's Git repository (https://github.com/cyderize/rust-websocket/blob/master/examples/async-client.rs), you will be able to see how the server replies to whatever the client sends. Once you understand this code, you will have a solid starting point for building your own WebSocket servers.
