An incorrect atomic queue

Before we build anything else, we're going to need a key data structure: an unbounded queue. In Chapter 5, Locks – Mutex, Condvar, Barriers, and RWLock, we discussed a bounded deque protected at either end by mutexes. We're in the business, now, of building synchronization and can't make use of the mutex approach. Our ambition is going to be to produce an unbounded first-in-first-out data structure that has no locks, never leaves an enqueuer or dequeuer deadlocked, and is linearizable to a sequential queue. It turns out there's a pretty straightforward data structure that achieves this aim: the Michael and Scott Queue, introduced in their 1995 paper Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms. The reader is encouraged to breeze through that paper before continuing on with our discussion here, but it's not strictly necessary.

A word of warning. Our implementation will be wrong. The careful reader will note that we're following the paper closely. There are two, and maybe more, issues with the implementation we'll present, one major and unresolvable and the other addressable with some care. Both are fairly subtle. Let's dig in.

Queue is defined in src/queue.rs, the preamble of which is:

use std::ptr::null_mut;
use std::sync::atomic::{AtomicPtr, Ordering};

unsafe impl<T: Send> Send for Queue<T> {}
unsafe impl<T: Send> Sync for Queue<T> {}

Right off you can see from null_mut that we're going to be dealing with raw pointers. The AtomicPtr is new, though we did mention it in passing earlier in the chapter. An atomic pointer adapts a raw pointer, *mut T, to be suitable for atomic operations. There's no runtime overhead associated with AtomicPtr; the Rust documentation notes that the type has the same in-memory representation as a *mut T. Modern machines expose instructions that let the programmer manipulate memory atomically, and capabilities vary by processor, as you'd expect. LLVM, and therefore Rust, exposes these capabilities through AtomicPtr, allowing the same range of pointer manipulation available in the sequential language, but atomically. What this means, in practice, is that we can start setting up happens-before/synchronizes-with causality relationships for pointer manipulation, which is essential for building data structures.
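To make the AtomicPtr vocabulary concrete before we use it in anger, here is a minimal sketch that is not part of the queue. The helper names publish_once and take are hypothetical, invented for this illustration, and the choice of u64 as the payload is arbitrary. The sketch shows a heap allocation being handed to an AtomicPtr with a compare-exchange and taken back with a swap, so that ownership of the allocation changes hands exactly once in each direction.

```rust
use std::ptr::null_mut;
use std::sync::atomic::{AtomicPtr, Ordering};

/// Hypothetical helper: install a heap-allocated value into `slot`, but only
/// if the slot is currently null. Returns true if this call installed it.
fn publish_once(slot: &AtomicPtr<u64>, val: u64) -> bool {
    let raw: *mut u64 = Box::into_raw(Box::new(val));
    match slot.compare_exchange(null_mut(), raw, Ordering::AcqRel, Ordering::Acquire) {
        Ok(_) => true,
        Err(_) => {
            // The slot was already occupied. We still own `raw`, so we must
            // free it ourselves or it leaks.
            unsafe { drop(Box::from_raw(raw)) };
            false
        }
    }
}

/// Hypothetical helper: take the value back out, leaving null behind. The
/// swap is atomic, so at most one caller ever receives a given pointer.
fn take(slot: &AtomicPtr<u64>) -> Option<u64> {
    let raw = slot.swap(null_mut(), Ordering::AcqRel);
    if raw.is_null() {
        None
    } else {
        // Reconstitute the Box exactly once and move the value out of it.
        Some(unsafe { *Box::from_raw(raw) })
    }
}
```

A second publish_once against an occupied slot returns false, and a second take returns None. That "exactly once" discipline around Box::into_raw and Box::from_raw is the same one the queue has to maintain by hand.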

Here's the next part:

struct Node<T> {
    value: *const T,
    next: AtomicPtr<Node<T>>,
}

impl<T> Default for Node<T> {
    fn default() -> Self {
        Node {
            value: null_mut(),
            next: AtomicPtr::default(),
        }
    }
}

impl<T> Node<T> {
    fn new(val: T) -> Self {
        Node {
            value: Box::into_raw(Box::new(val)),
            next: AtomicPtr::default(),
        }
    }
}

The interior of deque from the previous chapter was a single, contiguous block. That's not the approach we're taking here. Instead, every element inserted into the queue will get a Node and that Node will point to the next Node, which may or may not exist yet. It's a linked list. The contiguous block approach is a bit harder to pull off in an atomic context—though it is entirely possible and there are discussions in the Further reading section papers—and would come down to a linked list of contiguous blocks. It's more trouble than it's worth for our purposes here:

struct InnerQueue<T> {
    head: AtomicPtr<Node<T>>,
    tail: AtomicPtr<Node<T>>,
}

One key thing to note is that Node holds a pointer to a heap allocated T, not the T directly. In the preceding code, we have the InnerQueue<T> of Queue<T>, pulling the usual inner/outer structure detailed elsewhere in this book and in rustc. Why is it important to note that Node doesn't hold its T directly? The value inside of the head of the Queue<T> is never inspected. The head of the Queue is a sentinel. When the InnerQueue is created, we'll see the following:

impl<T> InnerQueue<T> {
    pub fn new() -> Self {
        let node = Box::into_raw(Box::new(Node::default()));
        InnerQueue {
            head: AtomicPtr::new(node),
            tail: AtomicPtr::new(node),
        }
    }

Both the head and tail of the InnerQueue point to the same nonsense-valued Node, as expected. The value at the outset is, in fact, null. Atomic data structures have issues with memory reclamation in that coordinating drops is problematic and must be done only once. It's possible to alleviate this issue somewhat by relying on Rust's type system, but it's still a non-trivial project and is an active area of research, generally. Here, we note that we're careful to hand out the ownership of the element only once. Being a raw pointer, it can be given away more than once at a time, but that path leads to double-frees. InnerQueue converts *const T into T—an unsafe operation—and just never dereferences the *const T again, allowing the caller to do the drop in its own sweet time:

    pub unsafe fn enq(&mut self, val: T) -> () {
        let node = Box::new(Node::new(val));
        let node: *mut Node<T> = Box::into_raw(node);

        loop {
            let tail: *mut Node<T> = self.tail.load(Ordering::Acquire);
            let next: *mut Node<T> = (*tail).next.load(Ordering::Relaxed);
            if tail == self.tail.load(Ordering::Relaxed) {
                if next.is_null() {
                    if (*tail).next.compare_and_swap(next, node, Ordering::Relaxed) == next {
                        self.tail.compare_and_swap(tail, node, Ordering::Release);
                        return;
                    }
                }
            } else {
                self.tail.compare_and_swap(tail, next, Ordering::Release);
            }
        }
    }

This is the enq operation, marked unsafe because of the raw pointer manipulation going on. That's an important point to consider: working with AtomicPtr necessarily means working with raw pointers. There's a lot going on here, so let's break it up into smaller chunks:

    pub unsafe fn enq(&mut self, val: T) -> () {
        let node = Box::new(Node::new(val));
        let node: *mut Node<T> = Box::into_raw(node);

Here, we're constructing the Node for val. Notice we're using the same Box::new followed by Box::into_raw approach used so often in previous chapters. This node doesn't have a place in the queue yet and the calling thread does not hold an exclusive lock over the queue. Insertion will have to take place in the midst of other insertions:

        loop {
            let tail: *mut Node<T> = self.tail.load(Ordering::Acquire);
            let next: *mut Node<T> = (*tail).next.load(Ordering::Relaxed);

With that in mind, it's entirely possible that an insertion attempt can fail. The enqueuing of an element in a queue takes place at the tail, the pointer to which we load up and call tail. The next node after tail is called next. In a sequential queue, we'd be guaranteed that the next of tail is null, but that's not so here. Consider that between the load of the tail pointer and the load of the next pointer an enq run by another thread may have already completed.

Enqueuing is, then, an operation that might take several attempts before we hit just the right conditions for it to succeed. Those conditions are tail still being the tail of the structure and next being null:

            if tail == self.tail.load(Ordering::Relaxed) {
                if next.is_null() {

Note that the first load of tail is Acquire and each of the possible stores of it, in either branch, are Release. This satisfies our Acquire/Release needs, with regard to locking primitives. All other stores and loads here are conspicuously Relaxed. How can we be sure we're not accidentally stomping writes or, since this is a linked list, cutting them loose in memory? That's where the AtomicPtr comes in:

                    if (*tail).next.compare_and_swap(next, node, Ordering::Relaxed) == next {
                        self.tail.compare_and_swap(tail, node, Ordering::Release);
                        return;
                    }

It is entirely possible that by the time we've detected the proper conditions for enqueuing another thread will have been scheduled in, have detected the same conditions, and have enqueued its own node. We attempt to slot our new node in with (*tail).next.compare_and_swap(next, node, Ordering::Relaxed), that is, we compare the current next of tail against the next we loaded and swap in node only if the two match. If and only if that succeeds (that's the == next bit) do we attempt to set tail to the node pointer, again with a compare and swap. If both of those succeed then the new element has been fully enqueued. It's possible that the swap of tail will fail, however, in which case the linked list is correctly set up but the tail pointer is off. Both the enq and deq operations must be aware they could stumble into a situation where the tail pointer needs to be adjusted. That is in fact how the enq function finishes off:

                }
            } else {
                self.tail.compare_and_swap(tail, next, Ordering::Release);
            }
        }
    }
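For readers on newer toolchains, where compare_and_swap is deprecated in favor of compare_exchange, the enqueue steps can be sketched as follows. This is an illustration under loud assumptions, not the chapter's src/queue.rs. SketchNode, SketchQueue and values are hypothetical names. The payload is a plain u32. Nodes are deliberately never freed, which sidesteps the reclamation trouble this section is about. The sketch also helps a lagging tail forward whenever next is non-null, the arrangement used in Michael and Scott's pseudocode.

```rust
use std::ptr::null_mut;
use std::sync::atomic::{AtomicPtr, Ordering};

// Hypothetical, simplified stand-ins for the chapter's Node and InnerQueue.
struct SketchNode {
    value: u32,
    next: AtomicPtr<SketchNode>,
}

struct SketchQueue {
    head: AtomicPtr<SketchNode>,
    tail: AtomicPtr<SketchNode>,
}

impl SketchQueue {
    fn new() -> Self {
        // Head and tail both start out pointing at one sentinel node.
        let sentinel = Box::into_raw(Box::new(SketchNode {
            value: 0,
            next: AtomicPtr::new(null_mut()),
        }));
        SketchQueue {
            head: AtomicPtr::new(sentinel),
            tail: AtomicPtr::new(sentinel),
        }
    }

    fn enq(&self, value: u32) {
        let node = Box::into_raw(Box::new(SketchNode {
            value,
            next: AtomicPtr::new(null_mut()),
        }));
        loop {
            let tail = self.tail.load(Ordering::Acquire);
            // Dereferencing is sound ONLY because this sketch never frees nodes.
            let next = unsafe { (*tail).next.load(Ordering::Acquire) };
            if !next.is_null() {
                // Tail is lagging behind a finished link step: help it along.
                let _ = self
                    .tail
                    .compare_exchange(tail, next, Ordering::Release, Ordering::Relaxed);
                continue;
            }
            // Step one: link our node after the current last node.
            let linked = unsafe {
                (*tail)
                    .next
                    .compare_exchange(null_mut(), node, Ordering::Release, Ordering::Relaxed)
            };
            if linked.is_ok() {
                // Step two: swing tail. Failure is fine, someone helped us.
                let _ = self
                    .tail
                    .compare_exchange(tail, node, Ordering::Release, Ordering::Relaxed);
                return;
            }
        }
    }

    /// Walk the list from the sentinel and collect every enqueued value.
    fn values(&self) -> Vec<u32> {
        let mut out = Vec::new();
        let head = self.head.load(Ordering::Acquire);
        let mut cur = unsafe { (*head).next.load(Ordering::Acquire) };
        while !cur.is_null() {
            unsafe {
                out.push((*cur).value);
                cur = (*cur).next.load(Ordering::Acquire);
            }
        }
        out
    }
}
```

Unlike compare_and_swap, compare_exchange returns a Result and takes separate orderings for the success and failure cases, which makes the "did we win?" check explicit. Because the sketch leaks every node, it is safe to poke at from several threads but useless as a real queue.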

On x86, all of these Relaxed operations end up stricter than requested, but on ARMv8 there will be all sorts of reordering. It's very important, and difficult, to establish a causal relationship between all modifications. If, for example, we swapped the tail pointer and then the next of the tail pointer, we'd open ourselves up to breaking the linked list, or making whole isolated chains depending on the threads' view of memory. The deq operation is similar:

    pub unsafe fn deq(&mut self) -> Option<T> {
        let mut head: *mut Node<T>;
        let value: T;
        loop {
            head = self.head.load(Ordering::Acquire);
            let tail: *mut Node<T> = self.tail.load(Ordering::Relaxed);
            let next: *mut Node<T> = (*head).next.load(Ordering::Relaxed);
            if head == self.head.load(Ordering::Relaxed) {
                if head == tail {
                    if next.is_null() {
                        return None;
                    }
                    self.tail.compare_and_swap(tail, next, Ordering::Relaxed);
                } else {
                    let val: *mut T = (*next).value as *mut T;
                    if self.head.compare_and_swap(head, next, Ordering::Release) == head {
                        value = *Box::from_raw(val);
                        break;
                    }
                }
            }
        }
        let head: Node<T> = *Box::from_raw(head);
        drop(head);
        Some(value)
    }
}

The function is a loop, like enq, in which we search for the correct conditions and circumstance to dequeue an element. The first outer if clause checks that head hasn't shifted on us, while the inner first branch is to do with a queue that has no elements, where head and tail are pointers to the same storage. Note here that if next is not null we try and patch up a partially completed linked list of nodes before looping back around again for another pass at dequeuing.

This is because, as discussed previously, enq may not fully succeed. The second inner branch is hit when head and tail are not equal, meaning there's an element to be pulled. We give out the ownership of the element T only when head hasn't shifted on us, and we're careful not to dereference the value pointer until we can be sure we're the only thread that will ever manage that. We can be sure because only one thread will ever manage to swap the particular head and next pair the calling thread currently holds.
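The "only one thread wins" reasoning can be checked in miniature. The following sketch is an assumption-laden stand-in rather than queue code: the function count_winners is hypothetical, and a plain AtomicUsize starting at 0 plays the role of the head pointer. Every thread tries to compare-exchange the slot from 0 to its own id, and we count how many of them see success.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical demo: spawn `threads` workers that all try to swing `slot`
/// from 0 to their own (non-zero) id. Returns how many saw the swap succeed.
fn count_winners(threads: usize) -> usize {
    let slot = Arc::new(AtomicUsize::new(0));
    let winners = Arc::new(AtomicUsize::new(0));

    let mut handles = Vec::new();
    for id in 1..=threads {
        let slot = Arc::clone(&slot);
        let winners = Arc::clone(&winners);
        handles.push(thread::spawn(move || {
            // Succeeds only for the thread that still observes the value 0.
            if slot
                .compare_exchange(0, id, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok()
            {
                winners.fetch_add(1, Ordering::Relaxed);
            }
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
    winners.load(Ordering::Relaxed)
}
```

However many threads race, the count comes back as one, because the slot never returns to 0 once a thread has won. That last condition is what the argument quietly depends on: it holds only so long as the compared value cannot come back around.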

After all of that, the actual outer Queue<T> is a touch anti-climactic:

pub struct Queue<T> {
    inner: *mut InnerQueue<T>,
}

impl<T> Clone for Queue<T> {
    fn clone(&self) -> Queue<T> {
        Queue { inner: self.inner }
    }
}

impl<T> Queue<T> {
    pub fn new() -> Self {
        Queue {
            inner: Box::into_raw(Box::new(InnerQueue::new())),
        }
    }

    pub fn enq(&self, val: T) -> () {
        unsafe { (*self.inner).enq(val) }
    }

    pub fn deq(&self) -> Option<T> {
        unsafe { (*self.inner).deq() }
    }

We've already reasoned our way through the implementation and, hopefully, you, dear reader, are convinced that the idea should function. Where the rubber meets the road is in testing:

#[cfg(test)]
mod test {
    extern crate quickcheck;

    use super::*;
    use std::collections::VecDeque;
    use std::sync::atomic::AtomicUsize;
    use std::thread;
    use std::sync::Arc;
    use self::quickcheck::{Arbitrary, Gen, QuickCheck, TestResult};

    #[derive(Clone, Debug)]
    enum Op {
        Enq(u32),
        Deq,
    }

    impl Arbitrary for Op {
        fn arbitrary<G>(g: &mut G) -> Self
        where
            G: Gen,
        {
            let i: usize = g.gen_range(0, 2);
            match i {
                0 => Op::Enq(g.gen()),
                _ => Op::Deq,
            }
        }
    }

This is the usual test preamble that we've seen elsewhere in the book. We define an Op enumeration to drive an interpreter style quickcheck test, which we call here sequential:

    #[test]
    fn sequential() {
        fn inner(ops: Vec<Op>) -> TestResult {
            let mut vd = VecDeque::new();
            let q = Queue::new();

            for op in ops {
                match op {
                    Op::Enq(v) => {
                        vd.push_back(v);
                        q.enq(v);
                    }
                    Op::Deq => {
                        assert_eq!(vd.pop_front(), q.deq());
                    }
                }
            }
            TestResult::passed()
        }
        QuickCheck::new().quickcheck(inner as fn(Vec<Op>) -> TestResult);
    }

We have a VecDeque as the model; we know it's a proper queue. Then, without dipping into any kind of real concurrency, we confirm that Queue behaves similarly to a VecDeque. At least in a sequential setting, Queue will work. Now, for a parallel test:

    fn parallel_exp(total: usize, enqs: u8, deqs: u8) -> bool {
        let q = Queue::new();
        let total_expected = total * (enqs as usize);
        let total_retrieved = Arc::new(AtomicUsize::new(0));

        let mut ejhs = Vec::new();
        for _ in 0..enqs {
            let mut q = q.clone();
            ejhs.push(
                thread::Builder::new()
                    .spawn(move || {
                        for i in 0..total {
                            q.enq(i);
                        }
                    })
                    .unwrap(),
            );
        }

        let mut djhs = Vec::new();
        for _ in 0..deqs {
            let mut q = q.clone();
            let total_retrieved = Arc::clone(&total_retrieved);
            djhs.push(
                thread::Builder::new()
                    .spawn(move || {
                        while total_retrieved.load(Ordering::Relaxed) != total_expected {
                            if q.deq().is_some() {
                                total_retrieved.fetch_add(1, Ordering::Relaxed);
                            }
                        }
                    })
                    .unwrap(),
            );
        }

        for jh in ejhs {
            jh.join().unwrap();
        }
        for jh in djhs {
            jh.join().unwrap();
        }

        assert_eq!(total_retrieved.load(Ordering::Relaxed), total_expected);
        true
    }

We set up two groups of threads, one responsible for enqueuing and the other for dequeuing. The enqueuing threads push a total number of items through the Queue and the dequeuers pull until a counter, shared between each of the dequeuers, hits bingo. Finally, back in the main test thread, we confirm that the total number of retrieved items is the same as the expected number of items. It's possible that our dequeuing threads will read past the end of the queue because of a race between the check on the while loop and the call of q.deq. This works in our favor: the final assertion then confirms that the queue allows the dequeue of no more elements than were enqueued. That, and there are no double-free crashes when the test is run. This inner test function is used twice, once in repeated runs and then again in a quickcheck setup:

    #[test]
    fn repeated() {
        for i in 0..10_000 {
            println!("{}", i);
            parallel_exp(73, 2, 2);
        }
    }

    #[test]
    fn parallel() {
        fn inner(total: usize, enqs: u8, deqs: u8) -> TestResult {
            if enqs == 0 || deqs == 0 {
                TestResult::discard()
            } else {
                TestResult::from_bool(parallel_exp(total, enqs, deqs))
            }
        }
        QuickCheck::new().quickcheck(inner as fn(usize, u8, u8) -> TestResult);
    }
}

What's wrong here? If you run the test suite, you may or may not hit one of the issues. They are fairly improbable, though we'll shortly see a way to reliably trigger the worst. The first issue our implementation runs into is the ABA problem. In a compare-and-swap operation, some thread intends to swap pointer A for B. Before that thread can complete its check, another thread swaps A with C and then C back again to A. The first thread is then rescheduled and performs its compare-and-swap of A to B, none the wiser that A is not really the A it had at the start of the swap. This will cause chunks of the queue's linked list to point incorrectly, possibly into memory that the queue does not rightly own. That's bad enough. What could be worse?
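Before moving on, the ABA interleaving is easy to replay deterministically on a single thread. The sketch below is illustrative only: aba_replay is a hypothetical function, and the integer constants A, B and C merely stand in for node addresses. It performs the steps in exactly the unlucky order described above and reports whether the stale compare-exchange goes through.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical replay of the ABA interleaving, with integers standing in
/// for pointers. Returns whether the stale compare-exchange succeeded.
fn aba_replay() -> bool {
    const A: usize = 0xA;
    const B: usize = 0xB;
    const C: usize = 0xC;
    let shared = AtomicUsize::new(A);

    // "Thread 1" reads A, intending to swap in B, and is descheduled.
    let observed = shared.load(Ordering::Acquire);

    // "Thread 2" runs in the gap: A becomes C, then C becomes A again.
    shared.store(C, Ordering::Release);
    shared.store(A, Ordering::Release);

    // "Thread 1" resumes. The comparison only looks at the bits, so it
    // cannot tell that the value went away and came back.
    shared
        .compare_exchange(observed, B, Ordering::AcqRel, Ordering::Relaxed)
        .is_ok()
}
```

The function returns true: the compare-exchange succeeds even though the shared value changed twice in between. With real pointers the second A may be a different node that the allocator happened to place at a recycled address, which is how the list ends up pointing somewhere it shouldn't.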

Let's cause a use-after-free violation with this structure. Our demonstration program is short and lives at src/bin/queue_spin.rs:

extern crate synchro;

use synchro::Queue;
use std::thread;

fn main() {
    let q = Queue::new();

    let mut jhs = Vec::new();

    for _ in 0..4 {
        let eq = q.clone();
        jhs.push(thread::spawn(move || {
            let mut i = 0;
            loop {
                eq.enq(i);
                i += 1;
                eq.deq();
            }
        }))
    }

    for jh in jhs {
        jh.join().unwrap();
    }
}

The program creates four threads, each of which enqueues and dequeues in sequence as rapidly as possible with no coordination between them. It's important to have at least two threads, else the queue is used sequentially and the issue does not exist:

> time cargo run --bin queue_spin
    Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
     Running `target/debug/queue_spin`
Segmentation fault

real    0m0.588s
user    0m0.964s
sys     0m0.016s

Ouch. That took no time at all. Let's have a look at the program in a debugger. We'll use lldb, but if you're using gdb, the results will be the same:

> lldb-3.9 target/debug/queue_spin
(lldb) target create "target/debug/queue_spin"
Current executable set to 'target/debug/queue_spin' (x86_64).
(lldb) run
Process 12917 launched: '/home/blt/projects/us/troutwine/concurrency_in_rust/external_projects/synchro/target/debug/queue_spin' (x86_64)
Process 12917 stopped
* thread #2: tid = 12920, 0x0000555555560585 queue_spin`_$LT$synchro..queue..InnerQueue$LT$T$GT$$GT$::deq::heefaa8c9b1d410ee(self=0x00007ffff6c2a010) + 261 at queue.rs:78, name = 'queue_spin', stop reason = signal SIGSEGV: invalid address (fault address: 0x0)
    frame #0: 0x0000555555560585 queue_spin`_$LT$synchro..queue..InnerQueue$LT$T$GT$$GT$::deq::heefaa8c9b1d410ee(self=0x00007ffff6c2a010) + 261 at queue.rs:78
   75                       }
   76                       self.tail.compare_and_swap(tail, next, Ordering::Relaxed);
   77                   } else {
-> 78                       let val: *mut T = (*next).value as *mut T;
   79                       if self.head.compare_and_swap(head, next, Ordering::Release) == head {
   80                           value = *Box::from_raw(val);
   81                           break;
  thread #3: tid = 12921, 0x0000555555560585 queue_spin`_$LT$synchro..queue..InnerQueue$LT$T$GT$$GT$::deq::heefaa8c9b1d410ee(self=0x00007ffff6c2a010) + 261 at queue.rs:78, name = 'queue_spin', stop reason = signal SIGSEGV: invalid address (fault address: 0x0)
    frame #0: 0x0000555555560585 queue_spin`_$LT$synchro..queue..InnerQueue$LT$T$GT$$GT$::deq::heefaa8c9b1d410ee(self=0x00007ffff6c2a010) + 261 at queue.rs:78
   75                       }
   76                       self.tail.compare_and_swap(tail, next, Ordering::Relaxed);
   77                   } else {
-> 78                       let val: *mut T = (*next).value as *mut T;
   79                       if self.head.compare_and_swap(head, next, Ordering::Release) == head {
   80                           value = *Box::from_raw(val);
   81                           break;

Well look at that! And, just to confirm:

(lldb) p next
(synchro::queue::Node<i32> *) $0 = 0x0000000000000000

Can we turn up another? Yes:

(lldb) Process 12893 launched: '/home/blt/projects/us/troutwine/concurrency_in_rust/external_projects/synchro/target/debug/queue_spin' (x86_64)
Process 12893 stopped
* thread #2: tid = 12896, 0x000055555555fb3e queue_spin`core::sync::atomic::atomic_load::hd37078e3d501f11f(dst=0x0000000000000008, order=Relaxed) + 78 at atomic.rs:1502, name = 'queue_spin', stop reason = signal SIGSEGV: invalid address (fault address: 0x8)
    frame #0: 0x000055555555fb3e queue_spin`core::sync::atomic::atomic_load::hd37078e3d501f11f(dst=0x0000000000000008, order=Relaxed) + 78 at atomic.rs:1502
  thread #3: tid = 12897, 0x000055555555fb3e queue_spin`core::sync::atomic::atomic_load::hd37078e3d501f11f(dst=0x0000000000000008, order=Relaxed) + 78 at atomic.rs:1502, name = 'queue_spin', stop reason = signal SIGSEGV: invalid address (fault address: 0x8)
    frame #0: 0x000055555555fb3e queue_spin`core::sync::atomic::atomic_load::hd37078e3d501f11f(dst=0x0000000000000008, order=Relaxed) + 78 at atomic.rs:1502
  thread #4: tid = 12898, 0x000055555555fb3e queue_spin`core::sync::atomic::atomic_load::hd37078e3d501f11f(dst=0x0000000000000008, order=Relaxed) + 78 at atomic.rs:1502, name = 'queue_spin', stop reason = signal SIGSEGV: invalid address (fault address: 0x8)
    frame #0: 0x000055555555fb3e queue_spin`core::sync::atomic::atomic_load::hd37078e3d501f11f(dst=0x0000000000000008, order=Relaxed) + 78 at atomic.rs:1502
  thread #5: tid = 12899, 0x000055555555fb3e queue_spin`core::sync::atomic::atomic_load::hd37078e3d501f11f(dst=0x0000000000000008, order=Relaxed) + 78 at atomic.rs:1502, name = 'queue_spin', stop reason = signal SIGSEGV: invalid address (fault address: 0x8)
    frame #0: 0x000055555555fb3e queue_spin`core::sync::atomic::atomic_load::hd37078e3d501f11f(dst=0x0000000000000008, order=Relaxed) + 78 at atomic.rs:1502

In the first example, next is null, which is what we'd expect of head's next when the queue is empty, but this particular branch is only hit when the queue is not empty. What's going on here? It turns out there's a nasty race and it's down to deallocation. Let's look at deq again, this time with line numbers:

 64    pub unsafe fn deq(&mut self) -> Option<T> {
 65        let mut head: *mut Node<T>;
 66        let value: T;
 67        loop {
 68            head = self.head.load(Ordering::Acquire);
 69            let tail: *mut Node<T> = self.tail.load(Ordering::Relaxed);
 70            let next: *mut Node<T> = (*head).next.load(Ordering::Relaxed);
 71            if head == self.head.load(Ordering::Relaxed) {
 72                if head == tail {
 73                    if next.is_null() {
 74                        return None;
 75                    }
 76                    self.tail.compare_and_swap(tail, next, Ordering::Relaxed);
 77                } else {
 78                    let val: *mut T = (*next).value as *mut T;
 79                    if self.head.compare_and_swap(head, next, Ordering::Release) == head {
 80                        value = *Box::from_raw(val);
 81                        break;
 82                    }
 83                }
 84            }
 85        }
 86        let head: Node<T> = *Box::from_raw(head);
 87        drop(head);
 88        Some(value)
 89    }

Say we have four threads, A through D. Thread A gets to line 78 and is stopped. Thread A is now in possession of a head, a tail, and a next, which point to a sensible linked-list in memory. Now, threads B, C, and D each perform multiple enq and deq operations such that when A wakes up the linked list pointed to by the head of A is long gone. In fact, head itself is actually deallocated, but A gets lucky and the OS hasn't overwritten its memory yet.

Thread A wakes up and performs line 78 but next now points off to nonsense and the whole thing crashes. Alternatively, say we have two threads, A and B. Thread A wakes up and advances through to line 70 and is stopped. Thread B wakes up and is very fortunate and advances all the way through to line 88, deallocating head. The OS is feeling its oats and overwrites the memory that A is pointing at. Thread A then wakes up, fires (*head).next.load(Ordering::Relaxed), and subsequently crashes. There are many alternatives here. What's common between them all is deallocation happening while there are still outstanding references to one or more nodes. In fact, Michael and Scott's paper does mention this as a problem, but briefly, in a way that's easy to overlook:

"To obtain consistent values of various pointers we rely on sequences of reads that re-check earlier values to be sure they haven't changed. These sequences of reads are similar to, but simpler than, the snapshots of Prakash et al. (we need to check only one shared variable rather than two). A similar technique can be used to prevent the race condition in Stone's blocking algorithm. We use Treiber's simple and efficient non-blocking stack algorithm to implement a non-blocking free list."

The key ideas here are sequences of reads that re-check earlier values and free list. What we've seen by inspection is that it's entirely possible to compare-and-swap a value that has changed—the ABA problem—which leaves our implementation pointing off into space. Also, immediately deallocating nodes will leave us open to crashes even absent a compare-and-swap. What Michael and Scott have done is create a minimal kind of memory management; rather than delete nodes, they move them into a free list to be reused or deleted at a later time. Free lists can be thread-local, in which case you avoid expensive synchronization, but it's still kind of tricky to be sure your thread-local free list doesn't have the same pointer as another thread.
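The "delete at a later time" half of that idea can be sketched in its crudest form. What follows is an assumption-heavy illustration, not the paper's free list and not a fix for our queue: RetireList, retire and drain are hypothetical names, nothing is ever reused, and parked items are only reclaimed once the caller holds the list exclusively. Pushing onto the list is a Treiber-style compare-exchange loop. Because nothing is ever popped concurrently, the push itself cannot be bitten by ABA.

```rust
use std::ptr::null_mut;
use std::sync::atomic::{AtomicPtr, Ordering};

struct Retired<T> {
    item: T,
    next: *mut Retired<T>,
}

/// Hypothetical push-only list: items are parked here instead of being
/// dropped immediately, and reclaimed later under exclusive access.
pub struct RetireList<T: Send> {
    top: AtomicPtr<Retired<T>>,
}

impl<T: Send> RetireList<T> {
    pub fn new() -> Self {
        RetireList {
            top: AtomicPtr::new(null_mut()),
        }
    }

    /// Park `item`. Lock-free: retry until our node becomes the new top.
    pub fn retire(&self, item: T) {
        let node = Box::into_raw(Box::new(Retired {
            item,
            next: null_mut(),
        }));
        let mut top = self.top.load(Ordering::Relaxed);
        loop {
            // We still own `node` exclusively here, so a plain write is fine.
            unsafe { (*node).next = top };
            match self
                .top
                .compare_exchange_weak(top, node, Ordering::Release, Ordering::Relaxed)
            {
                Ok(_) => return,
                Err(current) => top = current,
            }
        }
    }

    /// Reclaim everything parked so far. `&mut self` proves that no other
    /// thread can be touching the list while we free its nodes.
    pub fn drain(&mut self) -> Vec<T> {
        let mut out = Vec::new();
        let mut cur = std::mem::replace(self.top.get_mut(), null_mut());
        while !cur.is_null() {
            let Retired { item, next } = *unsafe { Box::from_raw(cur) };
            out.push(item);
            cur = next;
        }
        out
    }
}

impl<T: Send> Drop for RetireList<T> {
    fn drop(&mut self) {
        // Free whatever is still parked when the list itself goes away.
        self.drain();
    }
}
```

The design choice to notice is that freeing is gated on &mut self, so Rust's borrow checker stands in for the "nobody else holds a reference" proof that the queue was missing. A real free list also hands nodes back out for reuse, and that is exactly where ABA protection becomes necessary again.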
