Racing threads

You can also mark types as thread-safe yourself, effectively promising the compiler that you've arranged every concurrent access in such a way as to make it safe. This is relatively rare in practice, but it's worth seeing what it takes to make a type thread-safe in Rust before we start building on top of safe primitives. First, let's take a look at code that's intentionally sideways:

use std::{mem, thread};
use std::ops::{Deref, DerefMut};

unsafe impl Send for Ring {}
unsafe impl Sync for Ring {}

struct InnerRing {
    capacity: isize,
    size: usize,
    data: *mut Option<u32>,
}

#[derive(Clone)]
struct Ring {
    inner: *mut InnerRing,
}

What we have here is a ring, or circular buffer, of u32. InnerRing holds a raw mutable pointer and, as a result, is not automatically thread-safe. But we've promised Rust that we know what we're doing by implementing Send and Sync for Ring. Why not for InnerRing? When we manipulate an object in memory from multiple threads, the location of that object has to be fixed. InnerRing, and the data it contains, have to occupy a stable place in memory. Ring can and will be bounced around, at the very least from a creating thread to a worker. Now, what's that data field in InnerRing? It's a pointer to the 0th offset of a contiguous block of memory that will be the store of our circular buffer. At the time of writing this book, Rust has no stable allocator interface and so, to get a contiguous allocation, we have to do it in a roundabout fashion, stripping a Vec<Option<u32>> down to its pointer:

impl Ring {
    fn with_capacity(capacity: usize) -> Ring {
        let mut data: Vec<Option<u32>> = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            data.push(None);
        }
        let raw_data = (&mut data).as_mut_ptr();
        mem::forget(data);
        let inner_ring = Box::new(InnerRing {
            capacity: capacity as isize,
            size: 0,
            data: raw_data,
        });

        Ring {
            inner: Box::into_raw(inner_ring),
        }
    }
}

Ring::with_capacity functions much the same as other types' with_capacity from the Rust ecosystem: sufficient space is allocated to fit capacity items. In our case, we piggyback off Vec::with_capacity, being sure to allocate enough room for capacity Option<u32> instances and initializing every slot to None along the full length of the memory block. If you'll recall the discussion of Vec from Chapter 3, The Rust Memory Model – Ownership, References and Manipulation, the pushes matter: Vec::with_capacity reserves the memory but leaves it uninitialized, with the vector's length at zero, and we require every slot to hold a valid value before anyone reads it through the raw pointer. Vec::as_mut_ptr returns a raw pointer to the vector's buffer but does not consume the original object, which is a problem for Ring. When data falls out of scope, the allocated block must survive. The standard library's mem::forget is ideal for this very use case. With the allocation now safe from being dropped, an InnerRing is boxed to store it. The box is then consumed by Box::into_raw and the resulting pointer is passed into a Ring. Ta-da!
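The forget-the-Vec trick has a mirror image that is worth seeing once: the same pointer, length, and capacity can be handed to Vec::from_raw_parts to reclaim the allocation. What follows is a minimal sketch and not part of the Ring code. The function names leak_buffer and reclaim_buffer are hypothetical, chosen only for illustration:

```rust
use std::mem;

/// Allocates `capacity` slots, fills them with `None`, and leaks the Vec,
/// returning the raw parts needed to rebuild it later.
/// (Hypothetical helper, not part of the Ring listing.)
fn leak_buffer(capacity: usize) -> (*mut Option<u32>, usize, usize) {
    let mut data: Vec<Option<u32>> = vec![None; capacity];
    let ptr = data.as_mut_ptr();
    let len = data.len();
    let cap = data.capacity();
    // Without this, the allocation would be freed at the end of the function.
    mem::forget(data);
    (ptr, len, cap)
}

/// Rebuilds the Vec so that it is dropped and freed in the usual way.
///
/// Safety: the arguments must come from one `leak_buffer` call and the
/// pointer must not be used again afterwards.
unsafe fn reclaim_buffer(ptr: *mut Option<u32>, len: usize, cap: usize) -> Vec<Option<u32>> {
    unsafe { Vec::from_raw_parts(ptr, len, cap) }
}
```

Ring as written never performs the second step, so its buffer lives until the process exits. That is acceptable for a short demonstration program but would be a leak in a long-running one.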

Interacting with a type that has an inner raw pointer can be verbose, scattering unsafe blocks around to little good effect. To that end, Ring gets a Deref and DerefMut implementation, both of which tidy up the interaction with Ring:

impl Deref for Ring {
    type Target = InnerRing;

    fn deref(&self) -> &InnerRing {
        unsafe { &*self.inner }
    }
}

impl DerefMut for Ring {
    fn deref_mut(&mut self) -> &mut InnerRing {
        unsafe { &mut *self.inner }
    }
}
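To make the tidying concrete, here is a small sketch of the same pattern on a stripped-down type. The names Inner, Handle, bump_verbose, and bump are hypothetical and not part of the Ring listing. Unlike Ring, Handle is deliberately not Clone, which is an assumption that lets its Drop implementation free the box exactly once:

```rust
use std::ops::{Deref, DerefMut};

struct Inner {
    size: usize,
}

// Hypothetical stand-in for Ring: owns a raw pointer to a boxed Inner.
struct Handle {
    inner: *mut Inner,
}

impl Handle {
    fn new() -> Handle {
        Handle { inner: Box::into_raw(Box::new(Inner { size: 0 })) }
    }
}

impl Deref for Handle {
    type Target = Inner;
    fn deref(&self) -> &Inner {
        // The pointer came from Box::into_raw and is freed only in Drop.
        unsafe { &*self.inner }
    }
}

impl DerefMut for Handle {
    fn deref_mut(&mut self) -> &mut Inner {
        unsafe { &mut *self.inner }
    }
}

impl Drop for Handle {
    fn drop(&mut self) {
        // Turn the raw pointer back into a Box so the memory is released.
        unsafe { drop(Box::from_raw(self.inner)) };
    }
}

/// Field access without Deref: every use needs an unsafe block.
fn bump_verbose(handle: &mut Handle) -> usize {
    unsafe {
        (*handle.inner).size += 1;
        (*handle.inner).size
    }
}

/// Field access with Deref and DerefMut: the unsafe code lives in one place.
fn bump(handle: &mut Handle) -> usize {
    handle.size += 1;
    handle.size
}
```

Both functions do the same work. The second simply leans on the Deref implementations, which is the convenience Ring is after.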

Now that we have Ring defined, we can get into the meat of the program. We'll define two operations that will run concurrently with one another: writer and reader. The idea here is that writer will race around the ring writing increasing u32 values into the ring whenever there's capacity to do so. (At the type boundary, the u32 will wrap.) The reader will race around behind the writer, reading the values written and checking that each read value is one more than the previous read, with the caveat of wrapping. Here's the writer:

fn writer(mut ring: Ring) -> () {
    let mut offset: isize = 0;
    let mut cur: u32 = 0;
    loop {
        unsafe {
            if (*ring).size != ((*ring).capacity as usize) {
                *(*ring).data.offset(offset) = Some(cur);
                (*ring).size += 1;
                cur = cur.wrapping_add(1);
                offset += 1;
                offset %= (*ring).capacity;
            } else {
                thread::yield_now();
            }
        }
    }
}

Now, to be crystal clear, there's a lot that's wrong here. The ambition is to write only when the size of the ring buffer has not reached its capacity, meaning there's free space available. The actual writing is:

                *(*ring).data.offset(offset) = Some(cur);
                (*ring).size += 1;

That is, we dereference the ring (*ring) and get the pointer to the Option<u32>-sized block at (*ring).data.offset(offset), which we then dereference and move Some(cur) onto the top of whatever was previously there. Because of races on the size of the Ring, it is entirely possible that we'll overwrite an unread u32. The remainder of the write block sets up our next cur and our next offset, adding one and wrapping around with a modulo if need be. When the ring is full, the else branch runs instead:

            } else {
                thread::yield_now();
            }

thread::yield_now is new. The writer is a fast spin-loop: it checks a single condition and loops back around again for another try. This is very inefficient in both CPU time and power. thread::yield_now hints to the operating system that this thread has no work to do and should be deprioritized in favor of other threads. The effect depends on the OS and the running environment, but it's still a good idea to yield if you have to spin-loop. Here's the reader:

fn reader(mut ring: Ring) -> () {
    let mut offset: isize = 0;
    let mut cur: u32 = 0;
    while cur < 1_000 {
        unsafe {
            if let Some(num) = mem::replace(&mut *(*ring).data.offset(offset), None) {
                assert_eq!(num, cur);
                (*ring).size -= 1;
                cur = cur.wrapping_add(1);
                offset += 1;
                offset %= (*ring).capacity;
            } else {
                thread::yield_now();
            }
        }
    }
}

The reader is similar to the writer, with the major difference being that it's not an infinite loop. Reads are done with mem::replace, swapping the block at the reader offset with None. When we hit bingo and score a Some, that value is now owned by the reader and is dropped when it goes out of scope. This is important. The writer is responsible for losing memory inside of a raw pointer and the reader is responsible for finding it. In this way, we are careful not to leak memory. Or, well, we would be if there weren't a race on the size of the Ring.
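To see why the race on size matters, it helps to remember that size += 1 is really a load, an add, and a store. The sketch below replays one possible interleaving of the writer's increment and the reader's decrement, deterministically and on a single thread. The function lost_update is hypothetical, and the interleaving is only one of many a real scheduler might produce:

```rust
use std::cell::Cell;

/// Replays one unlucky interleaving of `size += 1` (writer) and
/// `size -= 1` (reader) on a shared counter.
/// Assumes `initial >= 1` so the decrement cannot underflow.
fn lost_update(initial: usize) -> usize {
    let size = Cell::new(initial);

    let writer_seen = size.get(); // writer loads size
    let reader_seen = size.get(); // reader loads the same, now stale, value
    size.set(writer_seen + 1);    // writer stores its increment
    size.set(reader_seen - 1);    // reader stores, clobbering the writer's update

    size.get()
}
```

One increment and one decrement should leave the counter where it started, yet lost_update(5) returns 4 because the writer's store is overwritten. In the real program the situation is worse than this model suggests, since unsynchronized access of this kind is a data race and therefore undefined behavior in Rust.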

Finally, we have the main function:

fn main() {
    let capacity = 10;
    let ring = Ring::with_capacity(capacity);

    let reader_ring = ring.clone();
    let reader_jh = thread::spawn(move || {
        reader(reader_ring);
    });
    let _writer_jh = thread::spawn(move || {
        writer(ring);
    });

    reader_jh.join().unwrap();
}

There are two new things going on here. The first is our use of thread::spawn to start a reader and a writer. The move || {} construct is called a move closure: every variable from the outer scope that is referenced inside the closure is moved into the closure's scope. It's for this reason that we clone ring to reader_ring; otherwise, there'd be no ring left for the writer to work with. Note that the clone copies only the raw pointer, so both threads share the same InnerRing. The second new thing is the JoinHandle that thread::spawn returns. Rust threads are not a drastic departure from the common POSIX or Windows threads. Rust threads receive their own stack and are independently runnable by the operating system.

Every Rust thread has a return value, though here ours is (). We get at that return value by joining on the thread's JoinHandle, pausing execution of our thread until the child thread either returns or panics. join returns a Result that is Err if the child panicked. Our main thread assumes its child threads will return successfully, hence the join().unwrap().
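Both halves of that behavior can be seen in a few lines. This is a minimal sketch with hypothetical function names, sum_in_thread and child_panicked, that uses only thread::spawn and JoinHandle::join from the standard library:

```rust
use std::thread;

/// Moves `values` into a child thread and returns the value that the
/// thread's closure evaluates to.
fn sum_in_thread(values: Vec<u32>) -> u32 {
    // `move` transfers ownership of `values` into the closure.
    let handle = thread::spawn(move || values.iter().sum::<u32>());
    // join blocks until the child finishes; Ok carries its return value.
    handle.join().unwrap()
}

/// Spawns a child that panics and reports whether join observed it.
fn child_panicked() -> bool {
    let handle = thread::spawn(|| {
        panic!("intentional panic for illustration");
    });
    // A panic in the child surfaces as Err on the joining side.
    handle.join().is_err()
}
```

Calling unwrap on the Err case is what turns a panic in a child into a panic in the joining thread.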

What happens when we run our program? Well, failure, which is what we were expecting:

> rustc -C opt-level=3 data_race00.rs && ./data_race00
thread '<unnamed>' panicked at 'assertion failed: `(left == right)`
left: `31`,
right: `21`', data_race00.rs:90:17
note: Run with `RUST_BACKTRACE=1` for a backtrace.
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Any', libcore/result.rs:916:5