ThreadPool: On exception, interrupt the other worker threads

This commit is contained in:
Eelco Dolstra 2017-09-08 15:31:24 +02:00
parent 6a888ec29a
commit b7376edf06
No known key found for this signature in database
GPG key ID: 8170B4726D7198DE
4 changed files with 14 additions and 7 deletions

View file

@ -21,7 +21,7 @@ ThreadPool::~ThreadPool()
std::vector<std::thread> workers; std::vector<std::thread> workers;
{ {
auto state(state_.lock()); auto state(state_.lock());
state->quit = true; quit = true;
std::swap(workers, state->workers); std::swap(workers, state->workers);
} }
@ -36,7 +36,7 @@ ThreadPool::~ThreadPool()
void ThreadPool::enqueue(const work_t & t) void ThreadPool::enqueue(const work_t & t)
{ {
auto state(state_.lock()); auto state(state_.lock());
if (state->quit) if (quit)
throw ThreadPoolShutDown("cannot enqueue a work item while the thread pool is shutting down"); throw ThreadPoolShutDown("cannot enqueue a work item while the thread pool is shutting down");
state->left.push(t); state->left.push(t);
if (state->left.size() > state->workers.size() && state->workers.size() < maxThreads) if (state->left.size() > state->workers.size() && state->workers.size() < maxThreads)
@ -63,6 +63,8 @@ void ThreadPool::process()
void ThreadPool::workerEntry() void ThreadPool::workerEntry()
{ {
interruptCheck = [&]() { return (bool) quit; };
bool didWork = false; bool didWork = false;
std::exception_ptr exc; std::exception_ptr exc;
@ -80,7 +82,7 @@ void ThreadPool::workerEntry()
if (!state->exception) { if (!state->exception) {
state->exception = exc; state->exception = exc;
// Tell the other workers to quit. // Tell the other workers to quit.
state->quit = true; quit = true;
work.notify_all(); work.notify_all();
} else { } else {
/* Print the exception, since we can't /* Print the exception, since we can't
@ -100,7 +102,7 @@ void ThreadPool::workerEntry()
/* Wait until a work item is available or another thread /* Wait until a work item is available or another thread
had an exception or we're asked to quit. */ had an exception or we're asked to quit. */
while (true) { while (true) {
if (state->quit) { if (quit) {
if (!state->active) if (!state->active)
done.notify_one(); done.notify_one();
return; return;

View file

@ -7,6 +7,7 @@
#include <functional> #include <functional>
#include <thread> #include <thread>
#include <map> #include <map>
#include <atomic>
namespace nix { namespace nix {
@ -47,9 +48,10 @@ private:
size_t active = 0; size_t active = 0;
std::exception_ptr exception; std::exception_ptr exception;
std::vector<std::thread> workers; std::vector<std::thread> workers;
bool quit = false;
}; };
std::atomic_bool quit{false};
Sync<State> state_; Sync<State> state_;
std::condition_variable work, done; std::condition_variable work, done;

View file

@ -1002,6 +1002,7 @@ void closeOnExec(int fd)
bool _isInterrupted = false; bool _isInterrupted = false;
static thread_local bool interruptThrown = false; static thread_local bool interruptThrown = false;
thread_local std::function<bool()> interruptCheck;
void setInterruptThrown() void setInterruptThrown()
{ {
@ -1020,7 +1021,6 @@ void _interrupted()
} }
////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////

View file

@ -273,13 +273,16 @@ void closeOnExec(int fd);
extern bool _isInterrupted; extern bool _isInterrupted;
extern thread_local std::function<bool()> interruptCheck;
void setInterruptThrown(); void setInterruptThrown();
void _interrupted(); void _interrupted();
void inline checkInterrupt() void inline checkInterrupt()
{ {
if (_isInterrupted) _interrupted(); if (_isInterrupted || (interruptCheck && interruptCheck()))
_interrupted();
} }
MakeError(Interrupted, BaseError) MakeError(Interrupted, BaseError)