Blocking until conditions change – condvar

One option is a condvar, or CONDition VARiable. Condvars are a nifty way to block a thread, pending a change in some Boolean condition. One difficulty is that condvars are associated exclusively with mutexes, but in this example, we don't mind all that much.

The way a condvar works is that, after taking a lock on a mutex, you pass the MutexGuard into Condvar::wait, which blocks the thread. Other threads may go through this same process, blocking on the same condition. Some other thread will take the same exclusive lock and eventually call either notify_one or notify_all on the condvar. The first wakes up a single thread; the second wakes up all waiting threads. Condvars are subject to spurious wakeup, meaning a thread may leave its block without any notification having been sent to it. For this reason, code waiting on a condvar checks its condition in a loop. Once wait returns, though, you are guaranteed to hold the mutex again, so rechecking the condition after a spurious wakeup is safe.
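Before we adapt our example, here's a minimal sketch of that pattern in isolation. This is not part of the example program; the pair and ready names, and the single notifier thread, are purely illustrative:

use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() {
    // A Boolean flag protected by a Mutex, paired with the Condvar
    // that guards it.
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair2 = Arc::clone(&pair);

    thread::spawn(move || {
        let &(ref mtx, ref cnd) = &*pair2;
        // Flip the flag while holding the lock, then wake a waiter.
        *mtx.lock().unwrap() = true;
        cnd.notify_one();
    });

    let &(ref mtx, ref cnd) = &*pair;
    let mut ready = mtx.lock().unwrap();
    // Check the condition in a loop to guard against spurious wakeups.
    // wait gives up the lock while blocked and re-acquires it before
    // returning.
    while !*ready {
        ready = cnd.wait(ready).unwrap();
    }
}

The Mutex and Condvar paired in one tuple behind an Arc is the same trick the full example uses below.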

Let's adapt our example to use a condvar. There's actually a fair bit going on in this example, so we'll break it down into pieces:

use std::thread;
use std::sync::{Arc, Condvar, Mutex};

fn main() {
    let total_readers = 5;
    let mutcond: Arc<(Mutex<(bool, u16)>, Condvar)> =
        Arc::new((Mutex::new((false, 0)), Condvar::new()));

Our preamble is similar to the previous examples, but the setup is already quite strange. We're synchronizing threads on mutcond, which is an Arc<(Mutex<(bool, u16)>, Condvar)>. Rust's condvar is a touch awkward. A Condvar must only ever be used with a single mutex, and wait may panic at runtime if it's used with more than one, but there's really nothing in the type of Condvar that enforces that invariant. We just have to remember to keep them associated. To that end, it's not uncommon to see a Mutex and Condvar paired up in a tuple, as here. Now, why Mutex<(bool, u16)>? The second element of the tuple is our resource, which is common to other examples. The first element is a Boolean flag, which we use as a signal to mean that there are writes available. Here are our reader threads:

    let mut reader_jhs = Vec::with_capacity(total_readers);
    for _ in 0..total_readers {
        let mutcond = Arc::clone(&mutcond);
        reader_jhs.push(thread::spawn(move || {
            let mut total_zeros = 0;
            let mut total_wakes = 0;
            let &(ref mtx, ref cnd) = &*mutcond;

            while total_zeros < 100 {
                let mut guard = mtx.lock().unwrap();
                while !guard.0 {
                    guard = cnd.wait(guard).unwrap();
                }
                guard.0 = false;

                total_wakes += 1;
                if guard.1 == 0 {
                    total_zeros += 1;
                }
            }
            total_wakes
        }));
    }

Until total_zeros hits 100, the reader thread locks the mutex, checks the Boolean flag inside the mutex for write availability, and, if there are no writes, waits on the condvar, which gives up the lock. The reader thread is then blocked until a notify_all is called, as we'll see shortly. Every waiting reader thread then races to be the first to reacquire the lock. The lucky winner flips the flag back to false, marking the write as consumed, and then does the normal flow we've seen in previous examples. It bears repeating that every thread that wakes up from a condition wait is racing to be the first to reacquire the mutex. Our reader is uncooperative in that it immediately removes any chance of other reader threads finding the resource available; those threads will wake, find the flag unset, and be forced to wait again. The reader threads are also competing with the writer thread to acquire the lock. Let's look at the writer thread:

    let _ = thread::spawn(move || {
        let &(ref mtx, ref cnd) = &*mutcond;
        loop {
            let mut guard = mtx.lock().unwrap();
            guard.1 = guard.1.wrapping_add(1);
            guard.0 = true;
            cnd.notify_all();
        }
    });

The writer thread is an infinite loop, which we orphan in an unjoined thread. Now, it's entirely possible that the writer thread will acquire the lock, bump the resource, notify the waiting reader threads, give up the lock, and then immediately re-acquire the lock to begin the whole process again before any reader thread gets scheduled in. This means the resource may well pass through zero several times before a reader thread is lucky enough to notice. Let's close out this program:

    for jh in reader_jhs {
        println!("{:?}", jh.join().unwrap());
    }
}

Ideally, what we'd like is some manner of bi-directionality: the writer should signal that there is something to read, and the reader should signal that there is capacity to write into. This is suspiciously like how Ring from the previous chapter worked through its size variable, back when we were careful not to race on that variable. We might, for instance, layer another condition variable into the mix, this one for the writer to wait on; a sketch of that variant appears at the end of this section. That's not what we have here, though, and the program suffers for it. Here's one run:

7243473
6890156
6018468
6775609
6192116

Phew! That's significantly more loops than previous examples. None of this is to say that condition variables are hard to use—they're not—it's just that they need to be used in conjunction with other primitives. We'll see an excellent example of this later in the chapter.
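To make the bi-directional idea concrete, here's a minimal sketch, not part of the example above, that layers in a second condition variable so the writer blocks until its last write has been consumed. The single reader, the data_cnd and space_cnd names, and the fixed iteration counts are assumptions made purely for illustration:

use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() {
    // Shared state: (write_available, resource), plus one condvar for
    // the reader ("data is ready") and one for the writer ("data was
    // consumed"). Both condvars are only ever used with this one mutex.
    let shared: Arc<(Mutex<(bool, u16)>, Condvar, Condvar)> =
        Arc::new((Mutex::new((false, 0)), Condvar::new(), Condvar::new()));

    let writer_shared = Arc::clone(&shared);
    let writer = thread::spawn(move || {
        let &(ref mtx, ref data_cnd, ref space_cnd) = &*writer_shared;
        for _ in 0..1_000 {
            let mut guard = mtx.lock().unwrap();
            // Wait until the previous write has been consumed.
            while guard.0 {
                guard = space_cnd.wait(guard).unwrap();
            }
            guard.1 = guard.1.wrapping_add(1);
            guard.0 = true;
            data_cnd.notify_one();
        }
    });

    let reader_shared = Arc::clone(&shared);
    let reader = thread::spawn(move || {
        let &(ref mtx, ref data_cnd, ref space_cnd) = &*reader_shared;
        for _ in 0..1_000 {
            let mut guard = mtx.lock().unwrap();
            // Wait until there is an unread write.
            while !guard.0 {
                guard = data_cnd.wait(guard).unwrap();
            }
            guard.0 = false;
            space_cnd.notify_one();
        }
    });

    writer.join().unwrap();
    reader.join().unwrap();
}

With the flag acting as a one-slot buffer, neither side can lap the other, at the cost of fully serializing the reader and the writer.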
