Semaphore

We discussed semaphores in passing in Chapter 5, Locks – Mutex, Condvar, Barriers, and RWLock, especially with regard to the concurrency puzzles from The Little Book of Semaphores. It's now, as promised, time to implement a semaphore. What exactly is a semaphore? As with our analysis of the mutex as an atomic object, let's break it down:

  • A semaphore supports two operations, wait and signal.
  • A semaphore has an isize value that tracks the available resource capacity of the semaphore. This value is manipulable only by wait and signal.
  • The operation wait decrements value. If the value is less than zero, wait blocks the calling thread until a resource becomes available. If the value is not less than zero, the thread does not block.
  • The operation signal increments value. If, after the increment, the value is less than or equal to zero, then there are one or more waiting threads, and one of them is woken up. If the value is greater than zero after the increment, there are no waiting threads.
  • Loads and stores that occur after a wait in program order must not be moved prior to the wait, and loads and stores that occur before it must not be moved after it.
  • Loads and stores that occur before a signal in program order must not be moved to after the signal.

If this seems familiar, there's a good reason for that. Viewed a certain way, a mutex is a semaphore with only one resource available; locking the mutex maps to wait and unlocking maps to signal. We have specified the behaviour of loads and stores around the waiting and signaling of the semaphore to avoid the same deadlock behaviour identified previously in the mutex analysis.

There are some subtleties not captured in the preceding breakdown that don't affect the analysis of the semaphore but do affect the programming model. We'll be building a semaphore with fixed resources. That is, when the semaphore is created, the programmer is responsible for setting the maximum total resources available in the semaphore and ensuring that the semaphore starts with these resources available. Some semaphore implementations allow the resource capacity to shift over time. These are commonly called counting semaphores. Our variant is called a bounded semaphore; the subvariant of this sort with only a single resource is called a binary semaphore. Behaviour around the signaling of waiting threads may vary. We will signal our threads on a first-come, first-served basis.
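Before turning to the chapter's queue-backed implementation, the wait/signal semantics described above can be sketched directly with the standard library's Mutex and Condvar. This is a minimal illustration of the bullet points, not the implementation this chapter builds; the type and names here are my own:

```rust
use std::sync::{Condvar, Mutex};

// A minimal bounded semaphore built on Mutex and Condvar, for
// illustration only. It blocks while no resource is available and then
// decrements, which is observably equivalent to the decrement-then-block
// description above.
pub struct CondvarSemaphore {
    value: Mutex<isize>,
    cond: Condvar,
}

impl CondvarSemaphore {
    pub fn new(capacity: isize) -> Self {
        CondvarSemaphore {
            value: Mutex::new(capacity),
            cond: Condvar::new(),
        }
    }

    // wait: block until a resource is available, then claim it
    pub fn wait(&self) {
        let mut value = self.value.lock().unwrap();
        while *value <= 0 {
            value = self.cond.wait(value).unwrap();
        }
        *value -= 1;
    }

    // signal: return a resource and wake one waiting thread, if any
    pub fn signal(&self) {
        let mut value = self.value.lock().unwrap();
        *value += 1;
        self.cond.notify_one();
    }
}
```

The Mutex around value gives us the ordering guarantees the bullet points demand, at the cost of a lock on every operation.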

Let's dig in. Our semaphore implementation is in src/semaphore.rs and it's very short:

use crossbeam::sync::MsQueue;

unsafe impl Send for Semaphore {}
unsafe impl Sync for Semaphore {}

pub struct Semaphore {
    capacity: MsQueue<()>,
}

impl Semaphore {
    // Create a semaphore with capacity total resources available.
    pub fn new(capacity: usize) -> Self {
        let q = MsQueue::new();
        for _ in 0..capacity {
            q.push(());
        }
        Semaphore { capacity: q }
    }

    // wait: claim a resource, blocking until one is available
    pub fn wait(&self) -> () {
        self.capacity.pop();
    }

    // signal: return a resource, waking one waiting thread if any
    pub fn signal(&self) -> () {
        self.capacity.push(());
    }
}

Well! Crossbeam's MsQueue has the correct ordering semantics when MsQueue::push and then MsQueue::pop are done in that sequence by the same thread, and pop has the added bonus of blocking until the queue is non-empty. So, our semaphore is an MsQueue filled with capacity total (). The operation wait decreases the value—the total number of () in the queue—and does so with Acquire/Release ordering. The operation signal increases the value of the semaphore by pushing an additional () onto the queue with Release semantics. It is possible for a programming error to result in wait/signal invocations that are not one-to-one, and we could resolve this with the same Guard approach taken by Mutex and SwapMutex. The underlying queue linearizes (see the next chapter for that discussion), and so our semaphore does as well. Let's try this thing out. We've got a program in-project to demonstrate the use of Semaphore, src/bin/status_demo.rs:
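The Guard approach just mentioned can be sketched as an RAII guard whose Drop calls signal, so that every successful wait is paired with exactly one signal by scope. To keep this example self-contained it uses a small Mutex/Condvar semaphore as a stand-in for the MsQueue-backed Semaphore; all names here are my own, and the chapter's type could be wrapped the same way:

```rust
use std::sync::{Condvar, Mutex};

// Stand-in semaphore for illustration; not the chapter's implementation.
pub struct CountingSemaphore {
    value: Mutex<isize>,
    cond: Condvar,
}

impl CountingSemaphore {
    pub fn new(capacity: isize) -> Self {
        CountingSemaphore {
            value: Mutex::new(capacity),
            cond: Condvar::new(),
        }
    }

    // Claim a resource, returning a guard that releases it on drop.
    pub fn access(&self) -> SemaphoreGuard {
        let mut value = self.value.lock().unwrap();
        while *value <= 0 {
            value = self.cond.wait(value).unwrap();
        }
        *value -= 1;
        SemaphoreGuard { semaphore: self }
    }

    fn signal(&self) {
        let mut value = self.value.lock().unwrap();
        *value += 1;
        self.cond.notify_one();
    }
}

pub struct SemaphoreGuard<'a> {
    semaphore: &'a CountingSemaphore,
}

impl<'a> Drop for SemaphoreGuard<'a> {
    fn drop(&mut self) {
        // Dropping the guard signals the semaphore, so the caller cannot
        // forget to release, nor release twice.
        self.semaphore.signal();
    }
}
```

With this design the wait/signal pairing is a property of the type rather than of programmer discipline, exactly as with MutexGuard.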

extern crate synchro;

use std::sync::Arc;
use synchro::Semaphore;
use std::{thread, time};

const THRS: usize = 4;
static mut COUNTS: &'static mut [u64] = &mut [0; THRS];
static mut STATUS: &'static mut [bool] = &mut [false; THRS];

fn worker(id: usize, gate: Arc<Semaphore>) -> () {
    unsafe {
        loop {
            gate.wait();
            STATUS[id] = true;
            COUNTS[id] += 1;
            STATUS[id] = false;
            gate.signal();
        }
    }
}

fn main() {
    let semaphore = Arc::new(Semaphore::new(2));

    for i in 0..THRS {
        let semaphore = Arc::clone(&semaphore);
        thread::spawn(move || worker(i, semaphore));
    }

    let mut counts: [u64; THRS] = [0; THRS];
    loop {
        unsafe {
            thread::sleep(time::Duration::from_millis(1_000));
            print!("|");
            for i in 0..THRS {
                print!(" {:>5}; {:010}/sec |", STATUS[i], COUNTS[i] - counts[i]);
                counts[i] = COUNTS[i];
            }
            println!();
        }
    }
}

We make THRS total worker threads, whose responsibility is to wait on the semaphore, flip their STATUS to true, add one to their COUNTS entry, and flip their STATUS to false before signaling the semaphore. Mutable static arrays are a goofy setup for any program, but it's a neat trick and causes no harm here, except for interacting oddly with the optimizer: if you compile this program in release mode, you may find that the optimizer determines worker to be a no-op. The main function creates a semaphore with a capacity of two, spawns the workers, and then spins forever, printing out the contents of STATUS and COUNTS. A run on my x86 test article looks like the following:

> ./target/release/status_demo
| false; 0000170580/sec |  true; 0000170889/sec |  true; 0000169847/sec | false; 0000169220/sec |
| false; 0000170262/sec | false; 0000170560/sec |  true; 0000169077/sec |  true; 0000169699/sec |
| false; 0000169109/sec | false; 0000169333/sec | false; 0000168422/sec | false; 0000168790/sec |
| false; 0000170266/sec |  true; 0000170653/sec | false; 0000168184/sec |  true; 0000169570/sec |
| false; 0000170907/sec | false; 0000171324/sec |  true; 0000170137/sec | false; 0000169227/sec |
...

And from my ARMv7 machine:

> ./target/release/status_demo
| false; 0000068840/sec |  true; 0000063798/sec | false; 0000063918/sec | false; 0000063652/sec |
| false; 0000074723/sec | false; 0000074253/sec |  true; 0000074392/sec |  true; 0000074485/sec |
|  true; 0000075138/sec | false; 0000074842/sec | false; 0000074791/sec |  true; 0000074698/sec |
| false; 0000075099/sec | false; 0000074117/sec | false; 0000074648/sec | false; 0000075083/sec |
| false; 0000075070/sec |  true; 0000074509/sec | false; 0000076196/sec |  true; 0000074577/sec |
|  true; 0000075257/sec |  true; 0000075682/sec | false; 0000074870/sec | false; 0000075887/sec |
...
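Since mutable statics are both unsafe and an invitation for the optimizer to elide unsynchronized writes, it's worth sketching what the demo's bookkeeping looks like with std atomics instead. This is my own variation, not the book's code; the semaphore and the infinite loop are dropped so the sketch can run to completion:

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::thread;

const THRS: usize = 4;

// const items may be used in array-repeat expressions, giving us
// statically initialized arrays of atomics without static mut.
const ZERO: AtomicU64 = AtomicU64::new(0);
const IDLE: AtomicBool = AtomicBool::new(false);
static COUNTS: [AtomicU64; THRS] = [ZERO; THRS];
static STATUS: [AtomicBool; THRS] = [IDLE; THRS];

// The same bookkeeping as the demo's worker: flip status on, count, flip
// status off. The atomic operations cannot be optimized away.
fn worker(id: usize, iterations: u64) {
    for _ in 0..iterations {
        STATUS[id].store(true, Ordering::SeqCst);
        COUNTS[id].fetch_add(1, Ordering::SeqCst);
        STATUS[id].store(false, Ordering::SeqCst);
    }
}

fn main() {
    let handles: Vec<_> = (0..THRS)
        .map(|i| thread::spawn(move || worker(i, 10_000)))
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let total: u64 = COUNTS.iter().map(|c| c.load(Ordering::SeqCst)).sum();
    println!("total increments: {}", total);
}
```

No unsafe blocks are needed, and the release-mode build can no longer treat the worker as a no-op.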