Looking into thread pool

Let's look into threadpool and understand its implementation. Hopefully by this point in the book, you have a sense of how you'd go about building your own thread-pooling library. Consider that for a moment, before we continue, and see how well your idea stacks up against this particular approach. We'll inspect the library (https://crates.io/crates/threadpool) at SHA a982e060ea2e3e85113982656014b212c5b88ba2.

The thread pool crate is not listed in its entirety. You can find the full listing in the book's source repository. 

Let's look first at the project's Cargo.toml:

[package]

name = "threadpool"
version = "1.7.1"
authors = ["The Rust Project Developers", "Corey Farwell <[email protected]>", "Stefan Schindler <[email protected]>"]
license = "MIT/Apache-2.0"
readme = "README.md"
repository = "https://github.com/rust-threadpool/rust-threadpool"
homepage = "https://github.com/rust-threadpool/rust-threadpool"
documentation = "https://docs.rs/threadpool"
description = """
A thread pool for running a number of jobs on a fixed set of worker threads.
"""
keywords = ["threadpool", "thread", "pool", "threading", "parallelism"]
categories = ["concurrency", "os"]

[dependencies]
num_cpus = "1.6"

Fairly minimal. The only dependency is num_cpus, a little library to determine the number of logical and physical cores available on the machine. On linux, this reads /proc/cpuinfo. On other operating systems, such as the BSDs or Windows, the library makes system calls. It's a clever little library and well worth reading if you need to learn how to target distinct function implementations across OSes. The key thing to take away from the threadpool's Cargo.toml is that it is almost entirely built from the tools available in the standard library. In fact, there's only a single implementation file in the library, src/lib.rs. All the code we'll discuss from this point can be found in that file.

Now, let's understand the builder pattern we saw in the previous section. The Builder type is defined like so:

#[derive(Clone, Default)]
pub struct Builder {
    num_threads: Option<usize>,
    thread_name: Option<String>,
    thread_stack_size: Option<usize>,
}

We only populated num_threads in the previous section. thread_stack_size is used to control the stack size of pool threads. As of writing, thread stack sizes are by default two megabytes. Standard library's std::thread::Builder::stack_size allows us to manually set this value. We could have, for instance, in our thread-per-connection example, set the stack size significantly lower, netting us more threads on the same hardware. After all, each thread allocated very little storage, especially if we had taken steps to read only into a fixed buffer. The thread_name field, like the stack size, is a toggle for std::thread::Builder::name. That is, it allows the user to set the thread name for all threads in the pool, a name they will all share. In my experience, naming threads is a relatively rare thing to do, especially in a pool, but it can sometimes be useful for logging purposes.

The pool builder works mostly as you'd expect, with the public functions setting the fields in the Builder struct. Where the implementation gets interesting is Builder::build:

    pub fn build(self) -> ThreadPool {
        let (tx, rx) = channel::<Thunk<'static>>();

        let num_threads = self.num_threads.unwrap_or_else(num_cpus::get);

        let shared_data = Arc::new(ThreadPoolSharedData {
            name: self.thread_name,
            job_receiver: Mutex::new(rx),
            empty_condvar: Condvar::new(),
            empty_trigger: Mutex::new(()),
            join_generation: AtomicUsize::new(0),
            queued_count: AtomicUsize::new(0),
            active_count: AtomicUsize::new(0),
            max_thread_count: AtomicUsize::new(num_threads),
            panic_count: AtomicUsize::new(0),
            stack_size: self.thread_stack_size,
        });

        // Threadpool threads
        for _ in 0..num_threads {
            spawn_in_pool(shared_data.clone());
        }

        ThreadPool {
            jobs: tx,
            shared_data: shared_data,
        }
    }

There's a lot going on here. Let's take it a bit at a time. First, that channel is the std::sync::mpsc::channel we discussed at length in Chapter 4, Sync and Send – the Foundation of Rust Concurrency. What is unfamiliar there is Thunk. Turns out, it's a type synonym:

type Thunk<'a> = Box<FnBox + Send + 'a>;

Now, what is FnBox? It's a small trait:

trait FnBox {
    fn call_box(self: Box<Self>);
}

impl<F: FnOnce()> FnBox for F {
    fn call_box(self: Box<F>) {
        (*self)()
    }
}

It is the FnOnce trait we encountered in Chapter 3, The Rust Memory Model  Ownership, References and Manipulation, so if you read that chapter, you know that F will only be called once. The boxing of the function closure means its captured variables are available on the heap and don't get deallocated when the pool caller moves the closure out of scope. Great. Now, let's jump back to build and look at shared_data:

        let shared_data = Arc::new(ThreadPoolSharedData {
            name: self.thread_name,
            job_receiver: Mutex::new(rx),
            empty_condvar: Condvar::new(),
            empty_trigger: Mutex::new(()),
            join_generation: AtomicUsize::new(0),
            queued_count: AtomicUsize::new(0),
            active_count: AtomicUsize::new(0),
            max_thread_count: AtomicUsize::new(num_threads),
            panic_count: AtomicUsize::new(0),
            stack_size: self.thread_stack_size,
        });

Alright, several atomic usizes, a condvar, a mutex to protect the receiver side of the thunk channel, and a mutex that'll be paired with the condvar. There's a fair bit you can tell from reading the population of a struct, with a little bit of background information. The implementation of ThreadPoolSharedData is brief:

impl ThreadPoolSharedData {
    fn has_work(&self) -> bool {
        self.queued_count.load(Ordering::SeqCst) > 0 || 
self.active_count.load(Ordering::SeqCst) > 0 } /// Notify all observers joining this pool if there
/// is no more work to do. fn no_work_notify_all(&self) { if !self.has_work() { *self.empty_trigger .lock() .expect("Unable to notify all joining threads"); self.empty_condvar.notify_all(); } } }

The has_work function does a sequentially consistent read of the two indicators of work, an operation that is not exactly cheap considering the two sequentially consistent reads, but implies a need for accuracy in the response. The no_work_notify_all function is more complicated and mysterious. Happily, the function is used in the implementation of the next chunk of Build::build and will help clear up that mystery. The next chunk of build is:

        for _ in 0..num_threads {
            spawn_in_pool(shared_data.clone());
        }

        ThreadPool {
            jobs: tx,
            shared_data: shared_data,
        }
    }
}

For each of the num_threads, the spawn_in_pool function is called over a clone of the Arced ThreadPoolSharedData. Let's inspect spawn_in_pool:

fn spawn_in_pool(shared_data: Arc<ThreadPoolSharedData>) {
    let mut builder = thread::Builder::new();
    if let Some(ref name) = shared_data.name {
        builder = builder.name(name.clone());
    }
    if let Some(ref stack_size) = shared_data.stack_size {
        builder = builder.stack_size(stack_size.to_owned());
    }

As you might have expected, the function creates std::thread::Builder and pulls references to the name and stack size embedded in the ThreadPoolSharedData shared data. With these variables handy, the builder.spawn is called, passing in a closure in the usual fashion:

    builder
        .spawn(move || {
            // Will spawn a new thread on panic unless it is cancelled.
            let sentinel = Sentinel::new(&shared_data);

Well, let's take a look at Sentinel:

struct Sentinel<'a> {
    shared_data: &'a Arc<ThreadPoolSharedData>,
    active: bool,
}

It holds a reference to Arc<ThreadPoolSharedData>—avoiding increasing the reference counter—and an active boolean. The implementation is likewise brief:

impl<'a> Sentinel<'a> {
    fn new(shared_data: &'a Arc<ThreadPoolSharedData>) -> Sentinel<'a> {
        Sentinel {
            shared_data: shared_data,
            active: true,
        }
    }

    /// Cancel and destroy this sentinel.
    fn cancel(mut self) {
        self.active = false;
    }
}

The real key here is the Drop implementation:

impl<'a> Drop for Sentinel<'a> {
    fn drop(&mut self) {
        if self.active {
            self.shared_data.active_count.fetch_sub(1, Ordering::SeqCst);
            if thread::panicking() {
                self.shared_data.panic_count
.fetch_add(1, Ordering::SeqCst); } self.shared_data.no_work_notify_all(); spawn_in_pool(self.shared_data.clone()) } } }

Recall how in the previous section, our join vector grew without bound, even though threads in that vector had panicked and were no longer working. There is an interface available to determine whether the current thread is in a panicked state, std::thread::panicking(), in use here.

When a thread panics, it drops its storage, which, in this pool, includes the allocated Sentinel.  drop then checks the active flag on the Sentinel, decrements the active_count of the ThreadPoolSharedData, increases its panic_count, calls the as-yet mysterious no_work_notify_all and then adds an additional thread to the pool. In this way, the pool maintains its appropriate size and there is no need for any additional monitoring of threads to determine when they need to be recycled: the type system does all the work.

Let's hop back into spawn_in_pool:

            loop {
                // Shutdown this thread if the pool has become smaller
                let thread_counter_val = shared_data
.active_count
.load(Ordering::Acquire); let max_thread_count_val = shared_data
.max_thread_count
.load(Ordering::Relaxed); if thread_counter_val >= max_thread_count_val { break; }

Here, we see the start of the infinite loop of the builder.spawn function, plus a check to shut down threads if the pool size has decreased since the last iteration of the loop. ThreadPool exposes the set_num_threads function to allow changes to the pool's size at runtime. Now, why an infinite loop? Spawning a new thread is not an entirely fast operation on some systems and, besides, it's not a free operation. If you can avoid the cost, you may as well. Some pool implementations in other languages spawn a new thread for every bit of work that comes in, fearing memory pollution. This is less a problem in Rust, where unsafe memory access has to be done intentionally and FnBox is effectively a trap for such behavior anyway, owing to the fact that the closure will have no pointers to the pool's private memory. The loop exists to pull Thunks from the receiver channel side in ThreadPoolSharedData:

                let message = {
                    // Only lock jobs for the time it takes
                    // to get a job, not run it.
                    let lock = shared_data
                        .job_receiver
                        .lock()
                        .expect("Worker thread unable to 
lock job_receiver"); lock.recv() };

The message may be an error, implying that the ThreadPool was dropped, closing the sender channel side. But, should the message be Ok, we'll have our FnBox to call:

                let job = match message {
                    Ok(job) => job,
                    // The ThreadPool was dropped.
                    Err(..) => break,
                };

The final bit of spawn_in_pool is uneventful:

                shared_data.active_count.fetch_add(1, Ordering::SeqCst);
                shared_data.queued_count.fetch_sub(1, Ordering::SeqCst);

                job.call_box();

                shared_data.active_count.fetch_sub(1, Ordering::SeqCst);
                shared_data.no_work_notify_all();
            }

            sentinel.cancel();
        })
        .unwrap();
}

The FnBox called job is called via call_box and if this panics, killing the thread, the Sentinel cleans up the atomic references as appropriate and starts a new thread in the pool. By leaning on Rust's type system and memory model, we get a cheap thread-pool implementation that spawns threads only when needed, with no fears of accidentally polluting memory between jobs.

ThreadPool::execute is a quick boxing of FnOnce, pushed into the sender side of the Thunk channel:

    pub fn execute<F>(&self, job: F)
    where
        F: FnOnce() + Send + 'static,
    {
        self.shared_data.queued_count.fetch_add(1, Ordering::SeqCst);
        self.jobs
            .send(Box::new(job))
            .expect("ThreadPool::execute unable to send job into queue.");
    }

The last piece here is ThreadPool::join. This is where ThreadPoolSharedData::no_work_notify_all comes into focus. Let's look at join:

    pub fn join(&self) {
        // fast path requires no mutex
        if self.shared_data.has_work() == false {
            return ();
        }

        let generation = self.shared_data
.join_generation
.load(Ordering::SeqCst); let mut lock = self.shared_data.empty_trigger.lock().unwrap(); while generation == self.shared_data
.join_generation
.load(Ordering::Relaxed) && self.shared_data.has_work() { lock = self.shared_data.empty_condvar.wait(lock).unwrap(); } // increase generation if we are the first thread to
// come out of the loop self.shared_data.join_generation.compare_and_swap( generation, generation.wrapping_add(1), Ordering::SeqCst, ); }

The function calls first to has_work, bailing out early if there are no active or queued threads in the pool. No reason to block the caller there. Then, generation is set up to act as a condition variable in the loop surrounding empty_condvar. Every thread that joins to the pool checks that the pool generation has not shifted, implying some other thread has unjoined, and that there's work yet to do. Recall that it's no_work_notify_all that calls notify_all on condvar, this function in turn being called when either a Sentinel drops or the inner-loop of spawn_in_pool returns from a job. Any joined thread waking on those two conditions—a job being completed or crashing—checks their condition, incrementing the generation on their the way to becoming unjoined.

That's it! That's a thread pool, a thing built out of the pieces we've discussed so far in this book. If you wanted to make a thread pool without queuing, you'd push an additional check into execute. Some of the sequentially consistent atomic operations could likely be relaxed, as well, as potentially the consequence of making a simple implementation more challenging to reason with. It's potentially worth it for the performance gain if your executed jobs are very brief. In fact, we'll discuss a library later in this chapter that ships an alternative thread pool implementation for just such a use case, though it is quite a bit more complex than the one we've just discussed.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.224.30.19