Back to the GeoServer

Now that we’ve designed and implemented a ThreadPool, let’s take advantage of it. The first step is to change usersInBox to take a listener, or callback, as an argument. We update its code to return User objects to the client via the callback so that they can be asynchronously gathered.

The listener implementation in our test simply tracks users passed to its updated callback.

c9/13/GeoServerTest.cpp
 
TEST(AGeoServer_UsersInBox, AnswersUsersInSpecifiedRange) {
*
class​ GeoServerUserTrackingListener: ​public​ GeoServerListener {
*
public​:
*
void​ updated(​const​ User& user) { Users.push_back(user); }
*
vector<User> Users;
*
} trackingListener;
 
 
server.updateLocation(
 
bUser, Location{aUserLocation.go(Width / 2 - TenMeters, East)});
 
*
server.usersInBox(aUser, Width, Height, &trackingListener);
 
*
CHECK_EQUAL(vector<​string​> { bUser }, UserNames(trackingListener.Users));
 
}
c9/13/GeoServer.h
 
class​ GeoServerListener {
 
public​:
 
virtual​ ​void​ updated(​const​ User& user)=0;
 
};
 
 
class​ GeoServer {
 
public​:
 
// ...
 
std::vector<User> usersInBox(
 
const​ std::​string​& user, ​double​ widthInMeters, ​double​ heightInMeters,
 
GeoServerListener* listener=nullptr) ​const​;
 
// ...
 
};
c9/13/GeoServer.cpp
 
vector<User> GeoServer::usersInBox(
 
const​ ​string​& user, ​double​ widthInMeters, ​double​ heightInMeters,
 
GeoServerListener* listener) ​const​ {
 
auto​ location = locations_.find(user)->second;
 
Area box { location, widthInMeters, heightInMeters };
 
 
vector<User> users;
 
for​ (​auto​& each: locations_)
 
if​ (isDifferentUserInBounds(each, user, box)) {
 
users.push_back(User{each.first, each.second});
*
if​ (listener)
*
listener->updated(User{each.first, each.second});
 
}
 
return​ users;
 
}

As always, we seek incremental change, leaving in place the logic that directly returns a vector of users. This allows us to prove our idea before wasting a lot of time applying a similar implementation to other tests.

We update AnswersOnlyUsersWithinSpecifiedRange, as well as the slow, ignored test HandlesLargeNumbersOfUsers. We factor the common declaration of the GeoServerUserTrackingListener class into the test group. We remove any code that supports the old interest of returning the vector of users directly. Finally, we change usersInBox to assume the existence of a valid GeoServerListener pointer. Refer to code/c9/14 in the source distribution for the cleaned-up code.

The GeoServer tests AnswersUsersInSpecifiedRange and AnswersOnlyUsersWithinSpecifiedRange must still work. But if we use a ThreadPool, we’ll need to introduce waits in our tests, like the ones we coded in ThreadPoolTest. Instead, we choose to introduce a test double that reduces the ThreadPool to a single-threaded implementation of the add function.

c9/15/GeoServerTest.cpp
 
TEST_GROUP(AGeoServer_UsersInBox) {
 
GeoServer server;
 
// ...
 
class​ SingleThreadedPool: ​public​ ThreadPool {
 
public​:
 
virtual​ ​void​ add(Work work) override { work.execute(); }
 
};
 
shared_ptr<ThreadPool> pool;
 
void​ setup() override {
 
pool = make_shared<SingleThreadedPool>();
 
server.useThreadPool(pool);
 
// ...
 
}
 
// ...
 
};
 
TEST(AGeoServer_UsersInBox, AnswersUsersInSpecifiedRange) {
 
pool->start(0);
 
server.updateLocation(
 
bUser, Location{aUserLocation.go(Width / 2 - TenMeters, East)});
 
server.usersInBox(aUser, Width, Height, &trackingListener);
 
CHECK_EQUAL(vector<​string​> { bUser }, UserNames(trackingListener.Users));
 
}

We make ThreadPool’s add function virtual in order to allow the override.

Our test explicitly shows the code to start the pool, since it portrays a design choice—that it’s the client’s responsibility to start the pool. (This important piece of protocol is best described in a discrete test that you get to write.)

c9/15/GeoServer.h
 
class​ GeoServer {
 
public​:
 
// ...
 
void​ useThreadPool(std::shared_ptr<ThreadPool> pool);
 
// ...
 
};
c9/15/GeoServer.cpp
 
void​ GeoServer::usersInBox(
 
const​ ​string​& user, ​double​ widthInMeters, ​double​ heightInMeters,
 
GeoServerListener* listener) ​const​ {
 
auto​ location = locations_.find(user)->second;
 
Area box { location, widthInMeters, heightInMeters };
 
for​ (​auto​& each: locations_) {
 
Work work{[&] {
 
if​ (isDifferentUserInBounds(each, user, box))
 
listener->updated(User{each.first, each.second});
 
}};
*
pool_->add(work);
 
}
 
}
 
*
void​ GeoServer::useThreadPool(std::shared_ptr<ThreadPool> pool) {
*
pool_ = pool;
*
}

Do we need to write a test that interacts with a multithreaded pool? For purposes of test-driving or plain ol’ unit testing, no! We’ve demonstrated that a ThreadPool can take on work and dispatch it to different threads. We’ve demonstrated that the GeoServer logic to determine the users within a rectangle works correctly. And we’ve demonstrated that the GeoServer logic sends the work to the ThreadPool.

Any further test would be of another sort, and thus we write it only if we need it. Since our interest in using threading was to determine whether we could get immediate response from usersInBox and have locations returned asynchronously, we do want a test.

We add a new test, similar to HandlesLargeNumbersOfUsers, but one that kicks off usersInBox in a separate thread and uses the main thread to wait for all callbacks. We’ll want this test in our slow suite.

c9/17/GeoServerTest.cpp
 
TEST_GROUP_BASE(AGeoServer_ScaleTests, GeoServerUsersInBoxTests) {
 
class​ GeoServerCountingListener: ​public​ GeoServerListener {
 
public​:
 
void​ updated(​const​ User& user) override {
 
unique_lock<std::mutex> lock(mutex_);
 
Count++;
 
wasExecuted_.notify_all();
 
}
 
 
void​ waitForCountAndFailOnTimeout(​unsigned​ ​int​ expectedCount,
 
const​ milliseconds& time=milliseconds(10000)) {
 
unique_lock<mutex> lock(mutex_);
 
CHECK_TRUE(wasExecuted_.wait_for(lock, time, [&]
 
{ ​return​ expectedCount == Count; }));
 
}
 
condition_variable wasExecuted_;
 
unsigned​ ​int​ Count{0};
 
mutex mutex_;
 
};
 
GeoServerCountingListener countingListener;
 
shared_ptr<thread> t;
 
 
void​ setup() override {
 
pool = make_shared<ThreadPool>();
 
GeoServerUsersInBoxTests::setup();
 
}
 
 
void​ teardown() override {
 
t->join();
 
}
 
};
 
 
TEST(AGeoServer_ScaleTests, HandlesLargeNumbersOfUsers) {
 
pool->start(4);
 
const​ ​unsigned​ ​int​ lots{5000};
 
addUsersAt(lots, Location{aUserLocation.go(TenMeters, West)});
 
 
t = make_shared<thread>(
 
[&] { server.usersInBox(aUser, Width, Height, &countingListener); });
 
 
countingListener.waitForCountAndFailOnTimeout(lots);
 
}

(Given that there’s a lot of common setup between the prior tests for usersInBox, the test code you see here is representative of a heavily refactored solution. There’s also considerable duplication between the wait/notify concepts implemented in GeoServerCountingListener and those used in ThreadPoolTest code. We’d want to refactor into a construct usable by any thread-oriented test.)

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.142.166.31