diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 6b9d4aae..2eee9463 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -27,7 +27,7 @@ bool has(const C & c, const V & v) std::mutex exitRequestMutex; std::condition_variable exitRequest; -bool exitRequested(false); +std::atomic exitRequested(false); static std::atomic_int _int(0); @@ -145,6 +145,8 @@ class State private: std::thread queueMonitorThread; + std::mutex queueMonitorMutex; + std::condition_variable queueMonitorWakeup; /* The queued builds. */ typedef std::map Builds; @@ -161,7 +163,7 @@ private: typedef std::list Runnable; Sync runnable; - std::condition_variable_any runnableCV; + std::condition_variable_any runnableWakeup; public: State(); @@ -178,7 +180,7 @@ public: void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status); - void queueMonitorThreadEntry(); + void queueMonitor(); void getQueuedBuilds(std::shared_ptr store, pqxx::connection & conn); @@ -266,19 +268,18 @@ void State::finishBuildStep(pqxx::work & txn, time_t stopTime, BuildID buildId, } -void State::queueMonitorThreadEntry() +void State::queueMonitor() { auto store = openStore(); // FIXME: pool Connection conn; - while (true) { + while (!exitRequested) { getQueuedBuilds(store, conn); { - std::unique_lock lock(exitRequestMutex); - exitRequest.wait_for(lock, std::chrono::seconds(5)); - if (exitRequested) break; + std::unique_lock lock(queueMonitorMutex); + queueMonitorWakeup.wait_for(lock, std::chrono::seconds(5)); } } @@ -550,7 +551,7 @@ void State::makeRunnable(Step::ptr step) runnable_->push_back(step); } - runnableCV.notify_one(); + runnableWakeup.notify_one(); } @@ -563,8 +564,9 @@ void State::builderThreadEntry(int slot) Step::ptr step; { auto runnable_(runnable.lock()); - while (runnable_->empty()) - runnable_.wait(runnableCV); + while (runnable_->empty() && !exitRequested) + runnable_.wait(runnableWakeup); + if (exitRequested) break; auto weak = *runnable_->begin(); runnable_->pop_front(); step = weak.lock(); @@ -760,12 +762,29 @@ void State::run() markActiveBuildStepsAsAborted(conn, 0); } - queueMonitorThread = std::thread(&State::queueMonitorThreadEntry, this); + queueMonitorThread = std::thread(&State::queueMonitor, this); + std::vector builderThreads; for (int n = 0; n < 4; n++) - std::thread(&State::builderThreadEntry, this, n).detach(); + builderThreads.push_back(std::thread(&State::builderThreadEntry, this, n)); + /* Wait for SIGINT. */ + { + std::unique_lock lock(exitRequestMutex); + while (!exitRequested) + exitRequest.wait(lock); + } + + printMsg(lvlError, "exiting..."); + + /* Shut down the various threads. */ + { std::lock_guard lock(queueMonitorMutex); } // barrier + queueMonitorWakeup.notify_all(); queueMonitorThread.join(); + + { auto runnable_(runnable.lock()); } // barrier + runnableWakeup.notify_all(); + for (auto & thread : builderThreads) thread.join(); }