ThreadPool: Start doing work as soon as work items are enqueued

This commit is contained in:
Eelco Dolstra 2016-04-22 18:19:17 +02:00
parent 58c84cda3b
commit b2ce6fde5a
2 changed files with 84 additions and 59 deletions

View file

@ -1,79 +1,99 @@
#include "thread-pool.hh"
#include "affinity.hh"
namespace nix {
ThreadPool::ThreadPool(size_t _nrThreads)
: nrThreads(_nrThreads)
ThreadPool::ThreadPool(size_t _maxThreads)
: maxThreads(_maxThreads)
{
if (!nrThreads) {
nrThreads = std::thread::hardware_concurrency();
if (!nrThreads) nrThreads = 1;
restoreAffinity(); // FIXME
if (!maxThreads) {
maxThreads = std::thread::hardware_concurrency();
if (!maxThreads) maxThreads = 1;
}
debug(format("starting pool of %d threads") % maxThreads);
}
ThreadPool::~ThreadPool()
{
std::vector<std::thread> workers;
{
auto state(state_.lock());
state->quit = true;
std::swap(workers, state->workers);
}
debug(format("reaping %d worker threads") % workers.size());
work.notify_all();
for (auto & thr : workers)
thr.join();
}
void ThreadPool::enqueue(const work_t & t)
{
auto state_(state.lock());
state_->left.push(t);
wakeup.notify_one();
auto state(state_.lock());
assert(!state->quit);
state->left.push(t);
if (state->left.size() > state->workers.size() && state->workers.size() < maxThreads)
state->workers.emplace_back(&ThreadPool::workerEntry, this);
work.notify_one();
}
void ThreadPool::process()
{
printMsg(lvlDebug, format("starting pool of %d threads") % nrThreads);
while (true) {
auto state(state_.lock());
if (state->exception)
std::rethrow_exception(state->exception);
if (state->left.empty() && !state->pending) break;
state.wait(done);
}
}
std::vector<std::thread> workers;
for (size_t n = 0; n < nrThreads; n++)
workers.push_back(std::thread([&]() {
bool first = true;
void ThreadPool::workerEntry()
{
bool didWork = false;
while (true) {
work_t w;
{
auto state(state_.lock());
while (true) {
work_t work;
{
auto state_(state.lock());
if (state_->exception) return;
if (!first) {
assert(state_->pending);
state_->pending--;
}
first = false;
while (state_->left.empty()) {
if (!state_->pending) {
wakeup.notify_all();
return;
}
if (state_->exception) return;
state_.wait(wakeup);
}
work = state_->left.front();
state_->left.pop();
state_->pending++;
}
try {
work();
} catch (std::exception & e) {
auto state_(state.lock());
if (state_->exception) {
if (!dynamic_cast<Interrupted*>(&e))
printMsg(lvlError, format("error: %s") % e.what());
} else {
state_->exception = std::current_exception();
wakeup.notify_all();
}
if (state->quit || state->exception) return;
if (didWork) {
assert(state->pending);
state->pending--;
didWork = false;
}
if (!state->left.empty()) break;
if (!state->pending)
done.notify_all();
state.wait(work);
}
w = state->left.front();
state->left.pop();
state->pending++;
}
}));
try {
w();
} catch (std::exception & e) {
auto state(state_.lock());
if (state->exception) {
if (!dynamic_cast<Interrupted*>(&e))
printMsg(lvlError, format("error: %s") % e.what());
} else {
state->exception = std::current_exception();
work.notify_all();
done.notify_all();
}
}
for (auto & thr : workers)
thr.join();
{
auto state_(state.lock());
if (state_->exception)
std::rethrow_exception(state_->exception);
didWork = true;
}
}

View file

@ -15,7 +15,9 @@ class ThreadPool
{
public:
ThreadPool(size_t nrThreads = 0);
ThreadPool(size_t maxThreads = 0);
~ThreadPool();
// FIXME: use std::packaged_task?
typedef std::function<void()> work_t;
@ -34,19 +36,22 @@ public:
private:
size_t nrThreads;
size_t maxThreads;
struct State
{
std::queue<work_t> left;
size_t pending = 0;
std::exception_ptr exception;
std::vector<std::thread> workers;
bool quit = false;
};
Sync<State> state;
Sync<State> state_;
std::condition_variable wakeup;
std::condition_variable work, done;
void workerEntry();
};
}