Creating Tokio codecs

In Tokio, we have the concept of a codec. A codec is a type that divides a slice of bytes into frames. Each frame will contain certain information parsed from the stream of bytes. In our case, we will read the input of the TCP connection and divide it into chunks each time we find the a letter. A production-ready codec will probably be more complex, but this example will give us a good enough base to implement our own codecs. We will need to implement two traits from the tokio-io crate, so we will need to add it to the [dependencies] section of our Cargo.toml file and import it with extern crate tokio_io;. We will need to do the same with the bytes crate. Now, let's start writing the code:

extern crate bytes;
extern crate tokio;
extern crate tokio_io;

use std::io;

use tokio_io::codec::{Decoder, Encoder};
use bytes::BytesMut;

#[derive(Debug, Default)]
struct ADividerCodec {
next_index: usize,
}

impl Decoder for ADividerCodec {
type Item = String;
type Error = io::Error;

fn decode(&mut self, buf: &mut BytesMut)
-> Result<Option<Self::Item>, Self::Error> {
if let Some(new_offset) =
buf[self.next_index..].iter().position(|b| *b == b'a') {
let new_index = new_offset + self.next_index;
let res = buf.split_to(new_index + 1);
let res = &res[..res.len() - 1];
let res: Vec<_> = res.into_iter()
.cloned()
.filter(|b| *b != b' ' && *b != b' ')
.collect();
let res = String::from_utf8(res).map_err(|_| {
io::Error::new(
io::ErrorKind::InvalidData,
"Unable to decode input as UTF8"
)
})?;
self.next_index = 0;
Ok(Some(res))
} else {
self.next_index = buf.len();
Ok(None)
}
}

fn decode_eof(&mut self, buf: &mut BytesMut)
-> Result<Option<String>, io::Error> {
Ok(match self.decode(buf)? {
Some(frame) => Some(frame),
None => {
// No terminating 'a' - return remaining data, if any
if buf.is_empty() {
None
} else {
let res = buf.take();
let res: Vec<_> = res.into_iter()
.filter(|b| *b != b' ' && *b != b' ')
.collect();
let res = String::from_utf8(res).map_err(|_| {
io::Error::new(
io::ErrorKind::InvalidData,
"Unable to decode input as UTF8"
)
})?;
self.next_index = 0;
Some(res)
}
}
})
}
}

This is a lot of code; let's analyse it carefully. We created a structure, named ADividerCodec, and we implemented the Decode trait for it. This code has two methods. The first and most important one is the decode() method. It receives a buffer containing data coming from the connection and it needs to return either some data or none. In this case, it will try to find the position of the a letter, in lower case. If it finds it, it will return all the bytes that were read until then. It also removes new lines, just to make the printing more clear.

It creates a string with those bytes, so it will fail if we send non-UTF-8 bytes. Once we take bytes from the front of the buffer, the next index should point to the first element in the buffer. If there was no a in the buffer, it will just update the index to the last element that was read, and just return None, since there isn't a full frame ready. The decode_eof() method will do a similar thing when the connection gets closed. We use strings as the output of the codec, but you can use any structure or enumeration to represent your data or commands, for example.

We also need to implement the Encode trait so that we can use the framed() method from Tokio. This just represents how the data would be encoded in a new byte array if we wanted to use bytes again. We will just get the bytes of the strings and append an a to it. We will lose new line information, though. Let's see what it looks like:

impl Encoder for ADividerCodec {
type Item = String;
type Error = io::Error;

fn encode(&mut self, chunk: Self::Item, buf: &mut BytesMut)
-> Result<(), io::Error> {
use bytes::BufMut;

buf.reserve(chunk.len() + 1);
buf.put(chunk);
buf.put_u8(b'a'),
Ok(())
}
}

To see how it works, let's implement a simple main() function and use Telnet to send some text with a letters in it:

use tokio::prelude::*;
use tokio::net::TcpListener;

fn main() {
let address = "127.0.0.1:8000".parse().unwrap();
let listener = TcpListener::bind(&address).unwrap();

let server = listener
.incoming()
.map_err(|e| eprintln!("Error accepting connection: {:?}", e))
.for_each(|socket| {
tokio::spawn(
socket
.framed(ADividerCodec::default())
.for_each(|chunk| {
println!("{}", chunk);
Ok(())
})
.map_err(|e| eprintln!("Error: {:?}", e)),
)
});

println!("Running Tokio server...");
tokio::run(server);
}

We could send this text, for example:

The output in the server will be similar to this:

Note that I didn't close the connection, so the last part of the last sentence was still in the buffer.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.145.87.161