The Ethernet sniffer

Of equal importance to understanding a technique in depth is understanding when not to apply it. Let's consider another thread-per-unit-of-work system, but this time we'll be echoing Ethernet packets rather than lines received over a TCP socket. Our project's Cargo.toml:

[package]
name = "sniffer"
version = "0.1.0"
authors = ["Brian L. Troutwine <[email protected]>"]

[dependencies]
pnet = "0.21"

[[bin]]
name = "sniffer"

[[bin]]
name = "poor_threading"

Like the TCP example, we'll create two binaries: one that is susceptible to thread overload, poor_threading, and another, sniffer, that is not. The premise is that we want to sniff a network interface for Ethernet packets, reverse the source and destination headers on each packet, and send the modified packet back to the original source. Simultaneously, we'll collect summary statistics about the packets we receive. On a saturated network, our little programs will be very busy, and trade-offs will have to be made somewhere between packet loss and receipt.

The only dependency we're pulling in here is pnet. libpnet is a low-level packet manipulation library, useful for building network utilities or for prototyping transport protocols. I think it's pretty fun to fuzz-test transport implementations with libpnet on commodity network devices. Mostly, though, the reward is a crashed home router, but I find that amusing. Anyhow, let's look into poor_threading. Its preamble is fairly ho-hum:

extern crate pnet;

use pnet::datalink::Channel::Ethernet;
use pnet::datalink::{self, DataLinkReceiver, DataLinkSender,
                     MacAddr, NetworkInterface};
use pnet::packet::ethernet::{EtherType, EthernetPacket,
                             MutableEthernetPacket};
use pnet::packet::{MutablePacket, Packet};
use std::collections::HashMap;
use std::sync::mpsc;
use std::{env, thread};

We pull in a fair bit of pnet's facilities, which we'll discuss in due time. We don't pull in clap or similar argument-parsing libraries, instead requiring that the user pass in the interface name as the first argument to the program:

fn main() {
    let interface_name = env::args().nth(1).unwrap();
    let interface_names_match = |iface: &NetworkInterface| {
        iface.name == interface_name
    };

    // Find the network interface with the provided name
    let interfaces: Vec<NetworkInterface> = datalink::interfaces();
    let interface = interfaces
        .into_iter()
        .filter(interface_names_match)
        .next()
        .unwrap();

The user-supplied interface name is checked against the interfaces pnet is able to find, via its datalink::interfaces() function. We haven't done much with iterators yet in this book, though they've been omnipresent and, at least minimally, assumed knowledge. We'll discuss iteration in Rust in detail later. Note here, though, that filter(interface_names_match) applies a boolean function, interface_names_match(&NetworkInterface) -> bool, to each member of the discovered interfaces.

If the function returns true, the member passes through the filter, otherwise it doesn't. The call to next().unwrap() cranks the filtered iterator forward one item, crashing if there are no items in the filtered iterator. This is a somewhat user-hostile way to determine whether the passed interface is, in fact, an interface that pnet could discover. That's alright here in our demonstration program.
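
As an aside, the same lookup can be made a touch friendlier with the iterator machinery just described. This is a sketch only, assuming the imports shown above; Iterator::find collapses the filter().next() pair and expect attaches a reason to the crash:

    // A sketch only: find() replaces filter().next(), and expect() says why we crashed.
    let interface = datalink::interfaces()
        .into_iter()
        .find(|iface| iface.name == interface_name)
        .expect("no interface found with the supplied name");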

Next, we establish a communication channel for the concurrent actors of this program:

    let (snd, rcv) = mpsc::sync_channel(10);

    let _ = thread::spawn(|| gather(rcv));
    let timer_snd = snd.clone();
    let _ = thread::spawn(move || timer(timer_snd));

The timer function pushes a time pulse through the channel we just established at regular intervals, much as is done in cernan, discussed previously in this book. The function is small:

fn timer(snd: mpsc::SyncSender<Payload>) -> () {
    use std::{thread, time};
    let one_second = time::Duration::from_millis(1000);

    let mut pulses = 0;
    loop {
        thread::sleep(one_second);
        snd.send(Payload::Pulse(pulses)).unwrap();
        pulses += 1;
    }
}

The Payload type is an enum with only two variants:

enum Payload {
    Packet {
        source: MacAddr,
        destination: MacAddr,
        kind: EtherType,
    },
    Pulse(u64),
}

The Pulse(u64) variant is the timer pulse, sent periodically by the timer thread. Divorcing time in a concurrent system from the actual wall-clock is very useful, especially with regard to testing individual components of the system. It also helps the program's structure to unify the different message variants into a single union type. At the time of writing, Rust's MPSC implementation does not have a stable select capability, so reading from several channels at once means manually multiplexing with mpsc::Receiver::recv_timeout; it's much simpler to funnel everything through one type. Additionally, this gets the type system on your side, confirming that all incoming variants are handled in a single match, which likely optimizes better, too. That last benefit should be measured.
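
To see why the single enum is attractive, here is roughly what the alternative looks like. This is a sketch only, with u64 payloads standing in for real packet and pulse types; two separate channels have to be polled in turn with recv_timeout, trading latency against busy-polling:

use std::sync::mpsc;
use std::time::Duration;

// A sketch of manual multiplexing over two channels, with u64 standing in
// for real packet and pulse payloads.
fn multiplex(packets: mpsc::Receiver<u64>, pulses: mpsc::Receiver<u64>) {
    let poll = Duration::from_millis(10);
    loop {
        // Poll each receiver in turn; a timeout only means nothing arrived yet.
        match packets.recv_timeout(poll) {
            Ok(pkt) => println!("packet: {}", pkt),
            Err(mpsc::RecvTimeoutError::Timeout) => {}
            Err(mpsc::RecvTimeoutError::Disconnected) => break,
        }
        match pulses.recv_timeout(poll) {
            Ok(id) => println!("pulse: {}", id),
            Err(mpsc::RecvTimeoutError::Timeout) => {}
            Err(mpsc::RecvTimeoutError::Disconnected) => break,
        }
    }
}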

Let's look at gather now:

fn gather(rcv: mpsc::Receiver<Payload>) -> () {
    let mut sources: HashMap<MacAddr, u64> = HashMap::new();
    let mut destinations: HashMap<MacAddr, u64> = HashMap::new();
    let mut ethertypes: HashMap<EtherType, u64> = HashMap::new();

    while let Ok(payload) = rcv.recv() {
        match payload {
            Payload::Pulse(id) => {
                println!("REPORT {}", id);
                println!("    SOURCES:");
                for (k, v) in sources.iter() {
                    println!("        {}: {}", k, v);
                }
                println!("    DESTINATIONS:");
                for (k, v) in destinations.iter() {
                    println!("        {}: {}", k, v);
                }
                println!("    ETHERTYPES:");
                for (k, v) in ethertypes.iter() {
                    println!("        {}: {}", k, v);
                }
            }
            Payload::Packet {
                source: src,
                destination: dst,
                kind: etype,
            } => {
                let mut destination = destinations.entry(dst).or_insert(0);
                *destination += 1;

                let mut source = sources.entry(src).or_insert(0);
                *source += 1;

                let mut ethertype = ethertypes.entry(etype).or_insert(0);
                *ethertype += 1;
            }
        }
    }
}

A straightforward receiver loop, pulling Payload enums off the channel. The Payload::Packet variant is deconstructed and its contents tallied into three HashMaps, mapping source MAC addresses, destination MAC addresses, and EtherTypes to counters. MAC addresses are the unique identifiers of network interfaces (ideally unique, anyway, as there have been goofs) and are used at the data-link layer of the OSI model. (This is not a book about networking but it is, I promise, a really fun domain.) EtherType corresponds to the two-octet field in every Ethernet frame's header that defines the packet's, well, type. There's a standard list of types, which pnet helpfully encodes for us, so when gather prints an EtherType, no special work is needed to get human-readable output.

Now that we know how gather and timer work, we can wrap up the main function:

    let iface_handler = match datalink::channel(&interface, Default::default()) {
        Ok(Ethernet(tx, rx)) => {
            let snd = snd.clone();
            thread::spawn(|| watch_interface(tx, rx, snd))
        }
        Ok(_) => panic!("Unhandled channel type"),
        Err(e) => panic!(
            "An error occurred when creating the datalink channel: {}",
            e
        ),
    };

    iface_handler.join().unwrap();
}

The datalink::channel function establishes an MPSC-like channel for bi-directional packet reading and writing on the given interface. We only care about Ethernet packets here and match only on that variant. We spawn a new thread for watch_interface, which receives both read/write sides of the channel and the MPSC we made for Payload. In this way, we have one thread reading Ethernet packets from the network, stripping them into a normalized Payload::Packet and pushing them to the gather thread. Meanwhile, we have another timer thread pushing Payload::Pulse at regular intervals to the gather thread to force user reporting.

The sharp-eyed reader will have noticed that our Payload channel is actually sync_channel(10), meaning that this program is not meant to store much transient information in memory. On a busy Ethernet network, it's entirely possible that watch_interface will be unable to push a normalized Ethernet packet into the channel. Something will have to be done, and what that is depends on how willing we are to lose information.
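
For readers who haven't used bounded channels before, this tiny standalone sketch shows the behaviour that matters here: send blocks when the buffer is full, while try_send reports the full buffer as an error instead.

use std::sync::mpsc;

fn main() {
    // Capacity 2: the first two sends succeed immediately.
    let (snd, rcv) = mpsc::sync_channel::<u32>(2);
    snd.send(1).unwrap();
    snd.send(2).unwrap();

    // The buffer is now full. send() would block here; try_send() does not,
    // it returns an error instead.
    assert!(snd.try_send(3).is_err());

    // Draining one item frees a slot, and try_send succeeds again.
    assert_eq!(rcv.recv().unwrap(), 1);
    assert!(snd.try_send(3).is_ok());
}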

Let's see how this implementation addresses the problem:

fn watch_interface(
    mut tx: Box<DataLinkSender>,
    mut rx: Box<DataLinkReceiver>,
    snd: mpsc::SyncSender<Payload>,
) {
    loop {
        match rx.next() {
            Ok(packet) => {
                let packet = EthernetPacket::new(packet).unwrap();

So far, so good. We see the sides of datalink::Channel that we discussed earlier, plus our internal MPSC. The infinite loop reads &[u8] off the receive side of the channel and our implementation has to convert this into a proper EthernetPacket. If we were being very thorough, we'd guard against malformed Ethernet packets but, since we aren't, the creation is unwrapped. Now, how about that normalization into Payload::Packet and transmission to gather:

                {
                    let payload: Payload = Payload::Packet {
                        source: packet.get_source(),
                        destination: packet.get_destination(),
                        kind: packet.get_ethertype(),
                    };
                    let thr_snd = snd.clone();
                    thread::spawn(move || {
                        thr_snd.send(payload).unwrap();
                    });
                }

Uh oh. The EthernetPacket has normalized into Payload::Packet just fine, but when we send the packet down the channel to gather we do so by spawning a thread. The decision being made here is that no incoming packet should be lost—implying we have to read them off the network interface as quickly as possible—and if the channel blocks, we'll need to store that packet in some pool, somewhere.

The pool is, in fact, a thread stack. Or, rather, a bunch of them. Even if we dropped each thread's stack size to something sensibly small, on a saturated network we're still going to overwhelm the operating system at some point. If we absolutely could not stand to lose packets, we could use something such as hopper (https://github.com/postmates/hopper), discussed in detail in Chapter 5, Locks – Mutex, Condvar, Barriers and RWLock, which was designed for just this use case. Or, we could defer these sends to a thread pool, allowing it to queue the jobs and run them on a finite number of threads. Anyway, let's set this aside for just a second and wrap up watch_interface:

                tx.build_and_send(1, packet.packet().len(),
                                  &mut |new_packet| {
                    let mut new_packet =
                        MutableEthernetPacket::new(new_packet).unwrap();

                    // Create a clone of the original packet
                    new_packet.clone_from(&packet);

                    // Switch the source and destination
                    new_packet.set_source(packet.get_destination());
                    new_packet.set_destination(packet.get_source());
                });
            }
            Err(e) => {
                panic!("An error occurred while reading: {}", e);
            }
        }
    }
}

The implementation takes the newly created EthernetPacket, swaps the source and destination of the original into a new EthernetPacket, and sends it back across the network to the original source. Now, let's ask ourselves: is it really important that we tally every Ethernet packet we can pull off the network? We could speed up the HashMaps in the gather thread in the fashion described in Chapter 2, Sequential Rust Performance and Testing, by inserting a faster hasher. Or, we could buffer in the watch_interface thread when the synchronous channel is full, in addition to using threadpool or hopper. Of these options, only speeding up gather and using hopper avoid potentially unbounded storage requirements, but hopper requires a filesystem and may still be unable to accept every incoming packet; it only makes running out of storage less likely. Both the faster hasher and the deferred-send thread pool are sketched below.
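
Neither of those sketches appears in this project's sources. First, the faster hasher: assuming the fnv crate were added to Cargo.toml, the tallying maps in gather could be swapped for FNV-hashed ones, which tend to beat SipHash for small keys such as MAC addresses. The [u8; 6] key below stands in for pnet's MacAddr.

extern crate fnv;

use fnv::FnvHashMap;

fn main() {
    // Same entry-counting pattern as gather, just with a cheaper hasher.
    let mut sources: FnvHashMap<[u8; 6], u64> = FnvHashMap::default();
    *sources.entry([0xff; 6]).or_insert(0) += 1;
    *sources.entry([0xff; 6]).or_insert(0) += 1;
    println!("{:?}", sources);
}

Second, the deferred-send option: assuming the threadpool crate, the per-packet thread::spawn in poor_threading could be replaced with a pool that queues jobs and runs them on a fixed number of worker threads. Again a sketch only, with u64 standing in for Payload.

extern crate threadpool;

use std::sync::mpsc;
use threadpool::ThreadPool;

fn main() {
    let (snd, rcv) = mpsc::sync_channel::<u64>(10);
    // Four workers, no matter how many payloads arrive.
    let pool = ThreadPool::new(4);

    for payload in 0..100 {
        let snd = snd.clone();
        // execute() queues the job; a blocked send stalls a worker,
        // not the thread that read the packet off the wire.
        pool.execute(move || {
            snd.send(payload).expect("receiver hung up");
        });
    }
    drop(snd);

    while let Ok(payload) = rcv.recv() {
        println!("got {}", payload);
    }
}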

It's a tough problem. Given the nature of the network at layer 2, you might as well just shed the packets. The network card itself is going to be shedding packets. With that in mind, the implementation of watch_interface in the other binary in this project—sniffer—differs only marginally:

fn watch_interface(
    mut tx: Box<DataLinkSender>,
    mut rx: Box<DataLinkReceiver>,
    snd: mpsc::SyncSender<Payload>,
) {
    loop {
        match rx.next() {
            Ok(packet) => {
                let packet = EthernetPacket::new(packet).unwrap();

                let payload: Payload = Payload::Packet {
                    source: packet.get_source(),
                    destination: packet.get_destination(),
                    kind: packet.get_ethertype(),
                };
                if snd.try_send(payload).is_err() {
                    SKIPPED_PACKETS.fetch_add(1, Ordering::Relaxed);
                }

Payload::Packet is created and rather than calling snd.send, this implementation calls snd.try_send, ticking up a SKIPPED_PACKETS static AtomicUsize in the event a packet has to be shed. The gather implementation is likewise only slightly adjusted to report on this new SKIPPED_PACKETS:

fn gather(rcv: mpsc::Receiver<Payload>) -> () {
    let mut sources: HashMap<MacAddr, u64> = HashMap::new();
    let mut destinations: HashMap<MacAddr, u64> = HashMap::new();
    let mut ethertypes: HashMap<EtherType, u64> = HashMap::new();

    while let Ok(payload) = rcv.recv() {
        match payload {
            Payload::Pulse(id) => {
                println!("REPORT {}", id);
                println!(
                    "    SKIPPED PACKETS: {}",
                    SKIPPED_PACKETS.swap(0, Ordering::Relaxed)
                );
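
The SKIPPED_PACKETS counter itself is a static AtomicUsize shared by every thread in the process. Its exact declaration isn't reproduced here, but a minimal, standalone version of the declare, increment, and drain cycle looks like this (older compilers spell the initializer ATOMIC_USIZE_INIT; newer ones accept AtomicUsize::new in a static):

use std::sync::atomic::{AtomicUsize, Ordering};

// One counter for the whole process; no locking required.
static SKIPPED_PACKETS: AtomicUsize = AtomicUsize::new(0);

fn main() {
    // watch_interface side: tick the counter when a packet is shed.
    SKIPPED_PACKETS.fetch_add(1, Ordering::Relaxed);

    // gather side: swap(0, ..) reads and resets the count in one step, so
    // each report covers only the interval since the last one.
    assert_eq!(SKIPPED_PACKETS.swap(0, Ordering::Relaxed), 1);
    assert_eq!(SKIPPED_PACKETS.load(Ordering::Relaxed), 0);
}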

This program will use a moderate amount of storage for the Payload channel, no matter how busy the network.

In high-traffic domains or long-lived deployments, the HashMaps in the gather thread are going to be a concern, but this is a detail the intrepid reader is invited to address.
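
One possible direction, sketched only: clear the maps at the end of each Pulse report, so the tallies cover a single interval rather than the whole life of the process and the maps' growth is bounded by per-second traffic.

            Payload::Pulse(id) => {
                println!("REPORT {}", id);
                // ... the printing loops as before ...

                // Reset the tallies so each report covers one interval and
                // the maps cannot grow without bound.
                sources.clear();
                destinations.clear();
                ethertypes.clear();
            }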

When you run sniffer, you should be rewarded with output very much like this:

REPORT 6
    SKIPPED PACKETS: 75
    SOURCES:
        ff:ff:ff:ff:ff:ff: 1453
        00:23:6a:00:51:6e: 2422
        5c:f9:38:8b:4a:b6: 1034
    DESTINATIONS:
        33:33:ff:0b:62:e8: 1
        33:33:ff:a1:90:82: 1
        33:33:00:00:00:01: 1
        00:23:6a:00:51:6e: 2414
        5c:f9:38:8b:4a:b6: 1032
        ff:ff:ff:ff:ff:ff: 1460
    ETHERTYPES:
        Ipv6: 4
        Ipv4: 1999
        Arp: 2906

This report came from the sixth second of my sniffer run and 75 packets were dropped for lack of storage. That's better than 75 threads.
