We want to demonstrate that the worker thread can pull and execute multiple work items from the queue.
c9/7/ThreadPoolTest.cpp | |
| TEST(AThreadPool, ExecutesAllWork) { |
| pool.start(); |
| unsigned int count{0}; |
| unsigned int NumberOfWorkItems{3}; |
| condition_variable wasExecuted; |
| Work work{[&] { |
| std::unique_lock<std::mutex> lock(m); |
| ++count; |
| wasExecuted.notify_all(); |
| }}; |
| for (unsigned int i{0}; i < NumberOfWorkItems; i++) |
| pool.add(work); |
| unique_lock<mutex> lock(m); |
| CHECK_TRUE(wasExecuted.wait_for(lock, chrono::milliseconds(100), |
| [&] { return count == NumberOfWorkItems; })); |
| } |
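The test relies on the predicate overload of wait_for, which returns the predicate's final value: true if the count was reached, false if the timeout expired first. A minimal standalone sketch of that behavior follows. The function name countReachedWithin and its parameters are hypothetical, and a plain thread stands in for the pool's worker.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper, not part of the book's code: a background thread
// increments a counter `produced` times while the caller waits for the
// counter to reach `expected`.
bool countReachedWithin(unsigned int produced, unsigned int expected,
                        std::chrono::milliseconds timeout) {
   std::mutex m;
   std::condition_variable wasExecuted;
   unsigned int count{0};

   std::thread worker{[&] {
      for (unsigned int i{0}; i < produced; i++) {
         std::unique_lock<std::mutex> lock(m);
         ++count;
         wasExecuted.notify_all();
      }
   }};

   bool reached{false};
   {
      std::unique_lock<std::mutex> lock(m);
      // The predicate is rechecked after every wakeup; a false result
      // means the timeout expired with the predicate still false.
      reached = wasExecuted.wait_for(lock, timeout,
         [&] { return count == expected; });
   }
   worker.join();
   assert(count == produced);   // safe to read: the worker has finished
   return reached;
}
```

Because the predicate is evaluated before any waiting happens, the call also succeeds when the worker finishes before the waiter acquires the lock.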
Our implementation introduces a while loop and a boolean flag that tells the loop to stop when the ThreadPool instance is destroyed.
c9/7/ThreadPool.h | |
| #include <string> |
| #include <deque> |
| #include <thread> |
| #include <memory> |
* | #include <atomic> |
| #include "Work.h" |
| class ThreadPool { |
| public: |
| virtual ~ThreadPool() { |
* | done_ = true; |
| if (workThread_) |
| workThread_->join(); |
| } |
| // ... |
| private: |
| void worker() { |
* | while (!done_) { |
| while (!hasWork()) |
| ; |
| pullWork().execute(); |
* | } |
| } |
* | std::atomic<bool> done_{false}; |
| std::deque<Work> workQueue_; |
| std::shared_ptr<std::thread> workThread_; |
| }; |
Unfortunately... no, fortunately, our naive implementation hangs the test run every time. When dealing with threads, consistent failure is a great step toward a solution. A bit of analysis shows why: once the test completes, the ThreadPool destructor sets the done_ flag to true and then attempts to join the thread. The thread can’t complete because it’s stuck in the inner while loop, which spins until work is available and never rechecks done_.
We add a done_ check to the condition of our wait-for-work loop, and we also break out of the outer loop once the done_ flag is set.
c9/8/ThreadPool.h | |
| void worker() { |
| while (!done_) { |
* | while (!done_ && !hasWork()) |
* | ; |
* | if (done_) break; |
| pullWork().execute(); |
| } |
| } |
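To see the shutdown path in isolation, here is a minimal sketch of a pool whose queue is always empty. The class name IdlePool, the exited_ flag, and the workerExited accessor are hypothetical additions for illustration. The joinable() guard is also an assumption, added so that calling stop() more than once is harmless.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical stand-in for ThreadPool, reduced to the shutdown path.
class IdlePool {
public:
   ~IdlePool() { stop(); }

   void start() {
      assert(!worker_.joinable());   // sketch supports a single start()
      worker_ = std::thread{[this] { worker(); }};
   }

   void stop() {
      done_ = true;
      if (worker_.joinable())   // guard: a second stop() does nothing
         worker_.join();
   }

   bool workerExited() const { return exited_; }

private:
   void worker() {
      while (!done_) {
         // Spin until work arrives or shutdown is requested.
         while (!done_ && !hasWork())
            ;
         if (done_) break;
         // pullWork().execute() would run here
      }
      exited_ = true;
   }

   bool hasWork() const { return false; }   // assumption: queue stays empty

   std::atomic<bool> done_{false};
   std::atomic<bool> exited_{false};
   std::thread worker_;
};
```

Without the done_ check in the inner loop, stop() would block forever in join(), which is the hang the previous version exhibited.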
Our tests no longer hang, and they pass on their first run. On later runs, however, they once again fail intermittently. We need to hit the pool harder from the test so that it fails every time. A quick attempt at bumping up the number of work items added in the test’s loop doesn’t appear to make much difference. Our test needs to add work items from multiple threads created by the test itself.
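One way to drive such a test is a small helper that spawns several client threads, has each one add work repeatedly, and joins them all. The sketch below shows only that driver. The names addFromMultipleThreads and countAddsFrom are hypothetical, and an atomic counter stands in for pool.add(work) so that the sketch itself is free of data races.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical test helper, not from the book: calls addWork
// itemsPerThread times from each of threadCount client threads,
// then waits for every client thread to finish.
void addFromMultipleThreads(unsigned int threadCount,
                            unsigned int itemsPerThread,
                            const std::function<void()>& addWork) {
   assert(addWork);   // an empty std::function would throw when called
   std::vector<std::thread> clients;
   for (unsigned int t{0}; t < threadCount; t++)
      clients.emplace_back([&] {
         for (unsigned int i{0}; i < itemsPerThread; i++)
            addWork();
      });
   for (auto& client : clients)
      client.join();
}

// Stand-in for adding to a pool: an atomic counter records each call.
unsigned int countAddsFrom(unsigned int threadCount,
                           unsigned int itemsPerThread) {
   std::atomic<unsigned int> adds{0};
   addFromMultipleThreads(threadCount, itemsPerThread, [&] { ++adds; });
   return adds;
}
```

In a real test, the callable would presumably wrap pool.add(work), and the test would then wait for the executed count to reach threadCount times itemsPerThread.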
Let’s refactor first. We clean up the two tests that add work to the pool.
c9/9/ThreadPoolTest.cpp | |
| TEST_GROUP(AThreadPool_AddRequest) { |
| mutex m; |
| ThreadPool pool; |
| condition_variable wasExecuted; |
| unsigned int count{0}; |
| void setup() override { |
| pool.start(); |
| } |
| |
| void incrementCountAndNotify() { |
| std::unique_lock<std::mutex> lock(m); |
| ++count; |
| wasExecuted.notify_all(); |
| } |
| |
| void waitForCountAndFailOnTimeout( |
| unsigned int expectedCount, |
| const milliseconds& time=milliseconds(100)) { |
| unique_lock<mutex> lock(m); |
| CHECK_TRUE(wasExecuted.wait_for(lock, time, |
| [&] { return expectedCount == count; })); |
| } |
| }; |
| |
| TEST(AThreadPool_AddRequest, PullsWorkInAThread) { |
| Work work{[&] { incrementCountAndNotify(); }}; |
| unsigned int NumberOfWorkItems{1}; |
| |
| pool.add(work); |
| waitForCountAndFailOnTimeout(NumberOfWorkItems); |
| } |
| |
| TEST(AThreadPool_AddRequest, ExecutesAllWork) { |
| Work work{[&] { incrementCountAndNotify(); }}; |
| unsigned int NumberOfWorkItems{3}; |
| |
| for (unsigned int i{0}; i < NumberOfWorkItems; i++) |
| pool.add(work); |
| |
| waitForCountAndFailOnTimeout(NumberOfWorkItems); |
| } |
In the ThreadPool class, we extract code from the destructor to a separate method with an intention-revealing name.
c9/9/ThreadPool.h | |
| virtual ~ThreadPool() { |
* | stop(); |
| } |
| |
* | void stop() { |
* | done_ = true; |
* | if (workThread_) |
* | workThread_->join(); |
* | } |