Our hypothesis this time is that the spurious failures are caused by a data race on the work queue. The main thread adds to the work queue, pullWork removes from it, and the worker thread frequently asks whether any work is available. Our tests aren't simply failing; they're generating segmentation faults, and unsynchronized concurrent modification of the work queue is the likely suspect. As a first step toward remediation, we write a test that consistently reproduces the failure.
c9/10/ThreadPoolTest.cpp | |
| TEST_GROUP(AThreadPool_AddRequest) { |
| mutex m; |
| ThreadPool pool; |
| condition_variable wasExecuted; |
| unsigned int count{0}; |
| |
* | vector<shared_ptr<thread>> threads; |
| |
| void setup() override { |
| pool.start(); |
| } |
| |
* | void teardown() override { |
* | for (auto& t: threads) t->join(); |
* | } |
* | // ... |
| }; |
| // ... |
| TEST(AThreadPool_AddRequest, HoldsUpUnderClientStress) { |
| Work work{[&] { incrementCountAndNotify(); }}; |
| unsigned int NumberOfWorkItems{10}; |
| unsigned int NumberOfThreads{10}; |
| |
| for (unsigned int i{0}; i < NumberOfThreads; i++) |
| threads.push_back( |
| make_shared<thread>([&] { |
| for (unsigned int j{0}; j < NumberOfWorkItems; j++) |
| pool.add(work); |
| })); |
| waitForCountAndFailOnTimeout(NumberOfThreads * NumberOfWorkItems); |
| } |
Our test creates an arbitrary number of work-request threads in a for loop. It stores each thread in a vector so that the test can join each one upon completion; this cleanup code lives in the test group's teardown function.
With NumberOfThreads and NumberOfWorkItems set to one each, we see the same intermittent failure. We experiment with a few combinations until we discover that ten threads sending ten requests each consistently generates a seg fault. Yay!
We add lock guards to each of hasWork, add, and pullWork. Each guard locks a mutex member (mutex_) on construction and releases it on destruction, preventing concurrent access to the code in the scope that follows.
c9/11/ThreadPool.h | |
| #include <string> |
| #include <deque> |
| #include <thread> |
| #include <memory> |
| #include <atomic> |
* | #include <mutex> |
| |
| #include "Work.h" |
| |
| class ThreadPool { |
| public: |
| // ... |
| bool hasWork() { |
* | std::lock_guard<std::mutex> block(mutex_); |
| return !workQueue_.empty(); |
| } |
| |
| void add(Work work) { |
* | std::lock_guard<std::mutex> block(mutex_); |
| workQueue_.push_front(work); |
| } |
| |
| Work pullWork() { |
* | std::lock_guard<std::mutex> block(mutex_); |
| |
| auto work = workQueue_.back(); |
| workQueue_.pop_back(); |
| return work; |
| } |
| // ... |
| std::atomic<bool> done_{false}; |
| std::deque<Work> workQueue_; |
| std::shared_ptr<std::thread> workThread_; |
* | std::mutex mutex_; |
| }; |
The tests pass with no problems. We bump the number of threads up to 200, and things still look good.