lix-releng-staging/src/libutil/thread-pool.cc

105 lines
2.5 KiB
C++
Raw Normal View History

#include "thread-pool.hh"
#include "affinity.hh"
namespace nix {
ThreadPool::ThreadPool(size_t _maxThreads)
: maxThreads(_maxThreads)
{
restoreAffinity(); // FIXME
if (!maxThreads) {
maxThreads = std::thread::hardware_concurrency();
if (!maxThreads) maxThreads = 1;
}
debug(format("starting pool of %d threads") % maxThreads);
}
ThreadPool::~ThreadPool()
{
std::vector<std::thread> workers;
{
auto state(state_.lock());
state->quit = true;
std::swap(workers, state->workers);
}
debug(format("reaping %d worker threads") % workers.size());
work.notify_all();
for (auto & thr : workers)
thr.join();
}
void ThreadPool::enqueue(const work_t & t)
{
auto state(state_.lock());
if (state->quit)
throw ThreadPoolShutDown("cannot enqueue a work item while the thread pool is shutting down");
state->left.push(t);
if (state->left.size() > state->workers.size() && state->workers.size() < maxThreads)
state->workers.emplace_back(&ThreadPool::workerEntry, this);
work.notify_one();
}
void ThreadPool::process()
{
while (true) {
auto state(state_.lock());
if (state->exception)
std::rethrow_exception(state->exception);
if (state->left.empty() && !state->pending) break;
state.wait(done);
}
}
void ThreadPool::workerEntry()
{
bool didWork = false;
while (true) {
work_t w;
{
auto state(state_.lock());
while (true) {
if (state->quit || state->exception) return;
if (didWork) {
assert(state->pending);
state->pending--;
didWork = false;
}
if (!state->left.empty()) break;
if (!state->pending)
done.notify_all();
state.wait(work);
}
w = state->left.front();
state->left.pop();
state->pending++;
}
try {
w();
} catch (std::exception & e) {
auto state(state_.lock());
if (state->exception) {
if (!dynamic_cast<Interrupted*>(&e) &&
!dynamic_cast<ThreadPoolShutDown*>(&e))
printError(format("error: %s") % e.what());
} else {
state->exception = std::current_exception();
work.notify_all();
done.notify_all();
}
}
didWork = true;
}
}
}