Read many, write exclusive locks – RwLock

Consider a situation where you have a resource that must be manipulated by only a single thread at a time, but may be safely queried by many—that is, you have many readers and only one writer. While we could protect this resource with a Mutex, the trouble is that the mutex makes no distinction between its lockers; every thread will be forced to wait, no matter what its intentions. RwLock<T> is an alternative to the mutex concept, allowing for two kinds of locks—read and write. Analogously to Rust's references, there can be only one write lock taken at a time, or any number of read locks, but never both at once.
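
First, a minimal sketch of the guard semantics in isolation. This is single-threaded and the names are illustrative only; it exists to show the read/write exclusion rule before we bring threads into it:

use std::sync::RwLock;

fn main() {
    let lock: RwLock<u16> = RwLock::new(0);

    // Any number of read guards may be alive at once...
    let r1 = lock.read().unwrap();
    let r2 = lock.read().unwrap();
    assert_eq!(*r1, *r2);
    // ...but all of them must be dropped before a write lock can be
    // taken; otherwise write() would block this thread forever.
    drop(r1);
    drop(r2);

    // The write guard is exclusive, like &mut T.
    let mut guard = lock.write().unwrap();
    *guard += 1;
    assert_eq!(*guard, 1);
}

With the rules in hand, let's look at a fuller example: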

use std::thread;
use std::sync::{Arc, RwLock};

fn main() {
    let resource: Arc<RwLock<u16>> = Arc::new(RwLock::new(0));

    let total_readers = 5;

    let mut reader_jhs = Vec::with_capacity(total_readers);
    for _ in 0..total_readers {
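        // Each reader thread captures its own clone of the Arc.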
        let resource = Arc::clone(&resource);
        reader_jhs.push(thread::spawn(move || {
            let mut total_lock_success = 0;
            let mut total_lock_failure = 0;
            let mut total_zeros = 0;
            while total_zeros < 100 {
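                // try_read never blocks: if the write lock is held, it fails
                // immediately and we tally the failure before trying again.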
                match resource.try_read() {
                    Ok(guard) => {
                        total_lock_success += 1;
                        if *guard == 0 {
                            total_zeros += 1;
                        }
                    }
                    Err(_) => {
                        total_lock_failure += 1;
                    }
                }
            }
            (total_lock_failure, total_lock_success)
        }));
    }

    // The writer runs on its own thread. We keep the handle but never
    // join it, so the writer may well still be writing when main exits.
    let _writer_jh = thread::spawn(move || {
        let mut loops = 0;
        while loops < 100 {
            // write() blocks until every outstanding read lock is released.
            let mut guard = resource.write().unwrap();
            *guard = guard.wrapping_add(1);
            if *guard == 0 {
                loops += 1;
            }
        }
    });

    for jh in reader_jhs {
        println!("{:?}", jh.join().unwrap());
    }
}

The idea here is that we'll have one writer thread spinning and incrementing, in a wrapping fashion, a shared resource—a u16. Once the u16 has wrapped 100 times, the writer thread will exit. Meanwhile, total_readers reader threads will each attempt to take read locks on that same shared resource until each has seen the value zero 100 times. We're gambling here, essentially, on thread ordering. Quite often, the program will exit with this result:

(0, 100)
(0, 100)
(0, 100)
(0, 100)
(0, 100)

This means that each reader thread never failed to get its read lock—no write lock was ever present when it tried. That is, the reader threads were scheduled before the writer, collected 100 reads of the initial zero apiece, and exited. Our main function only joins on the reader handles, and so the writer is left writing as we exit. Sometimes, we'll hit just the right scheduling order, and get the following result:

(0, 100)
(126143752, 2630308)
(0, 100)
(0, 100)
(126463166, 2736405)

In this particular instance, the second and final reader threads were scheduled concurrently with the writer and managed to catch times when the guard was not zero. Recall that the first element of the pair is the total number of times the reader thread was not able to get a read lock and was forced to retry. The second is the number of times that the lock was acquired. In total, the writer thread was set to perform 2^16 * 100 = 6,553,600 writes, roughly 2^23, whereas the second reader thread acquired only 2,630,308 read locks, roughly 2^21. That's a lot of lost writes, which, maybe, is okay. Of more concern, that's a lot of useless loops: over 126 million failed lock attempts, roughly 2^27, in that one reader alone. Ocean levels are rising and we're here burning up electricity like nobody had to die for it.
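
To make the waste concrete, here are the totals implied by the run above:

    writer:        2^16 increments per wrap * 100 wraps = 6,553,600 writes, roughly 2^23
    second reader: 2,630,308 successful read locks, roughly 2^21
    second reader: 126,143,752 failed try_read calls, roughly 2^27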

How do we avoid all this wasted effort? Well, like most things, it depends on what we're trying to do. If we need every reader to see every write, then an MPSC channel per reader is a reasonable choice. It would look like this:

use std::thread;
use std::sync::mpsc;

fn main() {
    let total_readers = 5;
    let mut sends = Vec::with_capacity(total_readers);

    let mut reader_jhs = Vec::with_capacity(total_readers);
    for _ in 0..total_readers {
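        // A bounded channel: send blocks once 64 values are in flight,
        // so the writer can never race far ahead of this reader.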
        let (snd, rcv) = mpsc::sync_channel(64);
        sends.push(snd);
        reader_jhs.push(thread::spawn(move || {
            let mut total_zeros = 0;
            let mut seen_values = 0;
            for v in rcv {
                seen_values += 1;
                if v == 0 {
                    total_zeros += 1;
                }
                if total_zeros >= 100 {
                    break;
                }
            }
            seen_values
        }));
    }

    {
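        // The main thread is the writer here: every value of cur is sent
        // to every reader, in order.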
        let mut loops = 0;
        let mut cur: u16 = 0;
        while loops < 100 {
            cur = cur.wrapping_add(1);
            for snd in &sends {
                snd.send(cur).expect("failed to send");
            }
            if cur == 0 {
                loops += 1;
            }
        }
    }

    for jh in reader_jhs {
        println!("{:?}", jh.join().unwrap());
    }
}

It will run—for a while—and print out the following:

6553600
6553600
6553600
6553600
6553600

That is 2^16 * 100 = 6,553,600 values seen per reader: every reader saw every write, because the bounded channel forces the writer to wait until its slowest reader has made room. But what if every reader does not need to see every write, meaning that it's acceptable for a reader to miss writes so long as it does not miss all of the writes? We have options. Let's look at one.
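
One option, sketched minimally here, is to stop locking altogether and publish only the latest value through an atomic. Readers sample it whenever they happen to run and simply miss whatever lands between samples. This is an illustration, not the only design, and it assumes a Rust recent enough to provide std::sync::atomic::AtomicU16:

use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let resource: Arc<AtomicU16> = Arc::new(AtomicU16::new(0));

    let total_readers = 5;

    let mut reader_jhs = Vec::with_capacity(total_readers);
    for _ in 0..total_readers {
        let resource = Arc::clone(&resource);
        reader_jhs.push(thread::spawn(move || {
            let mut total_zeros = 0;
            let mut samples = 0;
            while total_zeros < 100 {
                samples += 1;
                // A relaxed load never blocks and never fails; writes made
                // since the last sample are simply not observed.
                if resource.load(Ordering::Relaxed) == 0 {
                    total_zeros += 1;
                }
            }
            samples
        }));
    }

    // As before, the writer wraps the u16 one hundred times and is
    // never joined.
    let _writer_jh = thread::spawn(move || {
        let mut loops = 0;
        while loops < 100 {
            // fetch_add wraps on overflow and returns the previous value:
            // seeing u16::MAX means we have just wrapped to zero.
            if resource.fetch_add(1, Ordering::Relaxed) == u16::MAX {
                loops += 1;
            }
        }
    });

    for jh in reader_jhs {
        println!("{:?}", jh.join().unwrap());
    }
}

No reader ever waits on the writer and the writer never waits on a reader; the sample counts printed at exit will vary from run to run, which is exactly the bargain this approach strikes.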
