diff --git a/src/libutil/thread-pool.cc b/src/libutil/thread-pool.cc index ce126d36d..857ee91f8 100644 --- a/src/libutil/thread-pool.cc +++ b/src/libutil/thread-pool.cc @@ -13,10 +13,15 @@ ThreadPool::ThreadPool(size_t _maxThreads) if (!maxThreads) maxThreads = 1; } - debug(format("starting pool of %d threads") % maxThreads); + debug("starting pool of %d threads", maxThreads - 1); } ThreadPool::~ThreadPool() +{ + shutdown(); +} + +void ThreadPool::shutdown() { std::vector workers; { @@ -25,7 +30,9 @@ ThreadPool::~ThreadPool() std::swap(workers, state->workers); } - debug(format("reaping %d worker threads") % workers.size()); + if (workers.empty()) return; + + debug("reaping %d worker threads", workers.size()); work.notify_all(); @@ -38,32 +45,43 @@ void ThreadPool::enqueue(const work_t & t) auto state(state_.lock()); if (quit) throw ThreadPoolShutDown("cannot enqueue a work item while the thread pool is shutting down"); - state->left.push(t); - if (state->left.size() > state->workers.size() && state->workers.size() < maxThreads) - state->workers.emplace_back(&ThreadPool::workerEntry, this); + state->pending.push(t); + /* Note: process() also executes items, so count it as a worker. */ + if (state->pending.size() > state->workers.size() + 1 && state->workers.size() + 1 < maxThreads) + state->workers.emplace_back(&ThreadPool::doWork, this, false); work.notify_one(); } void ThreadPool::process() { - /* Loop until there are no active work items *and* there either - are no queued items or there is an exception. The - post-condition is that no new items will become active. */ - while (true) { + state_.lock()->draining = true; + + /* Do work until no more work is pending or active. */ + try { + doWork(true); + auto state(state_.lock()); - if (!state->active) { - if (state->exception) - std::rethrow_exception(state->exception); - if (state->left.empty()) - break; - } - state.wait(done); + + assert(quit); + + if (state->exception) + std::rethrow_exception(state->exception); + + } catch (...) { + /* In the exceptional case, some workers may still be + active. They may be referencing the stack frame of the + caller. So wait for them to finish. (~ThreadPool also does + this, but it might be destroyed after objects referenced by + the work item lambdas.) */ + shutdown(); + throw; } } -void ThreadPool::workerEntry() +void ThreadPool::doWork(bool mainThread) { - interruptCheck = [&]() { return (bool) quit; }; + if (!mainThread) + interruptCheck = [&]() { return (bool) quit; }; bool didWork = false; std::exception_ptr exc; @@ -99,24 +117,27 @@ void ThreadPool::workerEntry() } } - /* Wait until a work item is available or another thread - had an exception or we're asked to quit. */ + /* Wait until a work item is available or we're asked to + quit. */ while (true) { - if (quit) { - if (!state->active) - done.notify_one(); - return; - } - if (!state->left.empty()) break; - if (!state->active) { - done.notify_one(); + if (quit) return; + + if (!state->pending.empty()) break; + + /* If there are no active or pending items, and the + main thread is running process(), then no new items + can be added. So exit. */ + if (!state->active && state->draining) { + quit = true; + work.notify_all(); return; } + state.wait(work); } - w = std::move(state->left.front()); - state->left.pop(); + w = std::move(state->pending.front()); + state->pending.pop(); state->active++; } diff --git a/src/libutil/thread-pool.hh b/src/libutil/thread-pool.hh index 06a097ab5..bb16b639a 100644 --- a/src/libutil/thread-pool.hh +++ b/src/libutil/thread-pool.hh @@ -44,19 +44,22 @@ private: struct State { - std::queue left; + std::queue pending; size_t active = 0; std::exception_ptr exception; std::vector workers; + bool draining = false; }; std::atomic_bool quit{false}; Sync state_; - std::condition_variable work, done; + std::condition_variable work; - void workerEntry(); + void doWork(bool mainThread); + + void shutdown(); }; /* Process in parallel a set of items of type T that have a partial