The Receiver

Now that we've covered the deque, let's move on to the Receiver, defined in src/receiver.rs. As mentioned previously, the receiver is responsible for pulling a value either out of memory or from disk, in the style of an iterator. The Receiver declares the usual machinery for transforming itself into an iterator, and we won't cover that in this book, mostly because it's a touch on the tedious side. That said, there are two important functions to cover in the Receiver, and neither of them is in the public interface. The first is Receiver::next_value, called by the iterator version of the receiver. This function is defined as follows:

fn next_value(&mut self) -> Option<T> {
    loop {
        if self.disk_writes_to_read == 0 {
            match self.mem_buffer.pop_front() {
                private::Placement::Memory(ev) => {
                    return Some(ev);
                }
                private::Placement::Disk(sz) => {
                    self.disk_writes_to_read = sz;
                    continue;
                }
            }
        } else {
            match self.read_disk_value() {
                Ok(ev) => return Some(ev),
                Err(_) => return None,
            }
        }
    }
}

A hopper defined to move T values does not, as discussed in the previous section, define a deque over Ts. Instead, the deque actually holds Placement<T>. Placement, defined in src/private.rs, is a small enumeration:

pub enum Placement<T> {
    Memory(T),
    Disk(usize),
}

This is the trick that makes hopper work. The primary challenge with a concurrent data structure is providing enough synchronization between threads that your results remain coherent despite the chaotic nature of scheduling, without requiring so much synchronization that you're underwater compared to a sequential solution.

That does happen.

Now, recall that maintaining order is a key design need for any channel-style queue. The Rust standard library MPSC achieves this with atomic flags, aided by the fact that, ultimately, there's only one place for inserted elements to be stored and one place for them to be removed from. Not so in hopper. But that one-stop shop is a very useful, low-synchronization approach, and that's where Placement comes in. When the Memory variant is hit, the T is already present and the receiver simply returns it. When Disk(usize) is returned, it signals the receiver to flip itself into disk mode. In disk mode, that is, while self.disk_writes_to_read is not zero, the receiver preferentially reads values from disk. Only when there are no more disk values to be read does the receiver attempt to read from memory again. This mode-flipping approach maintains ordering and has the added benefit of requiring no synchronization while in disk mode, saving critical time when reading from a slow disk.
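The mode flip can be seen in isolation with a minimal sketch. This is not hopper's code: SketchReceiver is a hypothetical stand-in, both queues are ordinary VecDeques so that nothing touches a file or another thread, and the in-memory pop returns an Option where the original's pop_front hands back a Placement directly.

```rust
use std::collections::VecDeque;

// Mirrors the Placement enum from src/private.rs shown earlier.
pub enum Placement<T> {
    Memory(T),
    Disk(usize),
}

// Hypothetical stand-in for the Receiver. The `disk` field simulates the
// on-disk queue file with an in-memory VecDeque (an assumption made so the
// sketch runs without any I/O).
pub struct SketchReceiver<T> {
    mem_buffer: VecDeque<Placement<T>>,
    disk: VecDeque<T>,
    disk_writes_to_read: usize,
}

impl<T> SketchReceiver<T> {
    pub fn new(mem_buffer: VecDeque<Placement<T>>, disk: VecDeque<T>) -> Self {
        SketchReceiver { mem_buffer, disk, disk_writes_to_read: 0 }
    }

    pub fn next_value(&mut self) -> Option<T> {
        loop {
            if self.disk_writes_to_read == 0 {
                // Memory mode: take the next placement, or stop if none is left.
                match self.mem_buffer.pop_front()? {
                    Placement::Memory(ev) => return Some(ev),
                    Placement::Disk(sz) => {
                        // Flip into disk mode for the next `sz` values.
                        self.disk_writes_to_read = sz;
                        continue;
                    }
                }
            } else {
                // Disk mode: the memory queue is not consulted at all.
                let ev = self.disk.pop_front()?;
                self.disk_writes_to_read -= 1;
                return Some(ev);
            }
        }
    }
}
```

Feeding this receiver a memory queue of Memory(1), Disk(2), Memory(4) and a simulated disk holding 2 and 3 yields 1, 2, 3, 4 in that order: the Disk(2) marker sits in the memory queue exactly where the two spilled values belong.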

The second important function to examine is read_disk_value, referenced in next_value. It's long and mostly book-keeping, but I did want to call out the first part of that function here:

    fn read_disk_value(&mut self) -> Result<T, super::Error> {
        loop {
            match self.fp.read_u32::<BigEndian>() {
                Ok(payload_size_in_bytes) => {
                    let mut payload_buf = vec![0; payload_size_in_bytes as usize];
                    match self.fp.read_exact(&mut payload_buf[..]) {
                        Ok(()) => {
                            let mut dec = DeflateDecoder::new(&payload_buf[..]);
                            match deserialize_from(&mut dec) {
                                Ok(event) => {
                                    self.disk_writes_to_read -= 1;
                                    return Ok(event);
                                }
                                Err(e) => panic!("Failed decoding. Skipping {:?}", e),
                            }
                        }

This small chunk of code uses two very useful libraries. Hopper stores its disk-slop elements to disk in bincode format. Bincode (https://crates.io/crates/bincode) was invented for the Servo project and is a serialization library intended for IPC, which is more or less the exact use hopper has for it. The advantage of bincode is that it's fast to serialize and deserialize; the disadvantages are that it is not a standard and does not guarantee a stable binary format from version to version. The second library to be called out is almost invisible in this example: byteorder. You can see it here: self.fp.read_u32::<BigEndian>. Byteorder extends std::io::Read to allow for the deserialization of primitive types from byte buffers. It is possible to do this yourself by hand, but it's error-prone and tedious to repeat. Use byteorder. So, what we're seeing here is hopper reading a 32-bit big-endian length prefix from self.fp (a std::io::BufReader pointed at the current on-disk queue file) and using that prefix to read exactly that many bytes from disk, before passing those bytes through the DeflateDecoder and on into the deserializer. That's it. Every hopper on-disk slop element is a 32-bit length prefix chased by that many bytes.
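The length-prefixed framing described above can be sketched with the standard library alone. This is an illustration of the on-disk layout, not hopper's implementation: write_frame and read_frame are hypothetical helper names, u32::from_be_bytes stands in for byteorder's read_u32::<BigEndian>, and the payload is treated as opaque bytes with no bincode or deflate step.

```rust
use std::io::{self, Read, Write};

/// Write one frame: a 4-byte big-endian length, then the payload itself.
pub fn write_frame<W: Write>(w: &mut W, payload: &[u8]) -> io::Result<()> {
    // The prefix is a u32, so larger payloads cannot be represented.
    if payload.len() > u32::MAX as usize {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "payload too large for a 32-bit length prefix",
        ));
    }
    let len = payload.len() as u32;
    w.write_all(&len.to_be_bytes())?;
    w.write_all(payload)
}

/// Read one frame: decode the 4-byte prefix, then read exactly that many bytes.
pub fn read_frame<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
    let mut len_buf = [0u8; 4];
    r.read_exact(&mut len_buf)?;
    let len = u32::from_be_bytes(len_buf) as usize;

    let mut payload = vec![0u8; len];
    r.read_exact(&mut payload)?;
    Ok(payload)
}
```

Because each frame announces its own size, the reader never has to scan for a delimiter, and a truncated file surfaces as an error from read_exact rather than as a silently short payload.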
