The deque

Roughly speaking, hopper is a concurrent deque with two cooperating finite state machines layered on top. We'll start with the deque, defined in src/deque.rs.

The discussion of hopper that follows lists almost all of its source code. We'll call out the few instances where the reader will need to refer to the listing in the book's source repository. 

To be totally clear, a deque is a data structure that allows for queuing and dequeuing at either end of the queue. Rust's stdlib has VecDeque<T>, which is very useful. Hopper is unable to use it, however, as one of its design goals is to allow for parallel sending and receiving against the hopper queue and VecDeque is not thread-safe. Also, while there are concurrent deque implementations in the crate ecosystem, the hopper deque is closely tied to the finite state machines it supports and to hopper's internal ownership model. That is, you probably can't use hopper's deque in your own project without some changes. Anyhow, here's the preamble:

use std::sync::{Condvar, Mutex, MutexGuard};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{mem, sync};

unsafe impl<T, S> Send for Queue<T, S> {}
unsafe impl<T, S> Sync for Queue<T, S> {}

The only unfamiliar pieces here are the imports from std::sync::atomic. We'll be covering atomics in more detail in the next chapter, but we're going to breeze over them at a high level as needed in the explanation to follow. Note as well the unsafe implementations of Send and Sync for some as yet unknown type Queue<T, S>. We're about to go off-road:

pub struct Queue<T, S> {
    inner: sync::Arc<InnerQueue<T, S>>,
}

The Queue<T, S> definition is similar to what we saw in previous chapters: a simple structure wrapping an inner structure, here called InnerQueue<T, S>. The InnerQueue is wrapped in an Arc, meaning there's only one allocated InnerQueue on the heap. As you might expect, the clone of Queue is a copy of the Arc into a new struct:

impl<T, S> Clone for Queue<T, S> {
    fn clone(&self) -> Queue<T, S> {
        Queue {
            inner: sync::Arc::clone(&self.inner),
        }
    }
}
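The sharing that this Clone implementation sets up can be checked directly. The following is a minimal sketch, not hopper's code: Handle and Inner are hypothetical stand-ins for Queue and InnerQueue, and the hits counter exists only so that a write through one handle can be observed through another.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Hypothetical stand-in for InnerQueue: one heap allocation shared by all handles.
struct Inner {
    hits: AtomicUsize,
}

// Hypothetical stand-in for Queue: a thin wrapper around the shared Arc.
pub struct Handle {
    inner: Arc<Inner>,
}

impl Clone for Handle {
    fn clone(&self) -> Handle {
        // Copies the pointer and bumps the reference count; Inner is not copied.
        Handle {
            inner: Arc::clone(&self.inner),
        }
    }
}

impl Handle {
    pub fn new() -> Handle {
        Handle {
            inner: Arc::new(Inner {
                hits: AtomicUsize::new(0),
            }),
        }
    }

    pub fn hit(&self) {
        self.inner.hits.fetch_add(1, Ordering::SeqCst);
    }

    pub fn hits(&self) -> usize {
        self.inner.hits.load(Ordering::SeqCst)
    }

    // True when both handles point at the same Inner allocation.
    pub fn same_allocation(&self, other: &Handle) -> bool {
        Arc::ptr_eq(&self.inner, &other.inner)
    }

    // Number of live handles sharing the allocation.
    pub fn owners(&self) -> usize {
        Arc::strong_count(&self.inner)
    }
}
```

A hit made through a clone is visible through the original, which is the property the queue relies on.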

It's important that every thread that interacts with Queue sees the same InnerQueue. Otherwise, the threads are dealing with distinct areas of memory and have no relationship with one another. Let's look at InnerQueue:

struct InnerQueue<T, S> {
    capacity: usize,
    data: *mut Option<T>,
    size: AtomicUsize,
    back_lock: Mutex<BackGuardInner<S>>,
    front_lock: Mutex<FrontGuardInner>,
    not_empty: Condvar,
}

Okay, this is much more akin to the internals we saw in Rust itself, and there's a lot going on. The field capacity is the maximum number of elements that the InnerQueue will hold in data. Like the first Ring in the previous chapter, InnerQueue uses a contiguous block of memory to store its T elements, exploding a Vec to get that contiguous block. Also, like the first Ring, we store Option<T> elements in the contiguous block of memory. Technically, we could deal with a contiguous block of raw pointers or copy memory blocks in and out. But the use of Option<T> simplifies the code, both for insertion and removal, at the cost of a small amount of extra memory per element: typically a discriminant plus any alignment padding, and nothing at all for types such as Box<T> that have a spare niche value. The added complication of the alternatives just isn't worth it for the performance goals hopper is trying to hit.
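The per-element cost of Option<T> depends on T, and it can be measured rather than guessed. This is a small sketch; option_overhead is a hypothetical helper, not part of hopper.

```rust
use std::mem::size_of;

// Hypothetical helper: extra bytes that Option<T> occupies compared with a bare T.
pub fn option_overhead<T>() -> usize {
    size_of::<Option<T>>() - size_of::<T>()
}
```

For a type with no spare bit patterns, such as u8 or u64, the overhead is at least one byte and is usually rounded up to the type's alignment. For Box<T> the overhead is zero, because the null pointer is used to represent None.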

The size field is an atomic usize. Atomics will be covered in more detail in the next chapter, but the behavior of size is going to be important. For now, think of it as a very small piece of synchronization between threads; a little hook that will allow us to order memory accesses that happens also to act like a usize. The condvar not_empty is used to signal to any potential readers waiting for new elements to pop that there are, in fact, elements to pop. The use of condvar greatly reduces the CPU load of hopper without sacrificing latency to busy loops with sleeps. Now, back_lock and front_lock. What's going on here? Either side of the deque is protected by a mutex, meaning there can be only one enqueuer and one dequeuer at a time, but these can be running in parallel to each other. Here are the definitions of the two inner values of the mutexes:

#[derive(Debug, Clone, Copy)]
pub struct FrontGuardInner {
    offset: isize,
}

#[derive(Debug)]
pub struct BackGuardInner<S> {
    offset: isize,
    pub inner: S,
}

FrontGuardInner is the easier of the two to explain. The only field is offset, which is the offset, from the first pointer of InnerQueue's data, of the thread manipulating the front of the queue. This contiguous store is also used in a ring buffer fashion. In BackGuardInner, we see the same offset, but an additional inner, S. What is this? As we'll see, the threads manipulating the back of the buffer need extra coordination between them. Exactly what that is, the queue does not care. Therefore, we make it a type parameter and allow the caller to sort everything out, being careful to pass the data around as needed. In this fashion, the queue smuggles state through itself but does not have to inspect it.
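The ring-buffer use of offset amounts to a bump followed by a wrap at capacity. Here is a minimal sketch of that arithmetic in isolation; advance and walk are hypothetical names, and no locking or storage is involved.

```rust
// Hypothetical helper: move an offset one slot forward, wrapping at capacity.
pub fn advance(offset: isize, capacity: usize) -> isize {
    assert!(capacity > 0);
    (offset + 1) % (capacity as isize)
}

// Record the offsets visited over a number of steps, starting from slot 0.
pub fn walk(capacity: usize, steps: usize) -> Vec<isize> {
    let mut offset = 0;
    let mut seen = Vec::with_capacity(steps);
    for _ in 0..steps {
        seen.push(offset);
        offset = advance(offset, capacity);
    }
    seen
}
```

With a capacity of 3, the walk visits slots 0, 1, 2 and then starts over at 0.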

Let's start on the implementation of InnerQueue:

impl<T, S> InnerQueue<T, S>
where
    S: ::std::default::Default,
{
    pub fn with_capacity(capacity: usize) -> InnerQueue<T, S> {
        assert!(capacity > 0);
        let mut data: Vec<Option<T>> = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            data.push(None);
        }
        let raw_data = (&mut data).as_mut_ptr();
        mem::forget(data);
        InnerQueue {
            capacity: capacity,
            data: raw_data,
            size: AtomicUsize::new(0),
            back_lock: Mutex::new(BackGuardInner {
                offset: 0,
                inner: S::default(),
            }),
            front_lock: Mutex::new(FrontGuardInner { offset: 0 }),
            not_empty: Condvar::new(),
        }
    }

The type lacks a new() -> InnerQueue, as there was no call for one. Instead, there's only with_capacity, which is quite similar to what we saw of Ring's with_capacity: a vector is allocated and exploded into a raw pointer, and the original vector is forgotten before the pointer is loaded into a newly minted struct.
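Exploding a Vec leaves the allocation without an owner, so something has to reassemble it later or the memory leaks. The sketch below shows one way that round trip could look. RawSlots is a hypothetical type, not hopper's InnerQueue, and unlike the listing it records the vector's length and actual capacity so that Vec::from_raw_parts can be called correctly in Drop.

```rust
use std::mem;

// Hypothetical type: a fixed block of Option<T> slots behind a raw pointer.
pub struct RawSlots<T> {
    ptr: *mut Option<T>,
    len: usize,
    cap: usize,
}

impl<T> RawSlots<T> {
    pub fn with_capacity(capacity: usize) -> RawSlots<T> {
        assert!(capacity > 0);
        let mut data: Vec<Option<T>> = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            data.push(None);
        }
        // Record everything needed to rebuild the Vec, then give up ownership.
        let (ptr, len, cap) = (data.as_mut_ptr(), data.len(), data.capacity());
        mem::forget(data);
        RawSlots { ptr, len, cap }
    }

    // Store elem at idx, returning whatever was there before.
    pub fn put(&mut self, idx: usize, elem: T) -> Option<T> {
        assert!(idx < self.len);
        unsafe { mem::replace(&mut *self.ptr.add(idx), Some(elem)) }
    }

    // Remove and return the element at idx, leaving None behind.
    pub fn take(&mut self, idx: usize) -> Option<T> {
        assert!(idx < self.len);
        unsafe { (*self.ptr.add(idx)).take() }
    }
}

impl<T> Drop for RawSlots<T> {
    fn drop(&mut self) {
        // Rebuild the Vec so its destructor frees the slots and any live elements.
        unsafe { drop(Vec::from_raw_parts(self.ptr, self.len, self.cap)) }
    }
}
```

The bounds checks in put and take are a choice made for this sketch; the listing relies on its offset arithmetic instead.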

The type S has to implement Default, and that is sufficient for initialization: the caller's smuggled state always starts out as the same value, which a default expresses more than adequately. If this deque were intended for general use, we'd probably need to offer a with_capacity that also took an S directly. Now, a few further functions in the implementation that we'll just breeze right past:

    pub fn capacity(&self) -> usize {
        self.capacity
    }

    pub fn size(&self) -> usize {
        self.size.load(Ordering::Relaxed)
    }

    pub fn lock_back(&self) -> MutexGuard<BackGuardInner<S>> {
        self.back_lock.lock().expect("back lock poisoned")
    }

    pub fn lock_front(&self) -> MutexGuard<FrontGuardInner> {
        self.front_lock.lock().expect("front lock poisoned")
    }

The next function, push_back, is very important and subtle:

    pub unsafe fn push_back(
        &self,
        elem: T,
        guard: &mut MutexGuard<BackGuardInner<S>>,
    ) -> Result<bool, Error<T>> {
        let mut must_wake_dequeuers = false;
        if self.size.load(Ordering::Acquire) == self.capacity {
            return Err(Error::Full(elem));
        } else {
            assert!((*self.data.offset((*guard).offset)).is_none());
            *self.data.offset((*guard).offset) = Some(elem);
            (*guard).offset += 1;
            (*guard).offset %= self.capacity as isize;
            if self.size.fetch_add(1, Ordering::Release) == 0 {
                must_wake_dequeuers = true;
            }
        }
        Ok(must_wake_dequeuers)
    }

InnerQueue::push_back is responsible for placing a T onto the current back of the ring buffer, or for returning Error::Full to signal that the buffer is at capacity. When we discussed Ring, we noted that the size == capacity check was a race. Not so in InnerQueue, thanks to the atomic nature of size and to the back guard the caller must hold: as long as pushes happen only under the back lock, no other thread can increase size, and a concurrent pop_front can only make more room after the check. self.size.load(Ordering::Acquire) performs a load of self.size that pairs with the Release operations other threads make on it. Memory operations that follow the Acquire load in this thread cannot be reordered before it, and memory operations that precede a Release operation cannot be reordered after it. An operation of that nature does happen just a handful of lines down: self.size.fetch_add(1, Ordering::Release). Between these two points, we see the element T loaded into the buffer, with a prior check to ensure that we're not stomping a Some value, and a wrapping bump of the BackGuardInner's offset. Just like in the last chapter.

Where this implementation differs is the return of Ok(must_wake_dequeuers). Because the inner S is being guarded, it's not possible for the queue to know if there will be any further work that needs to be done before the mutex can be given up. As a result, the queue cannot itself signal that there's a value to read, even though it's already been written to memory by the time the function returns. The caller has to run the notification. That's a sharp edge. If the caller forgets to notify a thread blocked on the condvar, the blocked thread will stay that way forever.
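The pairing of a Release write with an Acquire read is easier to see in isolation. What follows is a minimal sketch and not hopper's code: data stands in for a slot in the buffer and ready stands in for size, and both names are hypothetical. The writer fills in data and then publishes with Release; the reader waits with Acquire and only then reads data.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

pub fn publish_and_observe() -> usize {
    let data = Arc::new(AtomicUsize::new(0));
    let ready = Arc::new(AtomicBool::new(false));

    let writer = {
        let (data, ready) = (Arc::clone(&data), Arc::clone(&ready));
        thread::spawn(move || {
            // Plain write first; on its own, Relaxed gives no cross-thread ordering.
            data.store(42, Ordering::Relaxed);
            // Release: everything written above is visible to any thread
            // that observes this store with an Acquire load.
            ready.store(true, Ordering::Release);
        })
    };

    // Acquire: once this load sees true, the write to data is visible too.
    while !ready.load(Ordering::Acquire) {
        thread::yield_now();
    }
    let seen = data.load(Ordering::Relaxed);

    writer.join().expect("writer panicked");
    seen
}
```

If both orderings on ready were weakened to Relaxed, the reader would no longer be guaranteed to see 42. That is the kind of slight re-ordering bug this design has to avoid.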

InnerQueue::pop_front is a little simpler and not a radical departure from push_back:

    pub unsafe fn pop_front(&self) -> T {
        let mut guard = self.front_lock.lock().expect("front lock poisoned");
        while self.size.load(Ordering::Acquire) == 0 {
            guard = self.not_empty
                .wait(guard)
                .expect("oops could not wait pop_front");
        }
        let elem: Option<T> =
            mem::replace(&mut *self.data.offset((*guard).offset), None);
        assert!(elem.is_some());
        *self.data.offset((*guard).offset) = None;
        (*guard).offset += 1;
        (*guard).offset %= self.capacity as isize;
        self.size.fetch_sub(1, Ordering::Release);
        elem.unwrap()
    }
}

The thread popping front, because it does not have to coordinate, is able to take the front lock itself, as there's no state that needs to be threaded through. After receiving the lock, the thread then enters a condition check loop to guard against spurious wake-ups on not_empty, replacing the item at offset with None when the thread is able to wake up. The usual offset maintenance occurs.
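The condition check loop around wait is common enough that the standard library offers Condvar::wait_while, which re-checks a predicate after every wake-up, spurious or not. The sketch below illustrates that alternative spelling and is not hopper's approach: the counter lives inside the mutex rather than in a separate atomic, and wait_until_nonzero is a hypothetical name.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

pub fn wait_until_nonzero() -> usize {
    let pair = Arc::new((Mutex::new(0usize), Condvar::new()));

    let producer = {
        let pair = Arc::clone(&pair);
        thread::spawn(move || {
            let (lock, cv) = &*pair;
            // Update the shared count under the lock, then wake a waiter.
            *lock.lock().expect("lock poisoned") = 3;
            cv.notify_one();
        })
    };

    let (lock, cv) = &*pair;
    // Keep waiting while the predicate holds, that is, while the count is zero.
    let guard = cv
        .wait_while(lock.lock().expect("lock poisoned"), |n| *n == 0)
        .expect("lock poisoned");
    let seen = *guard;
    drop(guard);

    producer.join().expect("producer panicked");
    seen
}
```

If the producer runs first, the predicate is already false when the consumer takes the lock and no wait happens at all.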

The implementation of Queue<T, S> is pretty minimal in comparison to the inner structure. Here's push_back:

pub fn push_back(
    &self,
    elem: T,
    mut guard: &mut MutexGuard<BackGuardInner<S>>,
) -> Result<bool, Error<T>> {
    unsafe { (*self.inner).push_back(elem, &mut guard) }
}

The only function that's substantially new to Queue is notify_not_empty(&self, _guard: &MutexGuard<FrontGuardInner>) -> (). The caller is responsible for calling this whenever push_back signals that the dequeuer must be notified and, while the guard is not used, one rough edge of the library is smoothed down by requiring that it be passed in, proving that it's held.
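Passing an unused guard as proof that a lock is held can be sketched as follows. Mailbox, wait_for_one, and demo are hypothetical and this is not hopper's Queue. Only the shape of notify_not_empty follows the signature given above. Note that the guard proves some lock of the right type is held, not necessarily this particular mailbox's.

```rust
use std::sync::{Arc, Condvar, Mutex, MutexGuard};
use std::thread;

// Hypothetical type: a count of pending items plus a condvar to announce them.
pub struct Mailbox {
    pending: Mutex<usize>,
    not_empty: Condvar,
}

impl Mailbox {
    pub fn new() -> Mailbox {
        Mailbox {
            pending: Mutex::new(0),
            not_empty: Condvar::new(),
        }
    }

    pub fn lock(&self) -> MutexGuard<'_, usize> {
        self.pending.lock().expect("lock poisoned")
    }

    // The guard is never read; requiring it makes the caller take the lock first.
    pub fn notify_not_empty(&self, _guard: &MutexGuard<'_, usize>) {
        self.not_empty.notify_all();
    }

    // Block until an item is pending, consume it, and return the remaining count.
    pub fn wait_for_one(&self) -> usize {
        let mut guard = self.lock();
        while *guard == 0 {
            guard = self.not_empty.wait(guard).expect("lock poisoned");
        }
        *guard -= 1;
        *guard
    }
}

pub fn demo() -> usize {
    let mailbox = Arc::new(Mailbox::new());
    let waiter = {
        let mailbox = Arc::clone(&mailbox);
        thread::spawn(move || mailbox.wait_for_one())
    };
    {
        let mut guard = mailbox.lock();
        *guard += 1;
        // Holding the lock here means the waiter is either not yet checking
        // the count or already parked in wait, so the wake-up cannot be lost.
        mailbox.notify_not_empty(&guard);
    }
    waiter.join().expect("waiter panicked")
}
```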

That's the deque at the core of hopper. This structure was very difficult to get right. Any slight re-ordering of the load and store on the atomic size with respect to other memory operations will introduce bugs, such as parallel access to the same region of memory without coordination. These bugs are very subtle and don't manifest immediately. Liberal use of helgrind plus quickcheck testing across x86 and ARM processors (more on that later) helps drive up confidence in the implementation. Test runs of hours were not uncommon to find bugs that were not deterministic but could be reasoned about, given enough time and repeat examples. Building concurrent data structures out of very primitive pieces is hard.
