forked from lix-project/lix
155 lines
4 KiB
C++
155 lines
4 KiB
C++
#include "thread-pool.hh"
|
|
#include "logging.hh"
|
|
#include "signals.hh"
|
|
|
|
namespace nix {
|
|
|
|
ThreadPool::ThreadPool(size_t _maxThreads)
|
|
: maxThreads(_maxThreads)
|
|
{
|
|
if (!maxThreads) {
|
|
maxThreads = std::thread::hardware_concurrency();
|
|
if (!maxThreads) maxThreads = 1;
|
|
}
|
|
|
|
debug("starting pool of %d threads", maxThreads - 1);
|
|
}
|
|
|
|
ThreadPool::~ThreadPool()
|
|
{
|
|
shutdown();
|
|
}
|
|
|
|
void ThreadPool::shutdown()
|
|
{
|
|
std::vector<std::thread> workers;
|
|
{
|
|
auto state(state_.lock());
|
|
quit = true;
|
|
std::swap(workers, state->workers);
|
|
}
|
|
|
|
if (workers.empty()) return;
|
|
|
|
debug("reaping %d worker threads", workers.size());
|
|
|
|
work.notify_all();
|
|
|
|
for (auto & thr : workers)
|
|
thr.join();
|
|
}
|
|
|
|
void ThreadPool::enqueue(const work_t & t)
|
|
{
|
|
auto state(state_.lock());
|
|
if (quit)
|
|
throw ThreadPoolShutDown("cannot enqueue a work item while the thread pool is shutting down");
|
|
state->pending.push(t);
|
|
/* Note: process() also executes items, so count it as a worker. */
|
|
if (state->pending.size() > state->workers.size() + 1 && state->workers.size() + 1 < maxThreads)
|
|
state->workers.emplace_back(&ThreadPool::doWork, this, false);
|
|
work.notify_one();
|
|
}
|
|
|
|
void ThreadPool::process()
|
|
{
|
|
state_.lock()->draining = true;
|
|
|
|
/* Do work until no more work is pending or active. */
|
|
try {
|
|
doWork(true);
|
|
|
|
auto state(state_.lock());
|
|
|
|
assert(quit);
|
|
|
|
if (state->exception)
|
|
std::rethrow_exception(state->exception);
|
|
|
|
} catch (...) {
|
|
/* In the exceptional case, some workers may still be
|
|
active. They may be referencing the stack frame of the
|
|
caller. So wait for them to finish. (~ThreadPool also does
|
|
this, but it might be destroyed after objects referenced by
|
|
the work item lambdas.) */
|
|
shutdown();
|
|
throw;
|
|
}
|
|
}
|
|
|
|
void ThreadPool::doWork(bool mainThread)
|
|
{
|
|
ReceiveInterrupts receiveInterrupts;
|
|
|
|
if (!mainThread)
|
|
interruptCheck = [&]() { return (bool) quit; };
|
|
|
|
bool didWork = false;
|
|
std::exception_ptr exc;
|
|
|
|
while (true) {
|
|
work_t w;
|
|
{
|
|
auto state(state_.lock());
|
|
|
|
if (didWork) {
|
|
assert(state->active);
|
|
state->active--;
|
|
|
|
if (exc) {
|
|
|
|
if (!state->exception) {
|
|
state->exception = exc;
|
|
// Tell the other workers to quit.
|
|
quit = true;
|
|
work.notify_all();
|
|
} else {
|
|
/* Print the exception, since we can't
|
|
propagate it. */
|
|
try {
|
|
std::rethrow_exception(exc);
|
|
} catch (std::exception & e) {
|
|
if (!dynamic_cast<Interrupted*>(&e) &&
|
|
!dynamic_cast<ThreadPoolShutDown*>(&e))
|
|
ignoreException();
|
|
} catch (...) {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/* Wait until a work item is available or we're asked to
|
|
quit. */
|
|
while (true) {
|
|
if (quit) return;
|
|
|
|
if (!state->pending.empty()) break;
|
|
|
|
/* If there are no active or pending items, and the
|
|
main thread is running process(), then no new items
|
|
can be added. So exit. */
|
|
if (!state->active && state->draining) {
|
|
quit = true;
|
|
work.notify_all();
|
|
return;
|
|
}
|
|
|
|
state.wait(work);
|
|
}
|
|
|
|
w = std::move(state->pending.front());
|
|
state->pending.pop();
|
|
state->active++;
|
|
}
|
|
|
|
try {
|
|
w();
|
|
} catch (...) {
|
|
exc = std::current_exception();
|
|
}
|
|
|
|
didWork = true;
|
|
}
|
|
}
|
|
|
|
}
|