A hazard-pointer Treiber stack

The introduction of the Treiber stack in the previous section on reference counting was not idle. We'll examine hazard pointers and epoch-based reclamation here, through the lens of an effectively reclaimed Treiber stack. It so happens that conc ships with a Treiber stack, in tfs/conc/src/sync/treiber.rs. The preamble is mostly familiar:

use std::sync::atomic::{self, AtomicPtr};
use std::marker::PhantomData;
use std::ptr;
use {Guard, add_garbage_box};

Both Guard and add_garbage_box are new, but we'll get to them directly. The Treiber struct is as you might have imagined it:

pub struct Treiber<T> {
    /// The head node.
    head: AtomicPtr<Node<T>>,
    /// Make the `Sync` and `Send` (and other OIBITs) transitive.
    _marker: PhantomData<T>,
}

As is the node:

struct Node<T> {
    /// The data this node holds.
    item: T,
    /// The next node.
    next: *mut Node<T>,
}

This is where we need to understand conc::Guard. Much like MutexGuard in the standard library, Guard exists here to protect the contained data from being multiply mutated. Guard is the hazard pointer interface for conc. Let's examine Guard in detail and get to the hazard pointer algorithm. Guard is defined in tfs/conc/src/guard.rs:

pub struct Guard<T: 'static + ?Sized> {
    /// The inner hazard.
    hazard: hazard::Writer,
    /// The pointer to the protected object.
    pointer: &'static T,
}

As of writing this book, all T protected by Guard have to be 'static, but we'll ignore that, as the codebase references a desire to relax the restriction. Just be aware of it should you wish to make immediate use of conc in your project. Like MutexGuard, Guard does not own the underlying data but merely protects a reference to it, and we can see that our Guard is the writer end of the hazard. What is hazard::Writer? It's defined in tfs/conc/src/hazard.rs:

pub struct Writer {
    /// The pointer to the heap-allocated hazard.
    ptr: &'static AtomicPtr<u8>,
}

Pointer to the heap-allocated hazard? Okay, let's back out a bit. We know from the algorithm description that there has to be thread-local storage happening in coordination with, apparently, heap storage. We also know that Guard is our primary interface to the hazard pointers. There are three functions to create new instances of Guard:

  • Guard<T>::new<F: FnOnce() -> &'static T>(ptr: F) -> Guard<T>
  • Guard<T>::maybe_new<F: FnOnce() -> Option<&'static T>>(ptr: F) -> Option<Guard<T>>
  • Guard<T>::try_new<F: FnOnce() -> Result<&'static T, E>>(ptr: F) -> Result<Guard<T>, E>
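
Before dissecting try_new, here is a brief usage sketch. It assumes that Guard is exported at the crate root—as the preamble's use {Guard, add_garbage_box} suggests—and it leans on Guard::as_ptr, which we will meet again when reading push. Box::leak merely supplies the &'static references the constructors demand; nothing here is lifted from conc's own tests.

extern crate conc;

use conc::Guard;

fn main() {
    let item: &'static u64 = Box::leak(Box::new(42));

    // Infallible: the closure must produce a reference.
    let a: Guard<u64> = Guard::new(|| item);

    // Optional: a `None` from the closure yields `None` overall.
    let b: Option<Guard<u64>> = Guard::maybe_new(|| Some(item));

    // Fallible: an error from the closure is passed straight through.
    let c: Result<Guard<u64>, ()> = Guard::try_new(|| Ok(item));

    assert_eq!(unsafe { *a.as_ptr() }, 42);
    assert!(b.is_some());
    assert!(c.is_ok());
}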

The first two are defined in terms of try_new. Let's dissect try_new:

    pub fn try_new<F, E>(ptr: F) -> Result<Guard<T>, E>
    where F: FnOnce() -> Result<&'static T, E> {
        // Increment the number of guards currently being created.
        #[cfg(debug_assertions)]
        CURRENT_CREATING.with(|x| x.set(x.get() + 1));

The function takes an FnOnce whose responsibility is to produce a reference to T or fail. The first call in try_new is an increment of CURRENT_CREATING, which is a thread-local Cell<usize>:

thread_local! {
    /// Number of guards the current thread is creating.
    static CURRENT_CREATING: Cell<usize> = Cell::new(0);
}

We've seen Cell in Chapter 3, The Rust Memory Model – Ownership, References and Manipulation, but thread_local! is new. This macro wraps one or more static values into std::thread::LocalKey, Rust's take on thread-local storage. The exact implementation varies from platform to platform, but the basic idea holds: the value acts like a static global yet is confined to a single thread. In this way, we can program as if the value were global without having to manage coordination between threads. A short, standalone sketch of the pattern follows before we return to try_new.
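
COUNTER and bump here are illustrative names, not anything from conc:

use std::cell::Cell;
use std::thread;

thread_local! {
    // Every thread that touches COUNTER gets its own copy, lazily
    // initialized to zero; no cross-thread coordination is needed.
    static COUNTER: Cell<usize> = Cell::new(0);
}

fn bump() -> usize {
    COUNTER.with(|c| {
        c.set(c.get() + 1);
        c.get()
    })
}

fn main() {
    let child = thread::spawn(bump);
    // The child thread and the main thread each observe their own
    // counter, so both calls report 1.
    assert_eq!(bump(), 1);
    assert_eq!(child.join().unwrap(), 1);
}

With that picture in mind, back to try_new. Once CURRENT_CREATING is incremented: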

        // Get a hazard in blocked state.
        let hazard = local::get_hazard();

The local::get_hazard function is defined in tfs/conc/src/local.rs:

pub fn get_hazard() -> hazard::Writer {
    if STATE.state() == thread::LocalKeyState::Destroyed {
        // The state was deinitialized, so we must rely on the
        // global state for creating new hazards.
        global::create_hazard()
    } else {
        STATE.with(|s| s.borrow_mut().get_hazard())
    }
}

The STATE referenced in this function is:

thread_local! {
    /// The state of this thread.
    static STATE: RefCell<State> = RefCell::new(State::default());
}

The State type is defined like so:

#[derive(Default)]
struct State {
    garbage: Vec<Garbage>,
    available_hazards: Vec<hazard::Writer>,
    available_hazards_free_before: usize,
}

The field garbage maintains a list of Garbage that has yet to be moved to the global state—more on that in a minute—for reclamation. A Garbage is a pointer to bytes plus a function pointer over those bytes, called dtor, for destructor. Memory reclamation schemes must be able to deallocate regardless of the underlying type. The common approach, and the approach taken by Garbage, is to build a monomorphized destructor function while the type information is still available, but otherwise work on byte buffers. You are encouraged to thumb through the implementation of Garbage yourself, but the primary trick is Box::from_raw(ptr as *mut u8 as *mut T), which we've seen repeatedly throughout this book.
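
The shape of this trick can be sketched in a few lines. The names below follow the description just given, but the bodies are illustrative rather than copied from conc's Garbage:

struct Garbage {
    /// The raw bytes of some allocation.
    ptr: *const u8,
    /// A destructor that still knows the concrete type `ptr` was built from.
    dtor: unsafe fn(*const u8),
}

impl Garbage {
    // Monomorphized where `T` is still known; afterwards the garbage
    // can be handled purely as bytes.
    fn new_boxed<T>(item: Box<T>) -> Garbage {
        unsafe fn dtor<T>(ptr: *const u8) {
            // Rebuild the Box so that dropping it runs T's destructor
            // and frees the allocation.
            drop(Box::from_raw(ptr as *mut u8 as *mut T));
        }

        Garbage {
            ptr: Box::into_raw(item) as *const u8,
            dtor: dtor::<T>,
        }
    }

    // Called by the collector once no hazard protects `ptr`.
    fn destroy(self) {
        unsafe { (self.dtor)(self.ptr) }
    }
}

The collector never needs to know T; it only has to call destroy once no hazard still protects ptr.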

The available_hazards field stores the previously allocated hazard writers that aren't currently being used. The implementation keeps this as a cache to avoid allocator thrash. We can see this in action in local::State::get_hazard:

    fn get_hazard(&mut self) -> hazard::Writer {
        // Check if there is hazards in the cache.
        if let Some(hazard) = self.available_hazards.pop() {
            // There is; we don't need to create a new hazard.
            //
            // Since the hazard popped from the cache is not
            // blocked, we must block the hazard to satisfy
            // the requirements of this function.
            hazard.block();
            hazard
        } else {
            // There is not; we must create a new hazard.
            global::create_hazard()
        }
    }

The final field, available_hazards_free_before, stores hazards in the freed state prior to the actual deallocation of the underlying type. We'll discuss this more later. Hazards are in one of four states: free, dead, blocked, or protecting. A dead hazard can be deallocated safely, along with the protected memory, and should not be read. A free hazard is protecting nothing and may be reused. A blocked hazard is in use by some other thread and will cause reads of the hazard to stall. A protecting hazard is, well, protecting some bit of memory. Now, jump back to this branch in local::get_hazard:

    if STATE.state() == thread::LocalKeyState::Destroyed {
        // The state was deinitialized, so we must rely on the
        // global state for creating new hazards.
        global::create_hazard()
    } else {

What is global::create_hazard? This module is tfs/conc/src/global.rs and the function is:

pub fn create_hazard() -> hazard::Writer {
    STATE.create_hazard()
}

The variable names are confusing. This STATE is not the thread-local STATE but a globally scoped STATE:

lazy_static! {
    /// The global state.
    ///
    /// This state is shared between all the threads.
    static ref STATE: State = State::new();
}
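
In case lazy_static! is unfamiliar: the macro produces a global that is initialized on first access and shared by every thread thereafter. A minimal sketch, unrelated to conc:

#[macro_use]
extern crate lazy_static;

use std::collections::HashMap;

lazy_static! {
    // Built the first time SQUARES is touched, then shared, read-only,
    // by every thread for the life of the program.
    static ref SQUARES: HashMap<u32, u32> =
        (0..10).map(|x| (x, x * x)).collect();
}

fn main() {
    assert_eq!(SQUARES.get(&3), Some(&9));
}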

Let's dig into that State:

struct State {
    /// The message-passing channel.
    chan: mpsc::Sender<Message>,
    /// The garbo part of the state.
    garbo: Mutex<Garbo>,
}

The global STATE is an mpsc Sender of Message and a mutex-guarded Garbo. Message is a simple enumeration:

enum Message {
    /// Add new garbage.
    Garbage(Vec<Garbage>),
    /// Add a new hazard.
    NewHazard(hazard::Reader),
}

Garbo is something we'll get into directly. Suffice it to say for now that Garbo acts as the global garbage collector for this implementation. The global state sets up a channel, maintaining the sender side in the global state and feeding the receiver into Garbo:

impl State {
    /// Initialize a new state.
    fn new() -> State {
        // Create the message-passing channel.
        let (send, recv) = mpsc::channel();

        // Construct the state from the two halfs of the channel.
        State {
            chan: send,
            garbo: Mutex::new(Garbo {
                chan: recv,
                garbage: Vec::new(),
                hazards: Vec::new(),
            })
        }
    }

The creation of a new global hazard doesn't take much:

    fn create_hazard(&self) -> hazard::Writer {
        // Create the hazard.
        let (writer, reader) = hazard::create();
        // Communicate the new hazard to the global state
        // through the channel.
        self.chan.send(Message::NewHazard(reader));
        // Return the other half of the hazard.
        writer
    }

This establishes a new hazard via hazard::create() and feeds the reader side down through to Garbo, returning the writer side back out to local::get_hazard(). While the names writer and reader suggest that hazard is itself an MPSC, this is not true. The hazard module is tfs/conc/src/hazard.rs and creation is:

pub fn create() -> (Writer, Reader) {
    // Allocate the hazard on the heap.
    let ptr = unsafe {
        &*Box::into_raw(Box::new(AtomicPtr::new(
            &BLOCKED as *const u8 as *mut u8
        )))
    };

    // Construct the values.
    (Writer {
        ptr: ptr,
    }, Reader {
        ptr: ptr,
    })
}

Well, look at that. What we have here are two structs, Writer and Reader, each storing the same 'static reference to a heap-allocated AtomicPtr<u8>—an atomic pointer to a mutable byte pointer. Phew! We've seen this trick previously, but what's special here is the leverage of the type system to provide different reading and writing interfaces over the same bit of raw memory.
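
To make the division of labor concrete, here is a rough sketch—assumed, not lifted from hazard.rs—of how the writer-side transitions mentioned later (block, protect, free) could look over that shared cell. The sentinel statics mirror the BLOCKED and FREE values the real module compares against, and the memory orderings are guesses:

use std::sync::atomic::{AtomicPtr, Ordering};

// Distinct statics with distinct addresses: the shared cell holds
// either the address of a sentinel or a pointer to a protected object.
static BLOCKED: u8 = 0;
static FREE: u8 = 1;

pub struct Writer {
    /// The pointer to the heap-allocated hazard.
    ptr: &'static AtomicPtr<u8>,
}

impl Writer {
    // Each state change is a single store into the shared cell, which
    // the matching Reader observes with a load.
    pub fn block(&self) {
        self.ptr.store(&BLOCKED as *const u8 as *mut u8, Ordering::Release);
    }

    pub fn protect(&self, protected: *const u8) {
        self.ptr.store(protected as *mut u8, Ordering::Release);
    }

    pub fn free(&self) {
        self.ptr.store(&FREE as *const u8 as *mut u8, Ordering::Release);
    }
}

Splitting one shared cell into two single-purpose handles lets the type checker, rather than convention, decide which side may store and which may only load.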

What about Garbo? It lives in the global module and is defined as:

struct Garbo {
    /// The channel of messages.
    chan: mpsc::Receiver<Message>,
    /// The to-be-destroyed garbage.
    garbage: Vec<Garbage>,
    /// The current hazards.
    hazards: Vec<hazard::Reader>,
}

Garbo defines a gc function that reads all Messages from its channel, storing the garbage into the garbage field and free hazards into hazards. Dead hazards are destroyed, freeing their storage, as the other holder is guaranteed to have hung up already. Protected hazards also make their way into hazards and are scanned during the next call to gc. Garbage collections are sometimes performed when a thread calls global::tick() or when global::try_gc() is called. A tick is performed whenever local::add_garbage is called, which is what conc::add_garbage_box calls.
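
Condensed to its essence, a collection pass looks something like the sketch below. Reader and State are minimal stand-ins for the types in hazard.rs, Garbage is the sketch from earlier, and the real Garbo::gc additionally drains its channel and frees the heap cell behind each dead hazard:

#[derive(Clone, Copy)]
enum State {
    Free,
    Dead,
    Protect(*const u8),
}

struct Reader {
    // Stand-in for the load of the shared AtomicPtr in hazard.rs.
    state: State,
}

impl Reader {
    fn get(&self) -> State {
        self.state
    }
}

fn gc(hazards: &mut Vec<Reader>, garbage: &mut Vec<Garbage>) {
    // Snapshot every address currently protected by a live hazard,
    // keep free hazards around for reuse, and forget dead ones.
    let mut protected = Vec::new();
    hazards.retain(|h| match h.get() {
        State::Protect(ptr) => {
            protected.push(ptr);
            true
        }
        State::Free => true,
        State::Dead => false,
    });

    // Garbage that no thread protects is destroyed; the rest is
    // carried over to the next pass.
    let mut survivors = Vec::new();
    for g in garbage.drain(..) {
        if protected.contains(&g.ptr) {
            survivors.push(g);
        } else {
            g.destroy();
        }
    }
    *garbage = survivors;
}

Note that nothing a hazard currently protects is ever destroyed; everything else is reclaimed on the spot.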

We first encountered add_garbage_box at the start of this section. Every time a thread signals a node as garbage, it rolls the dice and potentially becomes responsible for performing a global garbage collection over all of the threads' hazard-pointed memory.

Now that we understand how memory reclamation works, all that remains is to understand how hazard pointers protect memory from reads and writes. Let's finish guard::try_new in one large jump:

        // This fence is necessary for ensuring that `hazard` does not
        // get reordered to after `ptr` has run.
        // TODO: Is this fence even necessary?
        atomic::fence(atomic::Ordering::SeqCst);

        // Right here, any garbage collection is blocked, due to the
        // hazard above. This ensures that between the potential
        // read in `ptr` and it being protected by the hazard, there
        // will be no premature free.

        // Evaluate the pointer through the closure.
        let res = ptr();

        // Decrement the number of guards currently being created.
        #[cfg(debug_assertions)]
        CURRENT_CREATING.with(|x| x.set(x.get() - 1));

        match res {
            Ok(ptr) => {
                // Now that we have the pointer, we can protect it by
                // the hazard, unblocking a pending garbage collection
                // if it exists.
                hazard.protect(ptr as *const T as *const u8);

                Ok(Guard {
                    hazard: hazard,
                    pointer: ptr,
                })
            },
            Err(err) => {
                // Set the hazard to free to ensure that the hazard
                // doesn't remain blocking.
                hazard.free();

                Err(err)
            }
        }
    }

We can see that the conc authors have inserted a sequentially consistent fence that they themselves question. The model laid out by Michael does not require sequential consistency, and I believe this fence is not needed, being a significant drag on performance. The key things to note here are the calls to hazard::protect and hazard::free. Both are part of hazard::Writer, the former setting the internal pointer to the byte pointer fed to it, the latter marking the hazard as free. Both states interact with the garbage collector, as we've seen. The remaining bit has to do with hazard::Reader::get, the function used to retrieve the state of the hazard. Here it is:

impl Reader {
    /// Get the state of the hazard.
    ///
    /// It will spin until the hazard is no longer in a blocked state,
    /// unless it is in debug mode, where it will panic given enough
    /// spins.
    pub fn get(&self) -> State {
        // In debug mode, we count the number of spins. In release
        // mode, this should be trivially optimized out.
        let mut spins = 0;

        // Spin until not blocked.
        loop {
            let ptr = self.ptr.load(atomic::Ordering::Acquire)
                as *const u8;

            // Blocked means that the hazard is blocked by another
            // thread, and we must loop until it assumes another
            // state.
            if ptr == &BLOCKED {
                // Increment the number of spins.
                spins += 1;
                debug_assert!(spins < 100_000_000,
                    "Hazard blocked for 100 millions rounds. Panicking \
                     as chances are that it will never get unblocked.");

                continue;
            } else if ptr == &FREE {
                return State::Free;
            } else if ptr == &DEAD {
                return State::Dead;
            } else {
                return State::Protect(ptr);
            }
        }
    }
}

Only when the hazard is blocked does get spin, until the hazard is dead, free, or merely protecting. What blocks the hazard? Recall that hazards are created blocked. By creating them in a blocked state, it is not possible to perform garbage collection over a pointer that has not been fully initialized—a problem we saw with the reference-counting implementation—nor is it possible to read from a partially initialized hazarded pointer. Only once the pointer is moved into the protected state can reads move forward.

And there you go—garbage collection and atomic isolation.

Let's go all the way back up to the stack and look at its push implementation:

    pub fn push(&self, item: T)
    where T: 'static {
        let mut snapshot = Guard::maybe_new(|| unsafe {
            self.head.load(atomic::Ordering::Relaxed).as_ref()
        });

        let mut node = Box::into_raw(Box::new(Node {
            item: item,
            next: ptr::null_mut(),
        }));

        loop {
            let next = snapshot.map_or(ptr::null_mut(),
                |x| x.as_ptr() as *mut _);
            unsafe { (*node).next = next; }

            match Guard::maybe_new(|| unsafe {
                self.head.compare_and_swap(next, node,
                    atomic::Ordering::Release).as_ref()
            }) {
                Some(ref new) if new.as_ptr() == next => break,
                None if next.is_null() => break,
                // If it fails, we will retry the CAS with updated
                // values.
                new => snapshot = new,
            }
        }
    }

At the top of the function, the implementation loads the head node into snapshot under a hazard Guard. Guard::as_ptr(&self) -> *const T retrieves the current pointer for the hazard on each invocation, adapting as the underlying data shifts forward. node is the freshly allocated, raw-pointered Node containing item: T. The remainder of the loop is the same compare-and-swap we've seen for other data structures of this kind, merely in terms of a hazard Guard instead of raw AtomicPtrs or the like. The programming model is very direct, as it is for pop as well:

    pub fn pop(&self) -> Option<Guard<T>> {
        let mut snapshot = Guard::maybe_new(|| unsafe {
            self.head.load(atomic::Ordering::Acquire).as_ref()
        });

        while let Some(old) = snapshot {
            snapshot = Guard::maybe_new(|| unsafe {
                self.head.compare_and_swap(
                    old.as_ptr() as *mut _,
                    old.next as *mut Node<T>,
                    atomic::Ordering::Release,
                ).as_ref()
            });

            if let Some(ref new) = snapshot {
                if new.as_ptr() == old.as_ptr() {
                    unsafe { add_garbage_box(old.as_ptr()); }
                    return Some(old.map(|x| &x.item));
                }
            } else {
                break;
            }
        }

        None
    }

Note that when the old node is removed from the stack, add_garbage_box is called on it, adding the node to be garbage-collected at a later date. We know, further, from inspection, that this later date might well be exactly the moment of invocation, depending on luck.
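
To close the loop, here is a hedged usage sketch of the stack across several threads. It assumes that the type is exported as conc::sync::Treiber and that it has a plain new constructor—neither is shown in the excerpts above—and it reads popped items through Guard::as_ptr, the accessor mentioned earlier:

extern crate conc;

use conc::sync::Treiber;
use std::sync::Arc;
use std::thread;

fn main() {
    let stack = Arc::new(Treiber::new());

    // Four threads push one hundred distinct items each.
    let handles: Vec<_> = (0..4)
        .map(|id| {
            let stack = Arc::clone(&stack);
            thread::spawn(move || {
                for i in 0..100 {
                    stack.push(id * 100 + i);
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // Each successful pop hands back a Guard protecting the item; the
    // node itself has already been passed to add_garbage_box and will
    // be reclaimed once no hazard protects it.
    let mut count = 0;
    let mut total = 0;
    while let Some(guard) = stack.pop() {
        total += unsafe { *guard.as_ptr() };
        count += 1;
    }
    assert_eq!(count, 400);
    assert_eq!(total, (0..400).sum());
}

The guards returned by pop keep their items readable even though the nodes are already queued as garbage, which is exactly the property the hazard machinery exists to provide.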
