Asynchronous I/O in Rust

When it comes to I/O operations, there is a go-to crate: tokio, which handles asynchronous input and output operations. It is built on MIO. MIO, from Metal IO, is a base crate that provides a very low-level interface to asynchronous programming. It maintains an event queue, and you use a loop to poll that queue and handle the events one by one as they become ready.
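To make the idea of an event queue drained by a loop more concrete, here is a minimal sketch that uses only the standard library. It does not use MIO's API: the queue is a std::sync::mpsc channel, and the Event type and the run_event_loop() function are hypothetical names invented for this illustration.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical event type standing in for the readiness
// notifications a real event queue would deliver.
#[derive(Debug)]
enum Event {
    TcpMessage(usize), // id of the connection that has data ready
    FileChunk(usize),  // number of file bytes that became available
}

// Drains the queue one event at a time and describes each one.
// The loop ends once every sender has been dropped.
fn run_event_loop(queue: mpsc::Receiver<Event>) -> Vec<String> {
    let mut log = Vec::new();
    for event in queue {
        let line = match event {
            Event::TcpMessage(id) => format!("connection {} is readable", id),
            Event::FileChunk(n) => format!("{} file bytes are ready", n),
        };
        log.push(line);
    }
    log
}

fn main() {
    let (sender, receiver) = mpsc::channel();

    // A producer thread plays the role of the operating system
    // pushing events into the queue.
    let producer = thread::spawn(move || {
        sender.send(Event::TcpMessage(1)).unwrap();
        sender.send(Event::FileChunk(512)).unwrap();
    });
    producer.join().unwrap();

    for line in run_event_loop(receiver) {
        println!("{}", line);
    }
}
```

A real MIO loop polls the operating system for readiness instead of reading from a channel, but the overall shape is similar: wait for the next event, find out what it refers to, and react to it.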

As we saw earlier, these events can be anything from "a TCP message was received" to "the file you requested is partially ready". There are tutorials that build small TCP servers directly on MIO, for example, but the crate is not really meant to be used directly; the idea is to use a facade on top of it. The best-known and most useful facade is the tokio crate. By itself, this crate only gives you some small primitives, but it opens the door to many asynchronous interfaces. You have, for example, tokio-serial, tokio-jsonrpc, tokio-http2, tokio-imap, and many more.

You also have utilities such as tokio-retry, which automatically retries an I/O operation if an error happens. Tokio is easy to use, has an extremely low footprint, and lets you create very fast services with its asynchronous operations. As you have probably noticed, it is mostly centered on communication, because of all the helpers and capabilities it provides for those cases. The core crate also has file reading capabilities, so you should be covered for any I/O-bound operation, as we will see.

We will first see how to develop a small TCP echo server using Tokio. You can find similar tutorials on the Tokio website (https://tokio.rs/), and it is worthwhile to follow all of them. Let's start by adding tokio as a dependency in the Cargo.toml file. Then, we will use the TcpListener from the tokio crate to create a small server. This structure binds a TCP socket listener to a given address and lets us asynchronously execute a given function for each of the incoming connections. In that function, we will asynchronously read any data that arrives on the socket and send it back, doing an echo. Let's see what it looks like:

extern crate tokio;

use tokio::prelude::*;
use tokio::net::TcpListener;
use tokio::io;

fn main() {
    let address = "127.0.0.1:8000".parse().unwrap();
    let listener = TcpListener::bind(&address).unwrap();

    let server = listener
        .incoming()
        .map_err(|e| eprintln!("Error accepting connection: {:?}", e))
        .for_each(|socket| {
            let (reader, writer) = socket.split();
            let copied = io::copy(reader, writer);

            let handler = copied
                .map(|(count, _reader, _writer)| println!("{} bytes received", count))
                .map_err(|e| eprintln!("Error: {:?}", e));

            tokio::spawn(handler)
        });

    tokio::run(server);
}
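
The listing relies on the futures-based API of the tokio 0.1 release series (tokio::run(), tokio::prelude, and so on). Assuming that release line, the dependency mentioned earlier could be declared in Cargo.toml roughly like this; the exact version requirement is an assumption, so adjust it to whatever your project needs:

```toml
[dependencies]
# Assumed version: the 0.1 series provides the API used in this example.
tokio = "0.1"
```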

Let's analyze the code. The listener creates an asynchronous stream of incoming connections with the incoming() method. For each of them, we check whether it was an error and print a message accordingly, and then, for the correct ones, we get the socket and get a writer and a reader by using the split() method. Then, Tokio gives us a Copy future that gets created with the tokio::io::copy() function. This future represents data that gets copied from a reader to a writer asynchronously.

We could have written that future ourselves by using the AsyncRead and AsyncWrite traits, but it's great to see that Tokio already provides it. Since the behavior we want is to send back whatever the connection was sending, this will work perfectly. We then add some extra code that will be executed after the reader returns end of file, or EOF (when the connection gets closed). It will just print the number of bytes that were copied, and it will handle any potential errors that may appear.
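To get a feeling for what such a copy operation does, here is a minimal blocking sketch built on the standard library's Read and Write traits. It is not Tokio's implementation and it is not asynchronous: the copy_until_eof() function is a hypothetical helper that only mirrors the logic of reading until EOF, writing everything back, and counting the bytes.

```rust
use std::io::{self, Read, Write};

// Blocking analogue of a copy operation: read until EOF, write every
// chunk to the writer, and return the number of bytes copied.
fn copy_until_eof<R: Read, W: Write>(reader: &mut R, writer: &mut W) -> io::Result<u64> {
    let mut buffer = [0u8; 1024];
    let mut total = 0u64;
    loop {
        let read = reader.read(&mut buffer)?;
        if read == 0 {
            break; // a read of zero bytes means EOF
        }
        writer.write_all(&buffer[..read])?;
        total += read as u64;
    }
    writer.flush()?;
    Ok(total)
}

fn main() -> io::Result<()> {
    // An in-memory reader and writer stand in for the two socket halves.
    let mut input = io::Cursor::new(b"hello\n".to_vec());
    let mut output = Vec::new();
    let count = copy_until_eof(&mut input, &mut output)?;
    println!("{} bytes received", count);
    Ok(())
}
```

The asynchronous version follows the same steps, but instead of blocking when no data is available, it yields control so that other futures can make progress.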

Then, in order for the future to perform its task, something needs to execute it. This is where Tokio executors come in: we call tokio::spawn(), which will execute the future in the default executor. What we have created so far is only a stream of things to do when a connection comes, so we now need to actually run the code. For that, Tokio has the tokio::run() function, which starts the whole Tokio runtime and starts accepting connections.

The main future we created, the stream of incoming connections, will be executed at that point and will block the main thread. Since the server is always waiting for new connections, it will just block indefinitely. Still, this does not mean that the execution of the futures is synchronous. The thread will go idle without consuming CPU, and when a connection comes, the thread will be awakened and the future executed. In the future itself, while sending the received data back, it will not block the execution if there is no more data. This enables the running of many connections in only one thread. In a production environment, you will probably want to have similar behavior in multiple threads, so that each thread can handle multiple connections.

It's now time to test it. You can start the server by running cargo run, and you can connect to it with a TCP tool such as Telnet. Telnet buffers the data it sends line by line, so you will need to send a whole line to receive the echo back.

There is another area where Tokio is especially useful: parsing frames. If you want to create your own communication protocol, for example, you may want to get chunks of those TCP bytes as frames, and then convert them to your own type of data.
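As a rough illustration of that idea, the following sketch splits a byte buffer into newline-terminated frames and converts each frame into a typed value. It uses only the standard library rather than Tokio's own helpers, and the protocol, the Command type, and the next_frame() and parse_command() functions are all hypothetical names invented for this example.

```rust
// Hypothetical message type for a made-up line-based protocol.
#[derive(Debug, PartialEq)]
enum Command {
    Ping,
    Say(String),
}

// Removes one newline-terminated frame from the front of `buffer`.
// Returns None when no complete frame has arrived yet.
fn next_frame(buffer: &mut Vec<u8>) -> Option<Vec<u8>> {
    let end = buffer.iter().position(|&byte| byte == b'\n')?;
    let mut frame: Vec<u8> = buffer.drain(..=end).collect();
    frame.pop(); // drop the trailing '\n'
    if frame.last() == Some(&b'\r') {
        frame.pop(); // also accept "\r\n" line endings
    }
    Some(frame)
}

// Converts the raw bytes of one frame into our own data type.
// Returns None for invalid UTF-8 or an unknown command.
fn parse_command(frame: &[u8]) -> Option<Command> {
    let text = std::str::from_utf8(frame).ok()?;
    if text == "PING" {
        Some(Command::Ping)
    } else if let Some(rest) = text.strip_prefix("SAY ") {
        Some(Command::Say(rest.to_string()))
    } else {
        None
    }
}

fn main() {
    // Bytes may arrive in arbitrary chunks, so a frame can be
    // split across two reads.
    let mut buffer = Vec::new();
    for chunk in [&b"PING\r\nSAY hel"[..], &b"lo\r\n"[..]] {
        buffer.extend_from_slice(chunk);
        while let Some(frame) = next_frame(&mut buffer) {
            println!("{:?}", parse_command(&frame));
        }
    }
}
```

The important detail is that incomplete data stays in the buffer until the rest of the frame arrives, which is what makes this approach suitable for data received in pieces over TCP.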
