forked from lix-project/lix
Fix a hang in ThreadPool
The worker threads could exit prematurely if they finished processing all items while the main thread was still adding items. In particular, this caused hanging nix-store --serve processes in the build farm. Also, process items from the main thread.
This commit is contained in:
parent
838509d1a0
commit
fda7b95cb0
|
@ -13,10 +13,15 @@ ThreadPool::ThreadPool(size_t _maxThreads)
|
||||||
if (!maxThreads) maxThreads = 1;
|
if (!maxThreads) maxThreads = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
debug(format("starting pool of %d threads") % maxThreads);
|
debug("starting pool of %d threads", maxThreads - 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
ThreadPool::~ThreadPool()
|
ThreadPool::~ThreadPool()
|
||||||
|
{
|
||||||
|
shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadPool::shutdown()
|
||||||
{
|
{
|
||||||
std::vector<std::thread> workers;
|
std::vector<std::thread> workers;
|
||||||
{
|
{
|
||||||
|
@ -25,7 +30,9 @@ ThreadPool::~ThreadPool()
|
||||||
std::swap(workers, state->workers);
|
std::swap(workers, state->workers);
|
||||||
}
|
}
|
||||||
|
|
||||||
debug(format("reaping %d worker threads") % workers.size());
|
if (workers.empty()) return;
|
||||||
|
|
||||||
|
debug("reaping %d worker threads", workers.size());
|
||||||
|
|
||||||
work.notify_all();
|
work.notify_all();
|
||||||
|
|
||||||
|
@ -38,31 +45,42 @@ void ThreadPool::enqueue(const work_t & t)
|
||||||
auto state(state_.lock());
|
auto state(state_.lock());
|
||||||
if (quit)
|
if (quit)
|
||||||
throw ThreadPoolShutDown("cannot enqueue a work item while the thread pool is shutting down");
|
throw ThreadPoolShutDown("cannot enqueue a work item while the thread pool is shutting down");
|
||||||
state->left.push(t);
|
state->pending.push(t);
|
||||||
if (state->left.size() > state->workers.size() && state->workers.size() < maxThreads)
|
/* Note: process() also executes items, so count it as a worker. */
|
||||||
state->workers.emplace_back(&ThreadPool::workerEntry, this);
|
if (state->pending.size() > state->workers.size() + 1 && state->workers.size() + 1 < maxThreads)
|
||||||
|
state->workers.emplace_back(&ThreadPool::doWork, this, false);
|
||||||
work.notify_one();
|
work.notify_one();
|
||||||
}
|
}
|
||||||
|
|
||||||
void ThreadPool::process()
|
void ThreadPool::process()
|
||||||
{
|
{
|
||||||
/* Loop until there are no active work items *and* there either
|
state_.lock()->draining = true;
|
||||||
are no queued items or there is an exception. The
|
|
||||||
post-condition is that no new items will become active. */
|
/* Do work until no more work is pending or active. */
|
||||||
while (true) {
|
try {
|
||||||
|
doWork(true);
|
||||||
|
|
||||||
auto state(state_.lock());
|
auto state(state_.lock());
|
||||||
if (!state->active) {
|
|
||||||
|
assert(quit);
|
||||||
|
|
||||||
if (state->exception)
|
if (state->exception)
|
||||||
std::rethrow_exception(state->exception);
|
std::rethrow_exception(state->exception);
|
||||||
if (state->left.empty())
|
|
||||||
break;
|
} catch (...) {
|
||||||
}
|
/* In the exceptional case, some workers may still be
|
||||||
state.wait(done);
|
active. They may be referencing the stack frame of the
|
||||||
|
caller. So wait for them to finish. (~ThreadPool also does
|
||||||
|
this, but it might be destroyed after objects referenced by
|
||||||
|
the work item lambdas.) */
|
||||||
|
shutdown();
|
||||||
|
throw;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void ThreadPool::workerEntry()
|
void ThreadPool::doWork(bool mainThread)
|
||||||
{
|
{
|
||||||
|
if (!mainThread)
|
||||||
interruptCheck = [&]() { return (bool) quit; };
|
interruptCheck = [&]() { return (bool) quit; };
|
||||||
|
|
||||||
bool didWork = false;
|
bool didWork = false;
|
||||||
|
@ -99,24 +117,27 @@ void ThreadPool::workerEntry()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Wait until a work item is available or another thread
|
/* Wait until a work item is available or we're asked to
|
||||||
had an exception or we're asked to quit. */
|
quit. */
|
||||||
while (true) {
|
while (true) {
|
||||||
if (quit) {
|
if (quit) return;
|
||||||
if (!state->active)
|
|
||||||
done.notify_one();
|
if (!state->pending.empty()) break;
|
||||||
return;
|
|
||||||
}
|
/* If there are no active or pending items, and the
|
||||||
if (!state->left.empty()) break;
|
main thread is running process(), then no new items
|
||||||
if (!state->active) {
|
can be added. So exit. */
|
||||||
done.notify_one();
|
if (!state->active && state->draining) {
|
||||||
|
quit = true;
|
||||||
|
work.notify_all();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
state.wait(work);
|
state.wait(work);
|
||||||
}
|
}
|
||||||
|
|
||||||
w = std::move(state->left.front());
|
w = std::move(state->pending.front());
|
||||||
state->left.pop();
|
state->pending.pop();
|
||||||
state->active++;
|
state->active++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -44,19 +44,22 @@ private:
|
||||||
|
|
||||||
struct State
|
struct State
|
||||||
{
|
{
|
||||||
std::queue<work_t> left;
|
std::queue<work_t> pending;
|
||||||
size_t active = 0;
|
size_t active = 0;
|
||||||
std::exception_ptr exception;
|
std::exception_ptr exception;
|
||||||
std::vector<std::thread> workers;
|
std::vector<std::thread> workers;
|
||||||
|
bool draining = false;
|
||||||
};
|
};
|
||||||
|
|
||||||
std::atomic_bool quit{false};
|
std::atomic_bool quit{false};
|
||||||
|
|
||||||
Sync<State> state_;
|
Sync<State> state_;
|
||||||
|
|
||||||
std::condition_variable work, done;
|
std::condition_variable work;
|
||||||
|
|
||||||
void workerEntry();
|
void doWork(bool mainThread);
|
||||||
|
|
||||||
|
void shutdown();
|
||||||
};
|
};
|
||||||
|
|
||||||
/* Process in parallel a set of items of type T that have a partial
|
/* Process in parallel a set of items of type T that have a partial
|
||||||
|
|
Loading…
Reference in a new issue