On SIGINT, shut down the builder threads

Note that they don't get interrupted at the moment (so on SIGINT, any
running builds will need to finish first).
This commit is contained in:
Eelco Dolstra 2015-05-29 20:02:15 +02:00
parent e778821940
commit 214b95706c

View file

@ -27,7 +27,7 @@ bool has(const C & c, const V & v)
std::mutex exitRequestMutex; std::mutex exitRequestMutex;
std::condition_variable exitRequest; std::condition_variable exitRequest;
bool exitRequested(false); std::atomic<bool> exitRequested(false);
static std::atomic_int _int(0); static std::atomic_int _int(0);
@ -145,6 +145,8 @@ class State
private: private:
std::thread queueMonitorThread; std::thread queueMonitorThread;
std::mutex queueMonitorMutex;
std::condition_variable queueMonitorWakeup;
/* The queued builds. */ /* The queued builds. */
typedef std::map<BuildID, Build::ptr> Builds; typedef std::map<BuildID, Build::ptr> Builds;
@ -161,7 +163,7 @@ private:
typedef std::list<Step::wptr> Runnable; typedef std::list<Step::wptr> Runnable;
Sync<Runnable> runnable; Sync<Runnable> runnable;
std::condition_variable_any runnableCV; std::condition_variable_any runnableWakeup;
public: public:
State(); State();
@ -178,7 +180,7 @@ public:
void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status); void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status);
void queueMonitorThreadEntry(); void queueMonitor();
void getQueuedBuilds(std::shared_ptr<StoreAPI> store, pqxx::connection & conn); void getQueuedBuilds(std::shared_ptr<StoreAPI> store, pqxx::connection & conn);
@ -266,19 +268,18 @@ void State::finishBuildStep(pqxx::work & txn, time_t stopTime, BuildID buildId,
} }
void State::queueMonitorThreadEntry() void State::queueMonitor()
{ {
auto store = openStore(); // FIXME: pool auto store = openStore(); // FIXME: pool
Connection conn; Connection conn;
while (true) { while (!exitRequested) {
getQueuedBuilds(store, conn); getQueuedBuilds(store, conn);
{ {
std::unique_lock<std::mutex> lock(exitRequestMutex); std::unique_lock<std::mutex> lock(queueMonitorMutex);
exitRequest.wait_for(lock, std::chrono::seconds(5)); queueMonitorWakeup.wait_for(lock, std::chrono::seconds(5));
if (exitRequested) break;
} }
} }
@ -550,7 +551,7 @@ void State::makeRunnable(Step::ptr step)
runnable_->push_back(step); runnable_->push_back(step);
} }
runnableCV.notify_one(); runnableWakeup.notify_one();
} }
@ -563,8 +564,9 @@ void State::builderThreadEntry(int slot)
Step::ptr step; Step::ptr step;
{ {
auto runnable_(runnable.lock()); auto runnable_(runnable.lock());
while (runnable_->empty()) while (runnable_->empty() && !exitRequested)
runnable_.wait(runnableCV); runnable_.wait(runnableWakeup);
if (exitRequested) break;
auto weak = *runnable_->begin(); auto weak = *runnable_->begin();
runnable_->pop_front(); runnable_->pop_front();
step = weak.lock(); step = weak.lock();
@ -760,12 +762,29 @@ void State::run()
markActiveBuildStepsAsAborted(conn, 0); markActiveBuildStepsAsAborted(conn, 0);
} }
queueMonitorThread = std::thread(&State::queueMonitorThreadEntry, this); queueMonitorThread = std::thread(&State::queueMonitor, this);
std::vector<std::thread> builderThreads;
for (int n = 0; n < 4; n++) for (int n = 0; n < 4; n++)
std::thread(&State::builderThreadEntry, this, n).detach(); builderThreads.push_back(std::thread(&State::builderThreadEntry, this, n));
/* Wait for SIGINT. */
{
std::unique_lock<std::mutex> lock(exitRequestMutex);
while (!exitRequested)
exitRequest.wait(lock);
}
printMsg(lvlError, "exiting...");
/* Shut down the various threads. */
{ std::lock_guard<std::mutex> lock(queueMonitorMutex); } // barrier
queueMonitorWakeup.notify_all();
queueMonitorThread.join(); queueMonitorThread.join();
{ auto runnable_(runnable.lock()); } // barrier
runnableWakeup.notify_all();
for (auto & thread : builderThreads) thread.join();
} }