Exposing Concurrency Issues

We want to demonstrate that the worker thread can pull and execute multiple work items from the queue.

c9/7/ThreadPoolTest.cpp
TEST(AThreadPool, ExecutesAllWork) {
  pool.start();
  unsigned int count{0};
  unsigned int NumberOfWorkItems{3};
  condition_variable wasExecuted;
  Work work{[&] {
    std::unique_lock<std::mutex> lock(m);
    ++count;
    wasExecuted.notify_all();
  }};
  for (unsigned int i{0}; i < NumberOfWorkItems; i++)
    pool.add(work);
  unique_lock<mutex> lock(m);
  CHECK_TRUE(wasExecuted.wait_for(lock, chrono::milliseconds(100),
    [&] { return count == NumberOfWorkItems; }));
}
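The test leans on the predicate form of wait_for, which returns the predicate's final value: true if the expected count was reached before the timeout, false otherwise. The following is a minimal standalone sketch of that mechanism, not the book's code. The names allItemsObservedWithin and demoWaitFor are hypothetical, and a plain thread stands in for the pool.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper: a background thread increments a counter `items`
// times; the caller waits until the counter reaches `items` or the
// timeout elapses, and reports which of the two happened.
bool allItemsObservedWithin(unsigned int items,
                            std::chrono::milliseconds timeout) {
  std::mutex m;
  std::condition_variable wasExecuted;
  unsigned int count{0};

  std::thread worker{[&] {
    for (unsigned int i{0}; i < items; i++) {
      std::lock_guard<std::mutex> lock(m);  // protect count
      ++count;
      wasExecuted.notify_all();
    }
  }};

  bool reached;
  {
    std::unique_lock<std::mutex> lock(m);
    // Returns the predicate's value once it is true or the timeout expires.
    reached = wasExecuted.wait_for(lock, timeout,
      [&] { return count == items; });
  }  // release the lock so the worker can finish if we timed out
  worker.join();
  return reached;
}

// Usage sketch: three increments should arrive well inside one second.
void demoWaitFor() {
  assert(allItemsObservedWithin(3, std::chrono::milliseconds(1000)));
}
```

Because the predicate is rechecked under the lock on every wakeup, a notification sent before the wait starts is not lost: the first predicate check already sees the updated count.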

Our implementation introduces a while loop and a boolean flag that tells the loop to stop when the ThreadPool instance destructs.

c9/7/ThreadPool.h
#include <string>
#include <deque>
#include <thread>
#include <memory>
#include <atomic>

#include "Work.h"

class ThreadPool {
public:
  virtual ~ThreadPool() {
    done_ = true;
    if (workThread_)
      workThread_->join();
  }
  // ...
private:
  void worker() {
    while (!done_) {
      while (!hasWork())
        ;
      pullWork().execute();
    }
  }

  std::atomic<bool> done_{false};
  std::deque<Work> workQueue_;
  std::shared_ptr<std::thread> workThread_;
};

Unfortunately...no, fortunately, our lame implementation hangs the test run every time. Consistent failure when dealing with threads is a great step toward a solution. A bit of analysis suggests that once the test completes, the ThreadPool destructor sets the done_ flag to true and then attempts to join the thread. The thread can’t complete because it’s stuck in the while loop that waits for work to be available.

We add a conditional to our wait-for-work loop and also break out of the loop if the done_ flag turns on.

c9/8/ThreadPool.h
void worker() {
  while (!done_) {
    while (!done_ && !hasWork())
      ;
    if (done_) break;
    pullWork().execute();
  }
}
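To see the effect of that extra check in isolation, here is a minimal sketch under simplifying assumptions: atomic counters stand in for the work queue, and the names StoppableWorker, post, executed, and runThenDestroy are hypothetical, not part of the book's ThreadPool. The point is only that a worker whose wait loop also watches the stop flag lets the destructor's join return.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical worker: pending_ counts posted items, executed_ counts
// finished ones. Both are atomic, so no mutex is needed in this sketch.
class StoppableWorker {
public:
  StoppableWorker() : thread_{[this] { run(); }} {}

  ~StoppableWorker() {
    done_ = true;    // ask the loop to stop...
    thread_.join();  // ...and wait for it; returns because run() sees done_
  }

  void post() { ++pending_; }
  int executed() const { return executed_; }

private:
  void run() {
    while (!done_) {
      // Wait for work, but give up as soon as a stop is requested.
      while (!done_ && pending_ == 0)
        std::this_thread::yield();
      if (done_) break;
      --pending_;
      ++executed_;
    }
  }

  // Declared before thread_ so they are initialized before the thread starts.
  std::atomic<bool> done_{false};
  std::atomic<int> pending_{0};
  std::atomic<int> executed_{0};
  std::thread thread_;
};

// Usage sketch: post some items, wait until all have run, then let the
// destructor stop the idle worker.
int runThenDestroy(int items) {
  StoppableWorker worker;
  for (int i{0}; i < items; i++)
    worker.post();
  while (worker.executed() < items)
    std::this_thread::yield();
  return worker.executed();
}

void demoStoppableWorker() {
  assert(runThenDestroy(3) == 3);
  assert(runThenDestroy(0) == 0);  // an idle worker can still be destroyed
}
```

Without the `!done_` test in the inner loop, the second call above would never return, which mirrors the hang we saw in the test run.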

Our tests no longer hang, and they pass on their first run. Once again, however, they fail intermittently. We need to hit things harder from the test so that it fails every time. A quick attempt at bumping up the number of work items added in the loop doesn’t appear to make much difference. Our test needs to add work items from multiple threads created by the test itself.
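As a rough picture of what adding from multiple threads looks like, the sketch below spawns several client threads that each push items onto a shared queue, then joins them and checks the total. It is an illustration, not the test we are about to write: GuardedQueue and addFromManyThreads are hypothetical names, and the queue is protected by a mutex only so that the sketch itself is free of data races (std::deque does not support unsynchronized concurrent modification, which is a plausible source of intermittent failures like ours).

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for a work queue; every access takes the mutex.
class GuardedQueue {
public:
  void add(int item) {
    std::lock_guard<std::mutex> lock(m_);
    items_.push_front(item);
  }

  std::size_t size() {
    std::lock_guard<std::mutex> lock(m_);
    return items_.size();
  }

private:
  std::mutex m_;
  std::deque<int> items_;
};

// Starts threadCount client threads, each adding itemsPerThread items,
// waits for all of them, and returns how many items ended up queued.
std::size_t addFromManyThreads(unsigned int threadCount,
                               unsigned int itemsPerThread) {
  GuardedQueue queue;
  std::vector<std::thread> clients;
  for (unsigned int t{0}; t < threadCount; t++)
    clients.emplace_back([&queue, itemsPerThread] {
      for (unsigned int i{0}; i < itemsPerThread; i++)
        queue.add(static_cast<int>(i));
    });
  for (auto& client : clients)
    client.join();
  return queue.size();
}

// Usage sketch: four clients adding 1000 items each.
void demoManyClients() {
  assert(addFromManyThreads(4, 1000) == 4000u);
}
```

With several clients contending at once, a queue that lacks the guard has many more chances to be caught mid-update, which is what makes this style of test more likely to fail consistently.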

Let’s refactor first. We clean up the two tests that add work to the pool.

c9/9/ThreadPoolTest.cpp
TEST_GROUP(AThreadPool_AddRequest) {
  mutex m;
  ThreadPool pool;
  condition_variable wasExecuted;
  unsigned int count{0};

  void setup() override {
    pool.start();
  }

  void incrementCountAndNotify() {
    std::unique_lock<std::mutex> lock(m);
    ++count;
    wasExecuted.notify_all();
  }

  void waitForCountAndFailOnTimeout(
      unsigned int expectedCount,
      const milliseconds& time=milliseconds(100)) {
    unique_lock<mutex> lock(m);
    CHECK_TRUE(wasExecuted.wait_for(lock, time,
      [&] { return expectedCount == count; }));
  }
};

TEST(AThreadPool_AddRequest, PullsWorkInAThread) {
  Work work{[&] { incrementCountAndNotify(); }};
  unsigned int NumberOfWorkItems{1};

  pool.add(work);

  waitForCountAndFailOnTimeout(NumberOfWorkItems);
}

TEST(AThreadPool_AddRequest, ExecutesAllWork) {
  Work work{[&] { incrementCountAndNotify(); }};
  unsigned int NumberOfWorkItems{3};

  for (unsigned int i{0}; i < NumberOfWorkItems; i++)
    pool.add(work);

  waitForCountAndFailOnTimeout(NumberOfWorkItems);
}

In the ThreadPool class, we extract code from the destructor to a separate method with an intention-revealing name.

c9/9/ThreadPool.h
virtual ~ThreadPool() {
  stop();
}

void stop() {
  done_ = true;
  if (workThread_)
    workThread_->join();
}
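One thing to watch once stop() can be called directly: in the listing as shown, an explicit stop() followed by destruction would call join() a second time, and std::thread::join() throws std::system_error when the thread is no longer joinable. The sketch below shows one possible guard using joinable(). It is an assumption about how such a guard might look, not the book's code, and StoppableThread, isRunning, and stopTwiceIsSafe are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <memory>
#include <thread>

// Hypothetical class that mirrors only the start/stop shape of the pool.
class StoppableThread {
public:
  ~StoppableThread() { stop(); }

  void start() {
    workThread_ = std::make_shared<std::thread>([this] {
      while (!done_)
        std::this_thread::yield();  // placeholder for real work
    });
  }

  void stop() {
    done_ = true;
    // joinable() is false after a successful join, so repeat calls are no-ops.
    if (workThread_ && workThread_->joinable())
      workThread_->join();
  }

  bool isRunning() const { return workThread_ && workThread_->joinable(); }

private:
  std::atomic<bool> done_{false};
  std::shared_ptr<std::thread> workThread_;
};

// Usage sketch: stop twice explicitly, then once more via the destructor.
bool stopTwiceIsSafe() {
  StoppableThread t;
  t.start();
  t.stop();
  t.stop();
  return !t.isRunning();
}

void demoStop() {
  assert(stopTwiceIsSafe());
}
```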