QuickCheck and loops

Previous chapters discussed QuickCheck at length and we'll not duplicate that here. Instead, let's dig into a test from hopper, defined in src/lib.rs:

    #[test]
    fn multi_thread_concurrent_snd_and_rcv_round_trip() {
        fn inner(
            total_senders: usize,
            in_memory_bytes: usize,
            disk_bytes: usize,
            max_disk_files: usize,
            vals: Vec<u64>,
        ) -> TestResult {
            let sz = mem::size_of::<u64>();
            if total_senders == 0
                || total_senders > 10
                || vals.len() == 0
                || (vals.len() < total_senders)
                || (in_memory_bytes / sz) == 0
                || (disk_bytes / sz) == 0
            {
                return TestResult::discard();
            }
            TestResult::from_bool(multi_thread_concurrent_snd_and_rcv_round_trip_exp(
                total_senders,
                in_memory_bytes,
                disk_bytes,
                max_disk_files,
                vals,
            ))
        }
        QuickCheck::new()
            .quickcheck(inner as fn(usize, usize, usize, usize, Vec<u64>) -> TestResult);
    }

This test sets up a multiple-sender, single-receiver round-trip environment, taking care to reject inputs that have no senders, that leave the in-memory buffer with less room than a single 64-bit value, and the like. It defers to another function, multi_thread_concurrent_snd_and_rcv_round_trip_exp, to actually run the test.
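
The reject-then-defer shape can be sketched without QuickCheck or hopper. In the sketch below, Verdict, round_trip_exp, and property are hypothetical names (they are not part of hopper or QuickCheck), a std sync_channel stands in for the hopper queue, and treating a zero capacity as meaningless is an assumption made only to mirror the "no room for one element" rule:

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for QuickCheck's `TestResult`.
#[derive(Debug, PartialEq)]
enum Verdict {
    Discard,
    Passed,
    Failed,
}

/// The explicit experiment: plain arguments in, `bool` out, so that a
/// failing input can be replayed by hand.
fn round_trip_exp(capacity: usize, vals: Vec<u64>) -> bool {
    let (snd, rcv) = mpsc::sync_channel::<u64>(capacity);
    let to_send = vals.clone();
    let snd_jh = thread::spawn(move || {
        for v in to_send {
            snd.send(v).expect("receiver hung up");
        }
        // `snd` is dropped here, which ends the receiver's iteration.
    });
    let received: Vec<u64> = rcv.iter().collect();
    snd_jh.join().expect("snd join failed");
    received == vals
}

/// The property wrapper: reject inputs that make no sense for the
/// experiment, then defer to it.
fn property(capacity: usize, vals: Vec<u64>) -> Verdict {
    if capacity == 0 || vals.is_empty() {
        return Verdict::Discard;
    }
    if round_trip_exp(capacity, vals) {
        Verdict::Passed
    } else {
        Verdict::Failed
    }
}
```

A call such as property(0, vec![1, 2, 3]) is discarded rather than counted as a pass or a failure, while round_trip_exp(1, vec![7, 8, 9]) runs the experiment directly on hand-picked inputs.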

This setup is awkward, admittedly, but has the benefit of allowing multi_thread_concurrent_snd_and_rcv_round_trip_exp to be run over explicit inputs. That is, when a bug is found you can easily replay that test by creating a manual (or explicit, in hopper testing terms) test. The inner test function is complicated and we'll consider it in parts:

    fn multi_thread_concurrent_snd_and_rcv_round_trip_exp(
        total_senders: usize,
        in_memory_bytes: usize,
        disk_bytes: usize,
        max_disk_files: usize,
        vals: Vec<u64>,
    ) -> bool {
        if let Ok(dir) = tempdir::TempDir::new("hopper") {
            if let Ok((snd, mut rcv)) = channel_with_explicit_capacity(
                "tst",
                dir.path(),
                in_memory_bytes,
                disk_bytes,
                max_disk_files,
            ) {

Much like our example usage of hopper at the start of this section, the inner test function uses tempdir (https://crates.io/crates/tempdir) to create a temporary path for passing into channel_with_explicit_capacity. Except, we're careful not to unwrap here. Because hopper makes use of the filesystem and because Rust QuickCheck is aggressively multi-threaded, it's possible that any individual test run will hit a temporary case of file-handle exhaustion. An unwrap would turn that into a test failure, which throws QuickCheck off: the failure is totally unrelated to the inputs of this particular execution.
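
The same "environmental failure is not a property failure" idea can be sketched with nothing but the standard library. The function write_then_read_exp and the file name payload.bin are hypothetical, chosen only for illustration; the point is that every fallible setup step is matched rather than unwrapped, and falls through to true when the environment, not the property, is at fault:

```rust
use std::fs;
use std::path::PathBuf;

/// Hypothetical experiment: write a payload into `dir`, read it back, and
/// compare. Returns `false` only when the property itself is violated.
fn write_then_read_exp(dir: PathBuf, payload: &[u8]) -> bool {
    // Failing to obtain a scratch directory or a file handle says nothing
    // about the inputs, so those cases fall through to `true`.
    if fs::create_dir_all(&dir).is_ok() {
        let path = dir.join("payload.bin");
        if fs::write(&path, payload).is_ok() {
            if let Ok(read_back) = fs::read(&path) {
                // Best-effort cleanup; a failure here is also ignored.
                let _ = fs::remove_dir_all(&dir);
                return read_back.as_slice() == payload;
            }
        }
        let _ = fs::remove_dir_all(&dir);
    }
    true
}
```

The cost of this design is that a run which never got its resources counts as a pass, so it only makes sense when such failures are rare and transient.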

The next piece is as follows:

                let chunk_size = vals.len() / total_senders;

                let mut snd_jh = Vec::new();
                let snd_vals = vals.clone();
                for chunk in snd_vals.chunks(chunk_size) {
                    let mut thr_snd = snd.clone();
                    let chunk = chunk.to_vec();
                    snd_jh.push(thread::spawn(move || {
                        let mut queued = Vec::new();
                        for mut ev in chunk {
                            loop {
                                match thr_snd.send(ev) {
                                    Err(res) => {
                                        ev = res.0;
                                    }
                                    Ok(()) => {
                                        break;
                                    }
                                }
                            }
                            queued.push(ev);
                        }
                        let mut attempts = 0;
                        loop {
                            if thr_snd.flush().is_ok() {
                                break;
                            }
                            thread::sleep(::std::time::Duration::from_millis(100));
                            attempts += 1;
                            assert!(attempts < 10);
                        }
                        queued
                    }))
                }
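
The chunking arithmetic at the top of this piece is worth checking in isolation. The helper chunk_lengths below is hypothetical (it does not appear in hopper); it only reports the lengths of the chunks that the standard library's chunks method yields for the same chunk_size computation:

```rust
/// Lengths of the chunks produced by `vals.chunks(vals.len() / total_senders)`.
///
/// Panics if `total_senders` is zero or larger than `vals.len()`, which are
/// exactly the inputs the QuickCheck wrapper discards.
fn chunk_lengths(vals: &[u64], total_senders: usize) -> Vec<usize> {
    let chunk_size = vals.len() / total_senders;
    vals.chunks(chunk_size).map(|c| c.len()).collect()
}
```

With 10 values and 5 senders this gives five chunks of two. With 10 values and 3 senders, chunk_size is 3 and chunks yields four chunks of lengths 3, 3, 3, and 1, so the number of spawned threads can exceed total_senders by the time the remainder is covered.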

This piece of the test creates the sender threads, each of which takes a chunk of vals; the chunks are equal-sized except possibly the last, which may be shorter. Because thread scheduling is nondeterministic, we cannot model the order in which elements of vals will be pushed into hopper. All we can do is confirm that no elements are lost in transmission through hopper. They may, in fact, arrive garbled in terms of order:

                let expected_total_vals = vals.len();
                let rcv_jh = thread::spawn(move || {
                    let mut collected = Vec::new();
                    let mut rcv_iter = rcv.iter();
                    while collected.len() < expected_total_vals {
                        let mut attempts = 0;
                        loop {
                            match rcv_iter.next() {
                                None => {
                                    attempts += 1;
                                    assert!(attempts < 10_000);
                                }
                                Some(res) => {
                                    collected.push(res);
                                    break;
                                }
                            }
                        }
                    }
                    collected
                });

                let mut snd_vals: Vec<u64> = Vec::new();
                for jh in snd_jh {
                    snd_vals.append(&mut jh.join().expect("snd join failed"));
                }
                let mut rcv_vals = rcv_jh.join().expect("rcv join failed");
                rcv_vals.sort();
                snd_vals.sort();
                assert_eq!(rcv_vals, snd_vals);
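
The sort-then-compare step is what makes the check insensitive to arrival order while still catching lost or duplicated values. A minimal sketch, assuming a std mpsc channel in place of hopper and using the hypothetical helpers fan_in and same_elements:

```rust
use std::sync::mpsc;
use std::thread;

/// Sends `vals` through a std channel from one thread per chunk and returns
/// the values in whatever order they arrived. `chunk_size` must be non-zero.
fn fan_in(vals: &[u64], chunk_size: usize) -> Vec<u64> {
    let (snd, rcv) = mpsc::channel();
    let mut snd_jh = Vec::new();
    for chunk in vals.chunks(chunk_size) {
        let thr_snd = snd.clone();
        let chunk = chunk.to_vec();
        snd_jh.push(thread::spawn(move || {
            for v in chunk {
                thr_snd.send(v).expect("receiver hung up");
            }
        }));
    }
    // Drop the original sender so the receiver's iterator ends once every
    // sender thread has finished.
    drop(snd);
    for jh in snd_jh {
        jh.join().expect("snd join failed");
    }
    rcv.iter().collect()
}

/// Order-insensitive comparison: equal exactly when both vectors hold the
/// same values the same number of times.
fn same_elements(mut a: Vec<u64>, mut b: Vec<u64>) -> bool {
    a.sort();
    b.sort();
    a == b
}
```

Comparing sorted vectors rather than sets matters here: a set comparison would not notice a value delivered twice, or one of two equal values going missing.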

Another test with a single sender, single_sender_single_rcv_round_trip, is able to check for correct ordering as well as no data loss:

    #[test]
    fn single_sender_single_rcv_round_trip() {
        // Similar to the multi sender test except now with a single
        // sender we can guarantee order.
        fn inner(
            in_memory_bytes: usize,
            disk_bytes: usize,
            max_disk_files: usize,
            total_vals: usize,
        ) -> TestResult {
            let sz = mem::size_of::<u64>();
            if total_vals == 0 || (in_memory_bytes / sz) == 0 || (disk_bytes / sz) == 0 {
                return TestResult::discard();
            }
            TestResult::from_bool(single_sender_single_rcv_round_trip_exp(
                in_memory_bytes,
                disk_bytes,
                max_disk_files,
                total_vals,
            ))
        }
        QuickCheck::new().quickcheck(inner as fn(usize, usize, usize, usize) -> TestResult);
    }

Like its multi-sender cousin, this QuickCheck test uses an inner function to perform the actual test:

    fn single_sender_single_rcv_round_trip_exp(
        in_memory_bytes: usize,
        disk_bytes: usize,
        max_disk_files: usize,
        total_vals: usize,
    ) -> bool {
        if let Ok(dir) = tempdir::TempDir::new("hopper") {
            if let Ok((mut snd, mut rcv)) = channel_with_explicit_capacity(
                "tst",
                dir.path(),
                in_memory_bytes,
                disk_bytes,
                max_disk_files,
            ) {
                let builder = thread::Builder::new();
                if let Ok(snd_jh) = builder.spawn(move || {
                    for i in 0..total_vals {
                        loop {
                            if snd.send(i).is_ok() {
                                break;
                            }
                        }
                    }
                    let mut attempts = 0;
                    loop {
                        if snd.flush().is_ok() {
                            break;
                        }
                        thread::sleep(::std::time::Duration::from_millis(100));
                        attempts += 1;
                        assert!(attempts < 10);
                    }
                }) {
                    let builder = thread::Builder::new();
                    if let Ok(rcv_jh) = builder.spawn(move || {
                        let mut rcv_iter = rcv.iter();
                        let mut cur = 0;
                        while cur != total_vals {
                            let mut attempts = 0;
                            loop {
                                if let Some(rcvd) = rcv_iter.next() {
                                    debug_assert_eq!(
                                        cur, rcvd,
                                        "FAILED TO GET ALL IN ORDER: {:?}",
                                        rcvd,
                                    );
                                    cur += 1;
                                    break;
                                } else {
                                    attempts += 1;
                                    assert!(attempts < 10_000);
                                }
                            }
                        }
                    }) {
                        snd_jh.join().expect("snd join failed");
                        rcv_jh.join().expect("rcv join failed");
                    }
                }
            }
        }
        true
    }

This, too, should appear familiar, except that the Receiver thread is now able to check order. Where previously a Vec<u64> was fed into the function, we now stream 0, 1, 2, and so on, up to but not including total_vals, through the hopper queue, asserting on the other side that the values arrive in order and without gaps. Single runs on a single input will not reliably trigger low-probability race issues, but that's not the goal. We're searching for logic goofs.

For instance, an earlier version of this library would happily allow an in-memory maximum number of bytes smaller than the size of a single element T. Another could fit multiple instances of T into the buffer, but if total_vals were odd and the in-memory size were small enough to require disk paging, then the last element of the stream would never be kicked out. In fact, that's still an issue. It's a consequence of the lazy flip to disk mode in the sender: without another element arriving to potentially trigger a disk placement to the in-memory buffer, the write will be flushed to disk but the receiver will never be aware of it. To that end, the sender exposes a flush function, which you see in use in the tests. In practice, in cernan, flushing is unnecessary. But it's a corner of the design that the authors did not expect and might well have had a hard time noticing had this gone out into the wild.
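
The ordering check itself, a cursor that must match each received value, can be sketched apart from hopper. In this sketch a std sync_channel replaces the hopper queue, and first_gap and single_sender_in_order are hypothetical names:

```rust
use std::sync::mpsc;
use std::thread;

/// Checks that `received` yields exactly 0, 1, ..., total_vals - 1 in order.
/// Returns the cursor position at the first mismatch (a reordered value, a
/// short stream, or an extra value), or `None` if the stream is intact.
fn first_gap<I: IntoIterator<Item = usize>>(received: I, total_vals: usize) -> Option<usize> {
    let mut cur = 0;
    for rcvd in received {
        if rcvd != cur {
            return Some(cur);
        }
        cur += 1;
    }
    if cur == total_vals {
        None
    } else {
        Some(cur)
    }
}

/// Streams 0..total_vals from one sender thread through a bounded std
/// channel and checks the order on the receiving side.
fn single_sender_in_order(total_vals: usize, capacity: usize) -> bool {
    let (snd, rcv) = mpsc::sync_channel(capacity);
    let snd_jh = thread::spawn(move || {
        for i in 0..total_vals {
            // Stop quietly if the receiver gave up early.
            if snd.send(i).is_err() {
                return;
            }
        }
    });
    // `first_gap` consumes the receiver, so an early return drops it and
    // unblocks the sender.
    let verdict = first_gap(rcv, total_vals);
    snd_jh.join().expect("snd join failed");
    verdict.is_none()
}
```

Because the expected value is just the cursor, the receiver needs no copy of the input, which is why the single-sender test can take total_vals instead of a Vec<u64>.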

The inner test of our single-sender variant is also used for the repeat-loop variant of hopper testing:

    #[test]
    fn explicit_single_sender_single_rcv_round_trip() {
        let mut loops = 0;
        loop {
            assert!(single_sender_single_rcv_round_trip_exp(8, 8, 5, 10));
            loops += 1;
            if loops > 2_500 {
                break;
            }
            thread::sleep(::std::time::Duration::from_millis(1));
        }
    }

Notice that here the loop runs for only about 2,500 iterations. This is done in deference to the needs of the CI servers, which don't care to run high-CPU-load code for hours at a time. In development, that 2,500 will be adjusted up. But the core idea is apparent: check that a stream of ordered inputs returns through the hopper queue in order and intact, over and over and over again. QuickCheck searches the dark corners of the state space, and more traditional manual testing hammers the same spot to dig into computer indeterminism.
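
One way to make the iteration count easy to adjust between CI and development is to pull the loop into a small helper. The function hammer below is a hypothetical sketch, not part of hopper; it runs any bool-returning experiment repeatedly and reports the first failing iteration:

```rust
use std::thread;
use std::time::Duration;

/// Runs `experiment` up to `iterations` times, pausing between runs.
/// Returns the zero-based index of the first failing run, or `None` if
/// every run passed.
fn hammer<F: FnMut() -> bool>(
    iterations: usize,
    pause: Duration,
    mut experiment: F,
) -> Option<usize> {
    for i in 0..iterations {
        if !experiment() {
            return Some(i);
        }
        thread::sleep(pause);
    }
    None
}
```

Under these assumptions the explicit test would reduce to asserting that hammer(2_500, Duration::from_millis(1), || single_sender_single_rcv_round_trip_exp(8, 8, 5, 10)) returns None, with the first argument raised for local runs.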
