diff --git a/src/hydra-queue-runner/build-result.cc b/src/hydra-queue-runner/build-result.cc index de9903b9..e7a2fda3 100644 --- a/src/hydra-queue-runner/build-result.cc +++ b/src/hydra-queue-runner/build-result.cc @@ -6,7 +6,7 @@ using namespace nix; -BuildResult getBuildResult(const Derivation & drv) +BuildResult getBuildResult(std::shared_ptr store, const Derivation & drv) { BuildResult res; diff --git a/src/hydra-queue-runner/build-result.hh b/src/hydra-queue-runner/build-result.hh index f8a93b3a..bbe6fd7a 100644 --- a/src/hydra-queue-runner/build-result.hh +++ b/src/hydra-queue-runner/build-result.hh @@ -1,5 +1,7 @@ #pragma once +#include + #include "hash.hh" #include "derivations.hh" @@ -22,4 +24,4 @@ struct BuildResult std::list products; }; -BuildResult getBuildResult(const nix::Derivation & drv); +BuildResult getBuildResult(std::shared_ptr store, const nix::Derivation & drv); diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 95a1c8ad..d2edf32c 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -1,6 +1,10 @@ +#include +#include #include -#include #include +#include +#include + #include #include "build-result.hh" @@ -12,6 +16,40 @@ using namespace nix; +std::mutex exitRequestMutex; +std::condition_variable exitRequest; +bool exitRequested(false); + +static std::atomic_int _int(0); + +void sigintHandler(int signo) +{ + _int = 1; +} + + +void signalThread() +{ + struct sigaction act; + act.sa_handler = sigintHandler; + sigemptyset(&act.sa_mask); + act.sa_flags = 0; + if (sigaction(SIGINT, &act, 0)) + throw SysError("installing handler for SIGINT"); + + while (true) { + sleep(1000000); + if (_int) break; + } + + { + std::lock_guard lock(exitRequestMutex); + exitRequested = true; + } + exitRequest.notify_all(); +} + + typedef enum { bsSuccess = 0, bsFailed = 1, @@ -74,6 +112,9 @@ struct Step class State { private: + + std::thread queueMonitorThread; + /* The queued builds. */ std::map builds; @@ -84,6 +125,9 @@ private: /* Build steps that have no unbuilt dependencies. */ std::set runnable; + std::mutex runnableMutex; + std::condition_variable runnableCV; + public: State(); @@ -99,21 +143,27 @@ public: void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status); - void getQueuedBuilds(pqxx::connection & conn); + void queueMonitorThreadEntry(); - Step::ptr createStep(const Path & drvPath); + void getQueuedBuilds(std::shared_ptr store, pqxx::connection & conn); + + Step::ptr createStep(std::shared_ptr store, const Path & drvPath); void destroyStep(Step::ptr step, bool proceed); /* Get the builds that depend on the given step. */ std::set getDependentBuilds(Step::ptr step); - void doBuildSteps(); + void makeRunnable(Step::ptr step); - void doBuildStep(Step::ptr step); + void builderThreadEntry(int slot); + + void doBuildStep(std::shared_ptr store, Step::ptr step); void markSucceededBuild(pqxx::work & txn, Build::ptr build, const BuildResult & res, bool isCachedBuild, time_t startTime, time_t stopTime); + + void run(); }; @@ -180,8 +230,30 @@ void State::finishBuildStep(pqxx::work & txn, time_t stopTime, BuildID buildId, } -void State::getQueuedBuilds(pqxx::connection & conn) +void State::queueMonitorThreadEntry() { + auto store = openStore(); // FIXME: pool + + Connection conn; + + while (true) { + getQueuedBuilds(store, conn); + + { + std::unique_lock lock(exitRequestMutex); + exitRequest.wait_for(lock, std::chrono::seconds(5)); + if (exitRequested) break; + } + } + + printMsg(lvlError, "queue monitor exits"); +} + + +void State::getQueuedBuilds(std::shared_ptr store, pqxx::connection & conn) +{ + printMsg(lvlError, "checking the queue..."); + pqxx::work txn(conn); // FIXME: query only builds with ID higher than the previous @@ -213,10 +285,10 @@ void State::getQueuedBuilds(pqxx::connection & conn) continue; } - Step::ptr step = createStep(build->drvPath); + Step::ptr step = createStep(store, build->drvPath); if (!step) { Derivation drv = readDerivation(build->drvPath); - BuildResult res = getBuildResult(drv); + BuildResult res = getBuildResult(store, drv); Connection conn; pqxx::work txn(conn); @@ -234,7 +306,7 @@ void State::getQueuedBuilds(pqxx::connection & conn) } -Step::ptr State::createStep(const Path & drvPath) +Step::ptr State::createStep(std::shared_ptr store, const Path & drvPath) { auto prev = steps.find(drvPath); if (prev != steps.end()) return prev->second; @@ -262,7 +334,7 @@ Step::ptr State::createStep(const Path & drvPath) /* Create steps for the dependencies. */ for (auto & i : step->drv.inputDrvs) { - Step::ptr dep = createStep(i.first); + Step::ptr dep = createStep(store, i.first); if (dep) { step->deps.insert(dep); dep->rdeps.push_back(step); @@ -271,7 +343,7 @@ Step::ptr State::createStep(const Path & drvPath) steps[drvPath] = step; - if (step->deps.empty()) runnable.insert(step); + if (step->deps.empty()) makeRunnable(step); return step; } @@ -290,7 +362,7 @@ void State::destroyStep(Step::ptr step, bool proceed) /* If this rdep has no other dependencies, then we can now build it. */ if (rdep->deps.empty()) - runnable.insert(rdep); + makeRunnable(rdep); } else /* If ‘step’ failed, then delete all dependent steps as well. */ @@ -334,18 +406,43 @@ std::set State::getDependentBuilds(Step::ptr step) } -void State::doBuildSteps() +void State::makeRunnable(Step::ptr step) { - while (!runnable.empty()) { - printMsg(lvlInfo, format("%1% runnable steps") % runnable.size()); - Step::ptr step = *runnable.begin(); - runnable.erase(step); - doBuildStep(step); + assert(step->deps.empty()); + + { + std::lock_guard lock(runnableMutex); + runnable.insert(step); } + + runnableCV.notify_one(); } -void State::doBuildStep(Step::ptr step) +void State::builderThreadEntry(int slot) +{ + auto store = openStore(); // FIXME: pool + + while (true) { + Step::ptr step; + { + std::unique_lock lock(runnableMutex); + while (runnable.empty()) + runnableCV.wait(lock); + step = *runnable.begin(); + runnable.erase(step); + } + + printMsg(lvlError, format("slot %1%: got build step ‘%2%’") % slot % step->drvPath); + + doBuildStep(store, step); + } + + printMsg(lvlError, "builder thread exits"); +} + + +void State::doBuildStep(std::shared_ptr store, Step::ptr step) { assert(step->deps.empty()); @@ -398,7 +495,7 @@ void State::doBuildStep(Step::ptr step) time_t stopTime = time(0); BuildResult res; - if (success) res = getBuildResult(step->drv); + if (success) res = getBuildResult(store, step->drv); // FIXME: handle failed-with-output @@ -484,27 +581,45 @@ void State::markSucceededBuild(pqxx::work & txn, Build::ptr build, } +void State::run() +{ + { + Connection conn; + markActiveBuildStepsAsAborted(conn, 0); + } + + queueMonitorThread = std::thread(&State::queueMonitorThreadEntry, this); + + sleep(1); + + for (int n = 0; n < 4; n++) + std::thread(&State::builderThreadEntry, this, n).detach(); + + queueMonitorThread.join(); +} + + int main(int argc, char * * argv) { return handleExceptions(argv[0], [&]() { initNix(); + std::thread(signalThread).detach(); + + /* Ignore signals. This is inherited by the other threads. */ + sigset_t set; + sigemptyset(&set); + sigaddset(&set, SIGHUP); + sigaddset(&set, SIGINT); + sigaddset(&set, SIGTERM); + sigprocmask(SIG_BLOCK, &set, NULL); + settings.buildVerbosity = lvlVomit; settings.useSubstitutes = false; - store = openStore(); - /* FIXME: need some locking to prevent multiple instances of hydra-queue-runner. */ - - Connection conn; - State state; - - state.markActiveBuildStepsAsAborted(conn, 0); - - state.getQueuedBuilds(conn); - - state.doBuildSteps(); + state.run(); }); }