Creating Client Threads in the Test

Our hypothesis this time is that the spurious failures are caused by data contention around the work queue. The main thread adds to the work queue, the pullWork function removes from the queue, and the worker often asks whether there’s work available in the queue.

Our tests aren’t simply failing; they are generating segmentation faults. Concurrent modification of the work queue is the likely suspect. As a first step toward a fix, we write a test that consistently reproduces the failure.

c9/10/ThreadPoolTest.cpp

TEST_GROUP(AThreadPool_AddRequest) {
   mutex m;
   ThreadPool pool;
   condition_variable wasExecuted;
   unsigned int count{0};
   vector<shared_ptr<thread>> threads;

   void setup() override {
      pool.start();
   }

   void teardown() override {
      for (auto& t: threads) t->join();
   }
   // ...
};
 
// ...

TEST(AThreadPool_AddRequest, HoldsUpUnderClientStress) {
   Work work{[&] { incrementCountAndNotify(); }};
   unsigned int NumberOfWorkItems{10};
   unsigned int NumberOfThreads{10};

   for (unsigned int i{0}; i < NumberOfThreads; i++)
      threads.push_back(
         make_shared<thread>([&] {
            for (unsigned int j{0}; j < NumberOfWorkItems; j++)
               pool.add(work);
         }));

   waitForCountAndFailOnTimeout(NumberOfThreads * NumberOfWorkItems);
}

Our test creates an arbitrary number of work-request threads in a for loop. It stores each thread in a vector so that the test can join each one on completion; this cleanup code lives in the test group’s teardown function.

With NumberOfThreads and NumberOfWorkItems set to one each, we see the same intermittent failure. We experiment with a few combinations until we discover that ten threads sending ten requests each consistently generates a seg fault. Yay!

We add lock guards to each of hasWork, add, and pullWork. These guards use a mutex object (mutex_) to prevent concurrent access to code in the scope following the lock guard.

c9/11/ThreadPool.h

#include <string>
#include <deque>
#include <thread>
#include <memory>
#include <atomic>
#include <mutex>

#include "Work.h"

class ThreadPool {
public:
   // ...
   bool hasWork() {
      std::lock_guard<std::mutex> block(mutex_);
      return !workQueue_.empty();
   }

   void add(Work work) {
      std::lock_guard<std::mutex> block(mutex_);
      workQueue_.push_front(work);
   }

   Work pullWork() {
      std::lock_guard<std::mutex> block(mutex_);

      auto work = workQueue_.back();
      workQueue_.pop_back();
      return work;
   }
   // ...
   std::atomic<bool> done_{false};
   std::deque<Work> workQueue_;
   std::shared_ptr<std::thread> workThread_;
   std::mutex mutex_;
};

The tests pass with no problems. We bump the number of threads up to 200, and things still look good.
