diff --git a/src/hydra-queue-runner/Makefile.am b/src/hydra-queue-runner/Makefile.am index 2525c936..00aa254d 100644 --- a/src/hydra-queue-runner/Makefile.am +++ b/src/hydra-queue-runner/Makefile.am @@ -1,6 +1,6 @@ bin_PROGRAMS = hydra-queue-runner -hydra_queue_runner_SOURCES = hydra-queue-runner.cc build-result.cc +hydra_queue_runner_SOURCES = hydra-queue-runner.cc build-result.cc build-remote.cc hydra_queue_runner_LDADD = $(NIX_LIBS) -lpqxx AM_CXXFLAGS = $(NIX_CFLAGS) -Wall diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc new file mode 100644 index 00000000..cbce20f3 --- /dev/null +++ b/src/hydra-queue-runner/build-remote.cc @@ -0,0 +1,170 @@ +#include + +#include +#include +#include + +#include "build-remote.hh" + +#include "util.hh" +#include "misc.hh" +#include "serve-protocol.hh" +#include "worker-protocol.hh" + +using namespace nix; + + +struct Child +{ + Pid pid; + AutoCloseFD to, from; +}; + + +static void openConnection(const string & sshName, const string & sshKey, + int stderrFD, Child & child) +{ + Pipe to, from; + to.create(); + from.create(); + + child.pid = startProcess([&]() { + + if (dup2(to.readSide, STDIN_FILENO) == -1) + throw SysError("cannot dup input pipe to stdin"); + + if (dup2(from.writeSide, STDOUT_FILENO) == -1) + throw SysError("cannot dup output pipe to stdout"); + + if (dup2(stderrFD, STDERR_FILENO) == -1) + throw SysError("cannot dup stderr"); + + Strings argv({"ssh", "-x", "-a", sshName, "--", "nix-store", "--serve", "--write"}); + + execvp("ssh", (char * *) stringsToCharPtrs(argv).data()); // FIXME: remove cast + + throw SysError("cannot start ssh"); + }); + + to.readSide.close(); + from.writeSide.close(); + + child.to = to.writeSide.borrow(); + child.from = from.readSide.borrow(); +} + + +static void copyClosureTo(std::shared_ptr store, + FdSource & from, FdSink & to, const PathSet & paths, + bool useSubstitutes = false) +{ + PathSet closure; + for (auto & path : paths) + computeFSClosure(*store, path, closure); + + Paths sorted = topoSortPaths(*store, closure); + + /* Send the "query valid paths" command with the "lock" option + enabled. This prevents a race where the remote host + garbage-collect paths that are already there. Optionally, ask + the remote host to substitute missing paths. */ + writeInt(cmdQueryValidPaths, to); + writeInt(1, to); // == lock paths + writeInt(useSubstitutes, to); + writeStrings(sorted, to); + to.flush(); + + /* Get back the set of paths that are already valid on the remote + host. */ + auto present = readStorePaths(from); + + PathSet missing; + std::set_difference(closure.begin(), closure.end(), present.begin(), present.end(), + std::inserter(missing, missing.end())); + + printMsg(lvlError, format("sending %1% missing paths") % missing.size()); + if (missing.empty()) return; + + throw Error("NOT IMPL 1"); +} + + +static void copyClosureFrom(std::shared_ptr store, + FdSource & from, FdSink & to, const PathSet & paths) +{ + writeInt(cmdExportPaths, to); + writeInt(0, to); // == don't sign + writeStrings(paths, to); + to.flush(); + store->importPaths(false, from); +} + + +void buildRemote(std::shared_ptr store, + const string & sshName, const string & sshKey, + const Path & drvPath, const Derivation & drv, + const nix::Path & logDir, RemoteResult & result) +{ + string base = baseNameOf(drvPath); + Path logFile = logDir + "/" + string(base, 0, 2) + "/" + string(base, 2); + + createDirs(dirOf(logFile)); + + AutoCloseFD logFD(open(logFile.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0666)); + if (logFD == -1) throw SysError(format("creating log file ‘%1%’") % logFile); + + Child child; + openConnection(sshName, sshKey, logFD, child); + + logFD.close(); + + FdSource from(child.from); + FdSink to(child.to); + + /* Handshake. */ + writeInt(SERVE_MAGIC_1, to); + writeInt(SERVE_PROTOCOL_VERSION, to); + to.flush(); + + unsigned int magic = readInt(from); + if (magic != SERVE_MAGIC_2) + throw Error(format("protocol mismatch with ‘nix-store --serve’ on ‘%1%’") % sshName); + unsigned int version = readInt(from); + if (GET_PROTOCOL_MAJOR(version) != 0x200) + throw Error(format("unsupported ‘nix-store --serve’ protocol version on ‘%1%’") % sshName); + + /* Copy the input closure. */ + printMsg(lvlError, format("sending closure of ‘%1%’ to ‘%2%’") % drvPath % sshName); + copyClosureTo(store, from, to, PathSet({drvPath})); + + /* Do the build. */ + printMsg(lvlError, format("building ‘%1%’ on ‘%2%’") % drvPath % sshName); + writeInt(cmdBuildPaths, to); + writeStrings(PathSet({drvPath}), to); + writeInt(3600, to); // == maxSilentTime, FIXME + writeInt(7200, to); // == buildTimeout, FIXME + to.flush(); + result.startTime = time(0); + int res = readInt(from); + result.stopTime = time(0); + if (res) { + result.errorMsg = (format("%1% on ‘%2%’") % readString(from) % sshName).str(); + if (res == 100) result.status = RemoteResult::rrPermanentFailure; + else if (res == 101) result.status = RemoteResult::rrTimedOut; + else result.status = RemoteResult::rrMiscFailure; + return; + } + + /* Copy the output paths. */ + printMsg(lvlError, format("copying outputs of ‘%1%’ from ‘%2%’") % drvPath % sshName); + PathSet outputs; + for (auto & output : drv.outputs) + outputs.insert(output.second.path); + copyClosureFrom(store, from, to, outputs); + + /* Shut down the connection. */ + child.to.close(); + child.pid.wait(true); + + result.status = RemoteResult::rrSuccess; +} diff --git a/src/hydra-queue-runner/build-remote.hh b/src/hydra-queue-runner/build-remote.hh new file mode 100644 index 00000000..6406bc58 --- /dev/null +++ b/src/hydra-queue-runner/build-remote.hh @@ -0,0 +1,21 @@ +#pragma once + +#include "store-api.hh" +#include "derivations.hh" + +struct RemoteResult +{ + enum { + rrSuccess = 0, + rrPermanentFailure = 1, + rrTimedOut = 2, + rrMiscFailure = 3 + } status = rrMiscFailure; + std::string errorMsg; + time_t startTime = 0, stopTime = 0; +}; + +void buildRemote(std::shared_ptr store, + const std::string & sshName, const std::string & sshKey, + const nix::Path & drvPath, const nix::Derivation & drv, + const nix::Path & logDir, RemoteResult & result); diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index ada15e67..5fe9be2b 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -8,6 +8,7 @@ #include #include "build-result.hh" +#include "build-remote.hh" #include "sync.hh" #include "pool.hh" @@ -134,6 +135,10 @@ struct Step Sync state; + std::atomic_bool destroyed; + + Step() : destroyed(false) { } + ~Step() { printMsg(lvlError, format("destroying step %1%") % drvPath); @@ -141,11 +146,49 @@ struct Step }; +struct Machine +{ + typedef std::shared_ptr ptr; + + std::string sshName, sshKey; + std::set systemTypes, supportedFeatures, mandatoryFeatures; + unsigned int maxJobs = 1; + float speedFactor = 1.0; + + Sync currentJobs; + + Machine() + { + auto currentJobs_(currentJobs.lock()); + *currentJobs_ = 0; + } +}; + + +/* A RAII helper that manages the currentJobs field of Machine + objects. */ +struct MachineReservation +{ + typedef std::shared_ptr ptr; + Machine::ptr machine; + MachineReservation(Machine::ptr machine) : machine(machine) + { + auto currentJobs_(machine->currentJobs.lock()); + (*currentJobs_)++; + } + ~MachineReservation() + { + auto currentJobs_(machine->currentJobs.lock()); + if (*currentJobs_ > 0) (*currentJobs_)--; + } +}; + + class State { private: - std::thread queueMonitorThread; + Path hydraData, logDir; /* CV for waking up the queue. */ std::condition_variable queueMonitorWakeup; @@ -168,20 +211,35 @@ private: std::condition_variable_any runnableWakeup; + /* CV for waking up the dispatcher. */ + std::condition_variable dispatcherWakeup; + std::mutex dispatcherMutex; + /* PostgreSQL connection pool. */ Pool dbPool; + /* The build machines. */ + typedef std::list Machines; + Sync machines; + + /* The currently active builder threads. FIXME: We could re-use + these, but since they're fairly long-running, it's probably not + worth it. */ + // std::vector builderThreads; + public: State(); ~State(); + void loadMachines(); + void markActiveBuildStepsAsAborted(time_t stopTime); int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step, BuildStepStatus status, const std::string & errorMsg = "", BuildID propagatedFrom = 0); - void finishBuildStep(pqxx::work & txn, time_t stopTime, BuildID buildId, int stepNr, + void finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, BuildID buildId, int stepNr, BuildStepStatus status, const string & errorMsg = "", BuildID propagatedFrom = 0); void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status); @@ -200,9 +258,17 @@ public: void makeRunnable(Step::ptr step); - void builderThreadEntry(int slot); + /* The thread that selects and starts runnable builds. */ + void dispatcher(); - void doBuildStep(std::shared_ptr store, Step::ptr step); + void wakeDispatcher(); + + MachineReservation::ptr findMachine(Step::ptr step); + + void builder(Step::ptr step, MachineReservation::ptr reservation); + + void doBuildStep(std::shared_ptr store, Step::ptr step, + Machine::ptr machine); void markSucceededBuild(pqxx::work & txn, Build::ptr build, const BuildResult & res, bool isCachedBuild, time_t startTime, time_t stopTime); @@ -213,6 +279,10 @@ public: State::State() { + hydraData = getEnv("HYDRA_DATA"); + if (hydraData == "") throw Error("$HYDRA_DATA must be set"); + + logDir = canonPath(hydraData + "/build-logs"); } @@ -227,6 +297,49 @@ State::~State() } +void State::loadMachines() +{ + Path machinesFile = getEnv("NIX_REMOTE_SYSTEMS", "/etc/nix/machines"); + + Machines newMachines; + + if (pathExists(machinesFile)) { + + for (auto line : tokenizeString(readFile(machinesFile), "\n")) { + line = trim(string(line, 0, line.find('#'))); + auto tokens = tokenizeString>(line); + if (tokens.size() < 3) continue; + tokens.resize(7); + + auto machine = std::make_shared(); + machine->sshName = tokens[0]; + machine->systemTypes = tokenizeString(tokens[1], ","); + machine->sshKey = tokens[2]; + if (tokens[3] != "") + string2Int(tokens[3], machine->maxJobs); + else + machine->maxJobs = 1; + machine->speedFactor = atof(tokens[4].c_str()); + machine->supportedFeatures = tokenizeString(tokens[5], ","); + machine->mandatoryFeatures = tokenizeString(tokens[6], ","); + newMachines.push_back(machine); + } + + } else { + auto machine = std::make_shared(); + machine->sshName = "localhost"; + machine->systemTypes = StringSet({settings.thisSystem}); + if (settings.thisSystem == "x86_64-linux") + machine->systemTypes.insert("i686-linux"); + machine->maxJobs = settings.maxBuildJobs; + newMachines.push_back(machine); + } + + auto machines_(machines.lock()); + *machines_ = newMachines; +} + + void State::markActiveBuildStepsAsAborted(time_t stopTime) { auto conn(dbPool.get()); @@ -262,15 +375,17 @@ int State::createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, } -void State::finishBuildStep(pqxx::work & txn, time_t stopTime, BuildID buildId, int stepNr, +void State::finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, BuildID buildId, int stepNr, BuildStepStatus status, const std::string & errorMsg, BuildID propagatedFrom) { + assert(startTime); + assert(stopTime); txn.parameterized - ("update BuildSteps set busy = 0, status = $1, propagatedFrom = $4, errorMsg = $5, stopTime = $6 where build = $2 and stepnr = $3") + ("update BuildSteps set busy = 0, status = $1, propagatedFrom = $4, errorMsg = $5, startTime = $6, stopTime = $7 where build = $2 and stepnr = $3") ((int) status)(buildId)(stepNr) (propagatedFrom, propagatedFrom != 0) (errorMsg, errorMsg != "") - (stopTime, stopTime != 0).exec(); + (startTime)(stopTime).exec(); } @@ -346,6 +461,7 @@ void State::getQueuedBuilds(std::shared_ptr store) if (!store->isValidPath(build->drvPath)) { /* Derivation has been GC'ed prematurely. */ + printMsg(lvlInfo, format("aborting GC'ed build %1%") % build->id); pqxx::work txn(*conn); txn.parameterized ("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $3, errorMsg = $4 where id = $1") @@ -366,6 +482,8 @@ void State::getQueuedBuilds(std::shared_ptr store) Derivation drv = readDerivation(build->drvPath); BuildResult res = getBuildResult(store, drv); + printMsg(lvlInfo, format("cached build %1%") % build->id); + pqxx::work txn(*conn); time_t now = time(0); markSucceededBuild(txn, build, res, true, now, now); @@ -385,6 +503,9 @@ void State::getQueuedBuilds(std::shared_ptr store) build->toplevel = step; } + printMsg(lvlInfo, format("added build %1% (top-level step %2%, %3% new runnable steps)") + % build->id % step->drvPath % newRunnable.size()); + /* Prior to this, the build is not visible to getDependentBuilds(). Now it is, so the build can be failed if a dependency fails. (It can't succeed right away @@ -462,6 +583,9 @@ Step::ptr State::createStep(std::shared_ptr store, const Path & drvPat void State::destroyStep(Step::ptr step, bool proceed) { + if (step->destroyed) return; + step->destroyed = true; + printMsg(lvlInfo, format("destroying build step ‘%1%’") % step->drvPath); { @@ -547,6 +671,8 @@ std::set State::getDependentBuilds(Step::ptr step) void State::makeRunnable(Step::ptr step) { + printMsg(lvlInfo, format("step ‘%1%’ is now runnable") % step->drvPath); + { auto step_(step->state.lock()); assert(step_->deps.empty()); @@ -557,38 +683,109 @@ void State::makeRunnable(Step::ptr step) runnable_->push_back(step); } - runnableWakeup.notify_one(); + wakeDispatcher(); } -void State::builderThreadEntry(int slot) +void State::dispatcher() { - auto store = openStore(); // FIXME: pool + while (!exitRequested) { + printMsg(lvlError, "dispatcher woken up"); - while (true) { - /* Sleep until a runnable build step becomes available. */ - Step::ptr step; { auto runnable_(runnable.lock()); - while (runnable_->empty() && !exitRequested) - runnable_.wait(runnableWakeup); - if (exitRequested) break; - auto weak = *runnable_->begin(); - runnable_->pop_front(); - step = weak.lock(); - if (!step) continue; + printMsg(lvlError, format("%1% runnable builds") % runnable_->size()); + + /* FIXME: we're holding the runnable lock too long + here. This could be more efficient. */ + + for (auto i = runnable_->begin(); i != runnable_->end(); ) { + auto step = i->lock(); + + /* Delete dead steps. */ + if (!step) { + i = runnable_->erase(i); + continue; + } + + auto reservation = findMachine(step); + if (!reservation) { + printMsg(lvlError, format("cannot execute step ‘%1%’ right now") % step->drvPath); + ++i; + continue; + } + + printMsg(lvlInfo, format("WOOHOO: starting step ‘%1%’ on machine ‘%2%’") + % step->drvPath % reservation->machine->sshName); + i = runnable_->erase(i); + + auto builderThread = std::thread(&State::builder, this, step, reservation); + builderThread.detach(); // FIXME? + } } - /* Build it. */ - printMsg(lvlError, format("slot %1%: got build step ‘%2%’") % slot % step->drvPath); - doBuildStep(store, step); + /* Sleep until we're woken up (either because a runnable build + is added, or because a build finishes). */ + { + std::unique_lock lock(dispatcherMutex); + dispatcherWakeup.wait(lock); + } } - printMsg(lvlError, "builder thread exits"); + printMsg(lvlError, "dispatcher exits"); } -void State::doBuildStep(std::shared_ptr store, Step::ptr step) +void State::wakeDispatcher() +{ + { std::lock_guard lock(dispatcherMutex); } // barrier + dispatcherWakeup.notify_all(); +} + + +MachineReservation::ptr State::findMachine(Step::ptr step) +{ + auto machines_(machines.lock()); + + for (auto & machine : *machines_) { + if (!has(machine->systemTypes, step->drv.platform)) continue; + // FIXME: check features + { + auto currentJobs_(machine->currentJobs.lock()); + if (*currentJobs_ >= machine->maxJobs) continue; + } + return std::make_shared(machine); + } + + /* FIXME: distinguish between permanent failures (a matching + machine doesn't exist) and temporary failures (a matching + machine is not available). */ + + return 0; +} + + +void State::builder(Step::ptr step, MachineReservation::ptr reservation) +{ + try { + auto store = openStore(); // FIXME: pool + doBuildStep(store, step, reservation->machine); + } catch (std::exception & e) { + printMsg(lvlError, format("build thread for ‘%1%’: %2%") % step->drvPath % e.what()); + // FIXME: put step back in runnable and retry + } + + /* Release the machine and wake up the dispatcher. */ + assert(reservation.unique()); + reservation = 0; + wakeDispatcher(); + + printMsg(lvlError, "builder exits"); +} + + +void State::doBuildStep(std::shared_ptr store, Step::ptr step, + Machine::ptr machine) { /* There can be any number of builds in the database that depend on this derivation. Arbitrarily pick one (though preferring a @@ -625,27 +822,28 @@ void State::doBuildStep(std::shared_ptr store, Step::ptr step) /* Create a build step record indicating that we started building. */ auto conn(dbPool.get()); - time_t startTime = time(0); + RemoteResult result; + result.startTime = time(0); int stepNr; { pqxx::work txn(*conn); - stepNr = createBuildStep(txn, startTime, build, step, bssBusy); + stepNr = createBuildStep(txn, result.startTime, build, step, bssBusy); txn.commit(); } - bool success = false; - std::string errorMsg; try { - store->buildPaths(PathSet({step->drvPath})); - success = true; + buildRemote(store, machine->sshName, machine->sshKey, step->drvPath, step->drv, logDir, result); } catch (Error & e) { - errorMsg = e.msg(); + result.status = RemoteResult::rrMiscFailure; + result.errorMsg = e.msg(); + printMsg(lvlError, format("ERROR: %1%") % e.msg()); + abort(); } - time_t stopTime = time(0); + if (!result.stopTime) result.stopTime = time(0); BuildResult res; - if (success) res = getBuildResult(store, step->drv); + if (result.status == RemoteResult::rrSuccess) res = getBuildResult(store, step->drv); // FIXME: handle failed-with-output @@ -676,33 +874,34 @@ void State::doBuildStep(std::shared_ptr store, Step::ptr step) { pqxx::work txn(*conn); - if (success) { + if (result.status == RemoteResult::rrSuccess) { - finishBuildStep(txn, stopTime, build->id, stepNr, bssSuccess); + finishBuildStep(txn, result.startTime, result.stopTime, build->id, stepNr, bssSuccess); /* Mark all builds of which this derivation is the top level as succeeded. */ for (auto build2 : direct) - markSucceededBuild(txn, build2, res, false, startTime, stopTime); + markSucceededBuild(txn, build2, res, false, result.startTime, result.stopTime); } else { /* Create failed build steps for every build that depends on this. */ - finishBuildStep(txn, stopTime, build->id, stepNr, bssFailed, errorMsg); + finishBuildStep(txn, result.startTime, result.stopTime, build->id, stepNr, bssFailed, result.errorMsg); for (auto build2 : dependents) { if (build == build2) continue; - createBuildStep(txn, stopTime, build2, step, bssFailed, errorMsg, build->id); + createBuildStep(txn, result.stopTime, build2, step, bssFailed, result.errorMsg, build->id); } /* Mark all builds that depend on this derivation as failed. */ for (auto build2 : dependents) { + printMsg(lvlError, format("marking build %1% as failed") % build2->id); txn.parameterized ("update Builds set finished = 1, isCachedBuild = 0, buildStatus = $2, startTime = $3, stopTime = $4 where id = $1") (build2->id) ((int) (build2->drvPath == step->drvPath ? bsFailed : bsDepFailed)) - (startTime) - (stopTime).exec(); + (result.startTime) + (result.stopTime).exec(); build2->finishedInDB = true; // FIXME: txn might fail } } @@ -715,7 +914,7 @@ void State::doBuildStep(std::shared_ptr store, Step::ptr step) dependent Build objects. Any Steps not referenced by other Builds will be destroyed as well. */ for (auto build2 : dependents) - if (build2->toplevel == step || !success) { + if (build2->toplevel == step || result.status != RemoteResult::rrSuccess) { auto builds_(builds.lock()); builds_->erase(build2->id); } @@ -723,13 +922,15 @@ void State::doBuildStep(std::shared_ptr store, Step::ptr step) /* Remove the step from the graph. In case of success, make dependent build steps runnable if they have no other dependencies. */ - destroyStep(step, success); + destroyStep(step, result.status == RemoteResult::rrSuccess); } void State::markSucceededBuild(pqxx::work & txn, Build::ptr build, const BuildResult & res, bool isCachedBuild, time_t startTime, time_t stopTime) { + printMsg(lvlError, format("marking build %1% as succeeded") % build->id); + txn.parameterized ("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $4, size = $5, closureSize = $6, releaseName = $7, isCachedBuild = $8 where id = $1") (build->id) @@ -765,11 +966,11 @@ void State::run() { markActiveBuildStepsAsAborted(0); - queueMonitorThread = std::thread(&State::queueMonitor, this); + loadMachines(); - std::vector builderThreads; - for (int n = 0; n < 4; n++) - builderThreads.push_back(std::thread(&State::builderThreadEntry, this, n)); + auto queueMonitorThread = std::thread(&State::queueMonitor, this); + + auto dispatcherThread = std::thread(&State::dispatcher, this); /* Wait for SIGINT. */ { @@ -785,9 +986,8 @@ void State::run() queueMonitorWakeup.notify_all(); queueMonitorThread.join(); - { auto runnable_(runnable.lock()); } // barrier - runnableWakeup.notify_all(); - for (auto & thread : builderThreads) thread.join(); + wakeDispatcher(); + dispatcherThread.join(); printMsg(lvlError, format("psql connections = %1%") % dbPool.count()); } diff --git a/src/lib/Hydra/Helper/Nix.pm b/src/lib/Hydra/Helper/Nix.pm index 16d498fb..087e66cb 100644 --- a/src/lib/Hydra/Helper/Nix.pm +++ b/src/lib/Hydra/Helper/Nix.pm @@ -133,8 +133,9 @@ sub getDrvLogPath { my $base = basename $drvPath; my $bucketed = substr($base, 0, 2) . "/" . substr($base, 2); my $fn = ($ENV{NIX_LOG_DIR} || "/nix/var/log/nix") . "/drvs/"; - for ($fn . $bucketed . ".bz2", $fn . $bucketed, $fn . $base . ".bz2", $fn . $base) { - return $_ if (-f $_); + my $fn2 = Hydra::Model::DB::getHydraPath . "/build-logs/"; + for ($fn2 . $bucketed, $fn . $bucketed . ".bz2", $fn . $bucketed, $fn . $base . ".bz2", $fn . $base) { + return $_ if -f $_; } return undef; }