Creating Multiple Threads in the ThreadPool

Is that it? Can we ever know whether we’ve addressed all concurrency holes? To find the remaining problems, one analysis tactic is to look for “gaps”: places where we assume a fact that might no longer be true because of the actions of other obstreperous threads.

The worker function seems to remain the only code with any such potential. In worker, we loop until there is work; each time through the loop establishes and immediately releases a lock within hasWork. Once there is work, the loop exits, and control falls through to the statement pullWork().execute(). What if, during this short span, another thread has grabbed work?
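To make that gap concrete, here is a hypothetical, stripped-down stand-in for the code in question. The names mirror our ThreadPool, but this sketch is not the book’s listing; it exists only to show where the window opens.

```cpp
#include <deque>
#include <functional>
#include <mutex>

// Hypothetical sketch: a minimal pool-like class illustrating the race
// window between hasWork() and pullWork() described above.
class PoolSketch {
public:
   void add(std::function<void()> work) {
      std::lock_guard<std::mutex> block(mutex_);
      workQueue_.push_front(work);
   }

   bool hasWork() {
      std::lock_guard<std::mutex> block(mutex_); // lock held only briefly...
      return !workQueue_.empty();
   }

   std::function<void()> pullWork() {
      std::lock_guard<std::mutex> block(mutex_); // ...and re-acquired here
      auto work = workQueue_.back();             // undefined behavior if empty!
      workQueue_.pop_back();
      return work;
   }

   void worker() {
      while (!hasWork())
         ;
      // <-- the gap: another thread may empty the queue right here
      pullWork()();
   }

private:
   std::deque<std::function<void()>> workQueue_;
   std::mutex mutex_;
};
```

With a single worker thread, the gap is harmless: nothing else ever drains the queue between the two calls.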

Our ThreadPool currently manages only a single thread, meaning that the worker function pulls and executes work one by one, with no such chance for a concurrency problem. Let’s get our ThreadPool class to live up to its name and provide support for a pool of threads, not just one.

c9/12/ThreadPoolTest.cpp

TEST(AThreadPoolWithMultipleThreads, DispatchesWorkToMultipleThreads) {
   unsigned int numberOfThreads{2};
   pool.start(numberOfThreads);
   Work work{[&] {
      addThreadIfUnique(this_thread::get_id());
      incrementCountAndNotify();
   }};
   unsigned int NumberOfWorkItems{500};

   for (unsigned int i{0}; i < NumberOfWorkItems; i++)
      pool.add(work);

   waitForCountAndFailOnTimeout(NumberOfWorkItems);
   LONGS_EQUAL(numberOfThreads, numberOfThreadsProcessed());
}

The test DispatchesWorkToMultipleThreads demonstrates that client code can now start a specified number of threads. To verify that the ThreadPool indeed processes work in separate threads, we update our work callback to record the executing thread’s ID if it is unique. Our assertion compares the number of threads we specified to the number of unique threads that processed work.

(Unfortunately, this test has the potential to fail on the rare occasion that one of the threads processes all of the work items. The exercise of eliminating this potential for sporadic failure is left to the reader.)
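The helpers the test calls (addThreadIfUnique, incrementCountAndNotify, waitForCountAndFailOnTimeout, and numberOfThreadsProcessed) live in the test fixture, which the listing omits. A plausible sketch of such a fixture, assuming a mutex-guarded set of thread IDs and a condition variable, might look like this; the actual fixture in c9/12/ThreadPoolTest.cpp may differ in detail.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <set>
#include <thread>

// Hypothetical fixture sketch for the multiple-threads tests.
struct MultipleThreadsFixture {
   std::set<std::thread::id> threads;
   std::mutex mutex;
   std::condition_variable wasExecuted;
   unsigned int count{0};

   void addThreadIfUnique(const std::thread::id& id) {
      std::lock_guard<std::mutex> block(mutex);
      threads.insert(id); // a set silently ignores duplicate IDs
   }

   size_t numberOfThreadsProcessed() {
      std::lock_guard<std::mutex> block(mutex);
      return threads.size();
   }

   void incrementCountAndNotify() {
      std::lock_guard<std::mutex> block(mutex);
      ++count;
      wasExecuted.notify_all();
   }

   // Returns false if count never reaches expected within the timeout;
   // the real fixture would turn that into a test failure.
   bool waitForCountAndFailOnTimeout(
         unsigned int expected,
         std::chrono::milliseconds timeout = std::chrono::milliseconds{100}) {
      std::unique_lock<std::mutex> lock(mutex);
      return wasExecuted.wait_for(lock, timeout,
            [&] { return count == expected; });
   }
};
```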

Changing ThreadPool to support spawning a specified number of threads requires little more than managing a vector of thread objects.

c9/12/ThreadPool.h

#include <string>
#include <deque>
#include <thread>
#include <memory>
#include <atomic>
#include <mutex>
#include <vector>

#include "Work.h"

class ThreadPool {
public:
   // ...
   void stop() {
      done_ = true;
      for (auto& thread: threads_) thread.join();
   }

   void start(unsigned int numberOfThreads=1) {
      for (unsigned int i{0u}; i < numberOfThreads; i++)
         threads_.push_back(std::thread(&ThreadPool::worker, this));
   }

   // ...

private:
   // ...
   std::atomic<bool> done_{false};
   std::deque<Work> workQueue_;
   std::shared_ptr<std::thread> workThread_;
   std::mutex mutex_;
   std::vector<std::thread> threads_;
};

Our test fails consistently. Given our suspicion around the worker function, we add a line of code to handle the case where work is no longer available (in other words, where another thread picked it up).

c9/12/ThreadPool.h

Work pullWork() {
   std::lock_guard<std::mutex> block(mutex_);

   if (workQueue_.empty()) return Work{};

   auto work = workQueue_.back();
   workQueue_.pop_back();
   return work;
}

Tests pass again, so things are looking up. We know that our fix directly addressed the concurrency issue at hand.
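Note that this fix depends on a default-constructed Work being safe to execute: when pullWork returns Work{}, the worker will still call execute() on it. A sketch of what Work.h presumably provides, assuming the default execute function is a no-op (the actual Work class may differ):

```cpp
#include <functional>

// Hypothetical sketch of the Work class assumed by pullWork's fix.
class Work {
public:
   static const int DefaultId{0};

   // Default-constructed Work carries a no-op executor, so executing
   // the Work{} returned from an empty queue is harmless.
   Work(int id = DefaultId)
      : id_{id}, executeFunction_{[] {}} {}

   Work(std::function<void()> executeFunction, int id = DefaultId)
      : id_{id}, executeFunction_{executeFunction} {}

   void execute() { executeFunction_(); }
   int id() const { return id_; }

private:
   int id_;
   std::function<void()> executeFunction_;
};
```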

The reactionary response to concurrency challenges is to throw synchronization at the problem. The end result can be much slower than it needs to be. In contrast, a test-driven approach helps ensure that you synchronize only elements that need it.
