A telemetry server

Let's build a descriptive statistics server. These pop up often in one way or another inside organizations: a thing is needed that consumes events, does some kind of descriptive statistic computation over those things, and then multiplexes the descriptions out to other systems. The very reason my work project, postmates/cernan (https://crates.io/crates/cernan), exists is to service this need at scale on resource constrained devices without tying operations staff into any kind of pipeline. What we'll build here now is a kind of mini-cernan, something whose flow is as follows:

                                _--> high_filter --> cma_egress
                               /
  telemetry -> ingest_point ---
    (udp)                      \_--> low_filter --> ckms_egress

The idea is to take telemetry from a simple UDP protocol, receive it as quickly as possible to avoid the OS dumping packets, pass these points through a high and low filter, and then hand the filtered points off to two different statistical egress points. The egress associated with the high filter computes a cumulative moving average, while the egress associated with the low filter computes a quantile summary using an approximation algorithm from the postmates/quantiles (https://crates.io/crates/quantiles) library.

Let's dig in. First, let's look at Cargo.toml for the project:

[package]
name = "telem"
version = "0.1.0"

[dependencies]
quantiles = "0.7"
seahash = "3.0"

[[bin]]
name = "telem"
doc = false

Short and to the point. We draw in quantiles, as mentioned previously, as well as seahash. The seahash crate provides SeaHasher, a particularly fast (but not cryptographically safe) hasher that we'll substitute into HashMap. More on that shortly. Our executable is broken out into src/bin/telem.rs since this project is a split library/binary setup:

extern crate telem;

use std::{thread, time};
use std::sync::mpsc;
use telem::IngestPoint;
use telem::egress::{CKMSEgress, CMAEgress, Egress};
use telem::event::Event;
use telem::filter::{Filter, HighFilter, LowFilter};

fn main() {
    let limit = 100;
    let (lp_ic_snd, lp_ic_rcv) = mpsc::channel::<Event>();
    let (hp_ic_snd, hp_ic_rcv) = mpsc::channel::<Event>();
    let (ckms_snd, ckms_rcv) = mpsc::channel::<Event>();
    let (cma_snd, cma_rcv) = mpsc::channel::<Event>();

    let filter_sends = vec![lp_ic_snd, hp_ic_snd];
    let ingest_filter_sends = filter_sends.clone();
    let _ingest_jh = thread::spawn(move || {
        IngestPoint::init("127.0.0.1".to_string(), 1990, ingest_filter_sends).run();
    });
    let _low_jh = thread::spawn(move || {
        let mut low_filter = LowFilter::new(limit);
        low_filter.run(lp_ic_rcv, vec![ckms_snd]);
    });
    let _high_jh = thread::spawn(move || {
        let mut high_filter = HighFilter::new(limit);
        high_filter.run(hp_ic_rcv, vec![cma_snd]);
    });
    let _ckms_egress_jh = thread::spawn(move || {
        CKMSEgress::new(0.01).run(ckms_rcv);
    });
    let _cma_egress_jh = thread::spawn(move || {
        CMAEgress::new().run(cma_rcv);
    });

    let one_second = time::Duration::from_millis(1_000);
    loop {
        for snd in &filter_sends {
            snd.send(Event::Flush).unwrap();
        }
        thread::sleep(one_second);
    }
}

There's a fair bit going on here. The first eight lines are imports of the library bits and pieces we need. The body of main is dominated by setting up our worker threads and feeding the appropriate channels into them. Note that some threads take multiple sender sides of a channel:

    let filter_sends = vec![lp_ic_snd, hp_ic_snd];
    let ingest_filter_sends = filter_sends.clone();
    let _ingest_jh = thread::spawn(move || {
        IngestPoint::init("127.0.0.1".to_string(), 1990, ingest_filter_sends).run();
    });

That's how we do fanout using Rust MPSC. Let's take a look at IngestPoint, in fact. It's defined in src/ingest_point.rs:

use event;
use std::{net, thread};
use std::net::ToSocketAddrs;
use std::str;
use std::str::FromStr;
use std::sync::mpsc;
use util;

pub struct IngestPoint {
    host: String,
    port: u16,
    chans: Vec<mpsc::Sender<event::Event>>,
}

An IngestPoint is a host—either an IP address or a DNS hostname, per ToSocketAddrs—a port and a vector of mpsc::Sender<event::Event>. The inner type is something we've defined:

#[derive(Clone)]
pub enum Event {
    Telemetry(Telemetry),
    Flush,
}

#[derive(Debug, Clone)]
pub struct Telemetry {
    pub name: String,
    pub value: u32,
}

telem has two kinds of events that flow through it: Telemetry, which comes from IngestPoint, and Flush, which comes from the main thread. Flush acts like a clock-tick for the system, allowing the individual subsystems of the project to keep track of time without making reference to the wall-clock. It's not uncommon in embedded programs to define time in terms of some well-known pulse and, when possible, I've tried to keep to that in parallel programming as well. If nothing else, it helps with testing to have time as an externally pushed attribute of the system. Anyhow, back to IngestPoint:

impl IngestPoint {
    pub fn init(
        host: String,
        port: u16,
        chans: Vec<mpsc::Sender<event::Event>>,
    ) -> IngestPoint {
        IngestPoint {
            chans: chans,
            host: host,
            port: port,
        }
    }

    pub fn run(&mut self) {
        let mut joins = Vec::new();

        let addrs = (self.host.as_str(), self.port).to_socket_addrs();
        if let Ok(ips) = addrs {
            let ips: Vec<_> = ips.collect();
            for addr in ips {
                let listener =
                    net::UdpSocket::bind(addr)
                        .expect("Unable to bind to UDP socket");
                let chans = self.chans.clone();
                joins.push(thread::spawn(move || handle_udp(chans, &listener)));
            }
        }

        for jh in joins {
            jh.join().expect("Uh oh, child thread panicked!");
        }
    }
}

The first bit of this, init, is just setup. The run function calls to_socket_addrs on our host/port pair and retrieves all the associated IP addresses. Each of these addresses gets a UdpSocket bound to it and an OS thread to listen for datagrams from that socket. This is wasteful in terms of thread overhead and later in this book we'll discuss evented-IO alternatives. Cernan, discussed previously, is a production system and makes use of Mio in its Sources. The key function here is handle_udp, the function that gets passed to the new listener threads. It is as follows:

fn handle_udp(chans: Vec<mpsc::Sender<event::Event>>, socket: &net::UdpSocket) {
    let mut buf = vec![0; 16_250];
    loop {
        // Block until a datagram arrives; len is the number of bytes read.
        let (len, _) = match socket.recv_from(&mut buf) {
            Ok(r) => r,
            Err(e) => panic!("Could not read UDP socket with error {:?}", e),
        };
        // Datagrams that are not valid UTF-8 panic here; unparseable ones are skipped.
        if let Some(telem) = parse_packet(str::from_utf8(&buf[..len]).unwrap()) {
            util::send(&chans, event::Event::Telemetry(telem));
        }
    }
}

The function is a simple infinite loop that pulls datagrams off the socket into a 16 KB buffer—comfortably larger than most datagrams—and then calls parse_packet on the result. If the datagram was a valid example of our as yet unspecified protocol, then we call util::send to send the Event::Telemetry out over the Sender<event::Event> in chans. util::send is little more than a for loop:

pub fn send(chans: &[mpsc::Sender<event::Event>], event: event::Event) {
    if chans.is_empty() {
        return;
    }

    for chan in chans.iter() {
        chan.send(event.clone()).unwrap();
    }
}
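
Note that util::send unwraps the result of every send, so a single dropped Receiver takes the sending thread down with it. As a minimal sketch of a more forgiving variant, assuming it's acceptable to quietly forget any channel whose receiving side has gone away, something like the following would do. The name send_lossy is hypothetical and not part of telem, and Event here is a trimmed stand-in for event::Event:

```rust
use std::sync::mpsc;

// Stand-in for telem's event::Event, trimmed down for illustration.
#[derive(Clone, Debug, PartialEq)]
pub enum Event {
    Telemetry(String, u32),
    Flush,
}

// Hypothetical variant of util::send: clone the event into every channel
// and forget any channel whose Receiver has hung up, rather than unwrapping.
// Returns the number of channels that accepted the event.
pub fn send_lossy(chans: &mut Vec<mpsc::Sender<Event>>, event: &Event) -> usize {
    // Sender::send only fails when the receiving half has been dropped.
    chans.retain(|chan| chan.send(event.clone()).is_ok());
    chans.len()
}
```

Whether silently shrinking the fanout is the right call depends on the system. In telem, losing an egress probably ought to be loud, which is an argument for the unwrap as written.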

The ingest payload is nothing special: a name of non-whitespace characters followed by one or more whitespace characters followed by a u32, all string encoded and utf8 valid:

fn parse_packet(buf: &str) -> Option<event::Telemetry> {
    let mut iter = buf.split_whitespace();
    if let Some(name) = iter.next() {
        if let Some(val) = iter.next() {
            match u32::from_str(val) {
                Ok(int) => {
                    return Some(event::Telemetry {
                        name: name.to_string(),
                        value: int,
                    })
                }
                Err(_) => return None,
            };
        }
    }
    None
}
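
To make the grammar concrete, here's a standalone sketch of the same parser with a local stand-in for event::Telemetry. It's written with the ? operator instead of nested if let, which should behave the same way as the version above; the comments list a few inputs and what comes back for each:

```rust
use std::str::FromStr;

// Local stand-in for telem's event::Telemetry.
#[derive(Debug, Clone, PartialEq)]
pub struct Telemetry {
    pub name: String,
    pub value: u32,
}

// Same grammar as parse_packet above: <name> <whitespace>+ <u32>.
//
//   "a 10\n"        -> Some(a, 10)   trailing newline from echo is ignored
//   "  a   10  "    -> Some(a, 10)   any run of whitespace separates tokens
//   "a 10 extra"    -> Some(a, 10)   tokens after the value are ignored
//   "a"             -> None          missing value
//   "a ten"         -> None          value is not an integer
//   "a -1"          -> None          value does not fit a u32
//   "a 4294967296"  -> None          one past u32::MAX
pub fn parse_packet(buf: &str) -> Option<Telemetry> {
    let mut iter = buf.split_whitespace();
    let name = iter.next()?;
    let val = iter.next()?;
    let value = u32::from_str(val).ok()?;
    Some(Telemetry {
        name: name.to_string(),
        value: value,
    })
}
```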

Popping out to the filters, both HighFilter and LowFilter are done in terms of a common Filter trait, defined in src/filter/mod.rs:

use event;
use std::sync::mpsc;
use util;

mod high_filter;
mod low_filter;

pub use self::high_filter::*;
pub use self::low_filter::*;

pub trait Filter {
    fn process(
        &mut self,
        event: event::Telemetry,
        res: &mut Vec<event::Telemetry>,
    ) -> ();

    fn run(
        &mut self,
        recv: mpsc::Receiver<event::Event>,
        chans: Vec<mpsc::Sender<event::Event>>,
    ) {
        let mut telems = Vec::with_capacity(64);
        for event in recv.into_iter() {
            match event {
                event::Event::Flush => util::send(&chans, event::Event::Flush),
                event::Event::Telemetry(telem) => {
                    self.process(telem, &mut telems);
                    for telem in telems.drain(..) {
                        util::send(&chans, event::Event::Telemetry(telem))
                    }
                }
            }
        }
    }
}

Any implementing filter is responsible for providing its own process. It's this function that the default run calls when an Event is pulled from the Receiver<Event> and found to be a Telemetry. Though neither the high nor the low filter makes use of it, the process function is able to inject new Telemetry into the stream by pushing more onto the passed telems vector. That's how cernan's programmable filter is able to allow end users to create telemetry from Lua scripts. Also, why pass telems rather than have process return a vector of Telemetry? It avoids continual small allocations. Depending on the system, allocation may require coordination between threads, meaning high-load situations can suffer from mysterious pauses, and so it's good style to avoid allocations where possible, provided the code isn't twisted into some weird version of itself by taking such care.
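
Neither of telem's filters injects anything, so here's a minimal sketch of one that does. DoublingFilter and drive are hypothetical names invented for this illustration, and Telemetry and Filter are trimmed local stand-ins so the snippet compiles on its own. The driver reuses a single scratch vector in the same manner as the default run:

```rust
// Local stand-ins for telem's event::Telemetry and filter::Filter.
#[derive(Debug, Clone, PartialEq)]
pub struct Telemetry {
    pub name: String,
    pub value: u32,
}

pub trait Filter {
    fn process(&mut self, event: Telemetry, res: &mut Vec<Telemetry>);
}

// Hypothetical filter: passes every point through untouched and injects a
// second point, "<name>.doubled", carrying twice the value.
pub struct DoublingFilter;

impl Filter for DoublingFilter {
    fn process(&mut self, event: Telemetry, res: &mut Vec<Telemetry>) {
        let injected = Telemetry {
            name: format!("{}.doubled", event.name),
            value: event.value.saturating_mul(2),
        };
        res.push(event);
        res.push(injected);
    }
}

// Drive a filter over a batch of points with one scratch vector.
// drain(..) empties the vector but keeps its capacity, so the scratch
// space itself is not reallocated once it has grown large enough.
pub fn drive<F: Filter>(filter: &mut F, input: Vec<Telemetry>) -> Vec<Telemetry> {
    let mut scratch = Vec::with_capacity(64);
    let mut out = Vec::new();
    for telem in input {
        filter.process(telem, &mut scratch);
        out.extend(scratch.drain(..));
    }
    out
}
```

The format! call still allocates a String for each injected point, so the saving is only on the vector. That's usually the easier of the two to get rid of.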

Both low and high filters are basically the same. The low filter passes a point through itself if the point is less than or equal to a pre-defined limit, whereas the high filter passes a point if it is greater than or equal to that limit. Here's LowFilter, defined in src/filter/low_filter.rs:

use event;
use filter::Filter;

pub struct LowFilter {
    limit: u32,
}

impl LowFilter {
    pub fn new(limit: u32) -> Self {
        LowFilter { limit: limit }
    }
}

impl Filter for LowFilter {
    fn process(
        &mut self,
        event: event::Telemetry,
        res: &mut Vec<event::Telemetry>,
    ) -> () {
        if event.value <= self.limit {
            res.push(event);
        }
    }
}
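
HighFilter isn't listed here. Going by the description above, it presumably differs only in the comparison, so a sketch might look like the following. The >= is an assumption taken from the prose rather than from the telem source, passes_high is a hypothetical helper, and the types are local stand-ins:

```rust
// Local stand-ins for telem's event::Telemetry and filter::Filter.
#[derive(Debug, Clone, PartialEq)]
pub struct Telemetry {
    pub name: String,
    pub value: u32,
}

pub trait Filter {
    fn process(&mut self, event: Telemetry, res: &mut Vec<Telemetry>);
}

pub struct HighFilter {
    limit: u32,
}

impl HighFilter {
    pub fn new(limit: u32) -> Self {
        HighFilter { limit: limit }
    }
}

impl Filter for HighFilter {
    fn process(&mut self, event: Telemetry, res: &mut Vec<Telemetry>) {
        // Assumed mirror image of LowFilter: keep points at or above the limit.
        if event.value >= self.limit {
            res.push(event);
        }
    }
}

// Hypothetical helper: does a value make it through a HighFilter?
pub fn passes_high(limit: u32, value: u32) -> bool {
    let mut res = Vec::new();
    let point = Telemetry {
        name: "x".to_string(),
        value: value,
    };
    HighFilter::new(limit).process(point, &mut res);
    !res.is_empty()
}
```

If both comparisons really are inclusive, a point exactly at the limit passes both filters and shows up in both egresses.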

Egress of telemetry is defined similarly to the way filter is done, split out into a sub-module and a common trait. The trait is present in src/egress/mod.rs:

use event;
use std::sync::mpsc;

mod cma_egress;
mod ckms_egress;

pub use self::ckms_egress::*;
pub use self::cma_egress::*;

pub trait Egress {
    fn deliver(&mut self, event: event::Telemetry) -> ();

    fn report(&mut self) -> ();

    fn run(&mut self, recv: mpsc::Receiver<event::Event>) {
        for event in recv.into_iter() {
            match event {
                event::Event::Telemetry(telem) => self.deliver(telem),
                event::Event::Flush => self.report(),
            }
        }
    }
}

The deliver function is intended to give the egress its Telemetry for storage. The report is intended to force the implementors of Egress to issue their summarized telemetry to the outside world. Both of our Egress implementors—CKMSEgress and CMAEgress—merely print their information but you can well imagine an Egress that emits its information out over some network protocol to a remote system. This is, in fact, exactly what cernan's Sinks do, across many protocols and transports. Let's look at a single egress, as they're both very similar. CKMSEgress is defined in src/egress/ckms_egress.rs:

use egress::Egress;
use event;
use quantiles;
use util;

pub struct CKMSEgress {
    error: f64,
    data: util::HashMap<String, quantiles::ckms::CKMS<u32>>,
    new_data_since_last_report: bool,
}

impl Egress for CKMSEgress {
    fn deliver(&mut self, event: event::Telemetry) -> () {
        self.new_data_since_last_report = true;
        let val = event.value;
        let ckms = self.data
            .entry(event.name)
            .or_insert(quantiles::ckms::CKMS::new(self.error));
        ckms.insert(val);
    }

    fn report(&mut self) -> () {
        if self.new_data_since_last_report {
            for (k, v) in &self.data {
                for q in &[0.0, 0.25, 0.5, 0.75, 0.9, 0.99] {
                    println!("[CKMS] {} {}:{}", k, q, v.query(*q).unwrap().1);
                }
            }
            self.new_data_since_last_report = false;
        }
    }
}

impl CKMSEgress {
    pub fn new(error: f64) -> Self {
        CKMSEgress {
            error: error,
            data: Default::default(),
            new_data_since_last_report: false,
        }
    }
}

Note the type of data: util::HashMap<String, quantiles::ckms::CKMS<u32>>. This util::HashMap is a type alias for std::collections::HashMap<K, V, hash::BuildHasherDefault<SeaHasher>>, as mentioned previously. The cryptographic security of hashing here is less important than the speed of hashing, which is why we go with SeaHasher. There are a great many alternative hashers available in crates and it's a fancy trick to be able to swap them out for your use case. quantiles::ckms::CKMS is an approximate data structure, defined in Effective Computation of Biased Quantiles Over Data Streams by Cormode et al. Many summary systems run in limited space but are willing to tolerate errors. The CKMS data structure allows for point shedding while keeping guaranteed error bounds on the quantile approximations. The discussion of the data structure is outside the domain of this book but the implementation is interesting and the paper is remarkably well-written. Anyhow, that's what the error setting is all about. If you flip back to the main function, note that we hard-code the error as being 0.01, meaning that any quantile summary is guaranteed to be within 0.01 of the true quantile.
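
The alias trick can be sketched using nothing outside the standard library. Since seahash is an external crate, the sketch below swaps in a small hand-rolled 64-bit FNV-1a hasher, named Fnv1a here. That hasher and the count_names helper are illustrative assumptions and not part of telem, where SeaHasher fills the same slot:

```rust
use std::collections;
use std::hash::{BuildHasherDefault, Hasher};

// A tiny 64-bit FNV-1a hasher standing in for seahash::SeaHasher.
// Like SeaHasher, it is not cryptographically safe.
pub struct Fnv1a(u64);

impl Default for Fnv1a {
    fn default() -> Self {
        Fnv1a(0xcbf2_9ce4_8422_2325) // FNV-1a 64-bit offset basis
    }
}

impl Hasher for Fnv1a {
    fn write(&mut self, bytes: &[u8]) {
        for byte in bytes {
            self.0 ^= u64::from(*byte);
            self.0 = self.0.wrapping_mul(0x0000_0100_0000_01b3); // FNV prime
        }
    }

    fn finish(&self) -> u64 {
        self.0
    }
}

// Same shape as the util::HashMap alias, with Fnv1a in place of SeaHasher.
pub type HashMap<K, V> = collections::HashMap<K, V, BuildHasherDefault<Fnv1a>>;

// Hypothetical helper exercising the alias: count occurrences of each name.
pub fn count_names(names: &[&str]) -> HashMap<String, u32> {
    // HashMap::new() only exists for the default hasher, so maps built on
    // the alias come from Default, as CKMSEgress::new does for its data.
    let mut counts: HashMap<String, u32> = Default::default();
    for name in names {
        *counts.entry((*name).to_string()).or_insert(0) += 1;
    }
    counts
}
```

Everything downstream of the alias is ordinary HashMap code, which is what makes swapping hashers cheap to try.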

That, honestly, is pretty much it. We've just stepped through the majority of a non-trivial Rust program built around the MPSC abstraction provided in the standard library. Let's fiddle with it some. In one shell, start telem:

> cargo run --release
   Compiling quantiles v0.7.0
   Compiling seahash v3.0.5
   Compiling telem v0.1.0 (file:///Users/blt/projects/us/troutwine/concurrency_in_rust/external_projects/telem)
    Finished release [optimized] target(s) in 7.16 secs
     Running `target/release/telem`

In another shell, start sending UDP packets. On macOS, you can use nc like so:

> echo "a 10" | nc -c -u 127.0.0.1 1990
> echo "a 55" | nc -c -u 127.0.0.1 1990

The call is similar on Linux; you just have to be careful not to wait for a response is all. In the original shell, you should see an output like this after a second:

[CKMS] a 0:10
[CKMS] a 0.25:10
[CKMS] a 0.5:10
[CKMS] a 0.75:10
[CKMS] a 0.9:10
[CKMS] a 0.99:10
[CKMS] a 0:10
[CKMS] a 0.25:10
[CKMS] a 0.5:10
[CKMS] a 0.75:55
[CKMS] a 0.9:55
[CKMS] a 0.99:55

The points, being below the limit, have gone through the low filter and into the CKMS egress. Back to our other shell:

> echo "b 1000" | nc -c -u 127.0.0.1 1990
> echo "b 2000" | nc -c -u 127.0.0.1 1990
> echo "b 3000" | nc -c -u 127.0.0.1 1990

In the telem shell:

[CMA] b 1000
[CMA] b 1500
[CMA] b 2000

Bang, just as expected. The points, being above the limit, have gone through the high filter and into the CMA egress. So long as no points are coming in, telem should be drawing almost no CPU, waking up each of the threads once every second or so for the Flush pulse. Memory consumption will also be very low, being primarily represented by the large input buffer in IngestPoint.
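
The [CMA] lines above are what a running mean updated one point at a time would print. CMAEgress isn't listed in this section, so the following is only a sketch of that idea under stated assumptions: CmaSketch, its methods, and run are hypothetical names, and reports are returned as strings rather than printed. Driving it with Flush events pushed over a channel also shows why time-as-a-pulse is pleasant to test:

```rust
use std::collections::HashMap;
use std::sync::mpsc;

// Trimmed stand-in for telem's event::Event.
#[derive(Debug, Clone)]
pub enum Event {
    Telemetry { name: String, value: u32 },
    Flush,
}

// Hypothetical stand-in for CMAEgress: per name, a count and a running mean.
#[derive(Default)]
pub struct CmaSketch {
    data: HashMap<String, (u64, f64)>,
}

impl CmaSketch {
    pub fn deliver(&mut self, name: String, value: u32) {
        let entry = self.data.entry(name).or_insert((0, 0.0));
        entry.0 += 1;
        // Incremental mean, avg += (x - avg) / n, so no history is stored.
        entry.1 += (f64::from(value) - entry.1) / entry.0 as f64;
    }

    pub fn report(&self) -> Vec<String> {
        let mut lines: Vec<String> = self
            .data
            .iter()
            .map(|(name, stats)| format!("[CMA] {} {}", name, stats.1))
            .collect();
        lines.sort(); // HashMap iteration order is unspecified
        lines
    }
}

// Consume events until every Sender is dropped, collecting one batch of
// report lines per Flush. No wall-clock is consulted anywhere.
pub fn run(recv: mpsc::Receiver<Event>) -> Vec<String> {
    let mut egress = CmaSketch::default();
    let mut out = Vec::new();
    for event in recv {
        match event {
            Event::Telemetry { name, value } => egress.deliver(name, value),
            Event::Flush => out.extend(egress.report()),
        }
    }
    out
}
```

Sending b 1000, b 2000 and b 3000 with a Flush after each yields the same three lines as the session above. Unlike CKMSEgress, this sketch reports on every Flush whether or not new data arrived.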

There are problems with this program. In many places, we explicitly panic or unwrap on potential problem points, rather than deal with the issues. While it's reasonable to unwrap in a production program, you should be very sure that the error you're blowing your program up for cannot be otherwise dealt with. Most concerning is that the program has no concept of back-pressure. If IngestPoint were able to produce points faster than the filters or the egresses could absorb them, the channels between threads would keep allocating space. A slight rewrite of the program to use mpsc::SyncSender would be suitable, since back-pressure is applied as soon as a channel is filled to capacity, but only if dropping points is acceptable. Given that the ingest protocol is in terms of UDP it almost surely is, but the reader can imagine a scenario where it would not be. Rust's standard library struggles in areas where alternative forms of back-pressure are needed but these are, admittedly, esoteric. If you're interested in reading through a production program that works along the lines of telem but has none of the defects identified here, I warmly recommend the cernan codebase.
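
As a rough sketch of what the SyncSender rewrite might look like at a send site: mpsc::sync_channel creates a channel with a fixed capacity, SyncSender::send blocks once it's full, and try_send returns immediately with an error instead. The offer helper below is a hypothetical name, and it assumes shedding the point is the desired response to a full queue:

```rust
use std::sync::mpsc;

// Offer a point to a bounded channel without blocking. Returns true if the
// point was queued and false if it was shed.
pub fn offer<T>(chan: &mpsc::SyncSender<T>, point: T) -> bool {
    match chan.try_send(point) {
        Ok(()) => true,
        // Queue is at capacity: shed the point rather than grow memory.
        Err(mpsc::TrySendError::Full(_)) => false,
        // Receiver is gone: nothing downstream will ever read this point.
        Err(mpsc::TrySendError::Disconnected(_)) => false,
    }
}
```

With a channel built by mpsc::sync_channel(2), the first two offers succeed and a third is refused until the receiver pulls something off. Calling send instead of try_send would stall the ingest thread, at which point the OS starts dropping datagrams on our behalf.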

All in all, Rust's MPSC is a very useful tool when constructing parallel systems. In this chapter, we built our own buffered channel, Ring, but that isn't common at all. You'd need a fairly specialized use case to consider not using the standard library's MPSC for inter-thread channel-based communication. We'll examine one such use case in the next chapter after we cover more of Rust's basic concurrency primitives.
